From 6c13f189423892b1c670e2a4791f16895ea23c4a Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 20 Jun 2019 19:46:17 -0700 Subject: [PATCH] [schema] Introduce schema data validator (#4360) *Motivation* Currently the schema data is only validated in compatibility checker. If the schema data is uploaded from admin api, there is no validation if the schema is the first version. *Changes* Add schema data validator to validate the schema data before storing it in schema storage. --- .../broker/admin/v2/SchemasResource.java | 88 +++++----- .../service/BrokerServiceException.java | 22 ++- .../pulsar/broker/service/Consumer.java | 2 +- .../pulsar/broker/service/ServerCnx.java | 8 +- .../KeyValueSchemaCompatibilityCheck.java | 2 +- .../service/schema/SchemaRegistryService.java | 4 +- .../schema/SchemaRegistryServiceImpl.java | 1 + .../IncompatibleSchemaException.java | 10 +- .../InvalidSchemaDataException.java | 35 ++++ .../schema/exceptions/SchemaException.java | 37 ++++ .../PrimitiveSchemaDataValidator.java | 45 +++++ .../schema/validator/SchemaDataValidator.java | 87 ++++++++++ ...egistryServiceWithSchemaDataValidator.java | 98 +++++++++++ .../validator/StringSchemaDataValidator.java | 53 ++++++ .../validator/StructSchemaDataValidator.java | 77 +++++++++ .../validator/SchemaDataValidatorTest.java | 133 ++++++++++++++ ...tryServiceWithSchemaDataValidatorTest.java | 163 ++++++++++++++++++ .../api/SimpleTypedProducerConsumerTest.java | 68 +++----- .../apache/pulsar/client/admin/Schemas.java | 9 + .../client/admin/internal/SchemasImpl.java | 52 ++++-- 20 files changed, 874 insertions(+), 120 deletions(-) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/{ => exceptions}/IncompatibleSchemaException.java (78%) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java index eb7cc646d6e..1e8c8e9fd2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.v2; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.isNull; import static org.apache.commons.lang.StringUtils.defaultIfEmpty; import static org.apache.pulsar.common.util.Codec.decode; @@ -48,9 +49,11 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.LongSchemaVersion; import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; +import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; @@ -85,7 +88,7 @@ public class SchemasResource extends AdminResource { this.clock = clock; } - private long getLongSchemaVersion(SchemaVersion schemaVersion) { + private static long getLongSchemaVersion(SchemaVersion schemaVersion) { if (schemaVersion instanceof LongSchemaVersion) { return ((LongSchemaVersion) schemaVersion).getVersion(); } else { @@ -116,29 +119,7 @@ public class SchemasResource extends AdminResource { String schemaId = buildSchemaId(tenant, namespace, topic); pulsar().getSchemaRegistryService().getSchema(schemaId) .handle((schema, error) -> { - if (isNull(error)) { - if (isNull(schema)) { - response.resume(Response.status(Response.Status.NOT_FOUND).build()); - } else if (schema.schema.isDeleted()) { - response.resume(Response.status(Response.Status.NOT_FOUND).build()); - } else { - response.resume( - Response.ok() - .encoding(MediaType.APPLICATION_JSON) - .entity(GetSchemaResponse.builder() - .version(getLongSchemaVersion(schema.version)) - .type(schema.schema.getType()) - .timestamp(schema.schema.getTimestamp()) - .data(new String(schema.schema.getData())) - .properties(schema.schema.getProps()) - .build() - ) - .build() - ); - } - } else { - response.resume(error); - } + handleGetSchemaResponse(response, schema, error); return null; }); } @@ -170,32 +151,38 @@ public class SchemasResource extends AdminResource { SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array()); pulsar().getSchemaRegistryService().getSchema(schemaId, v) .handle((schema, error) -> { - if (isNull(error)) { - if (isNull(schema)) { - response.resume(Response.status(Response.Status.NOT_FOUND).build()); - } else if (schema.schema.isDeleted()) { - response.resume(Response.status(Response.Status.NOT_FOUND).build()); - } else { - response.resume( - Response.ok() - .encoding(MediaType.APPLICATION_JSON) - .entity(GetSchemaResponse.builder() - .version(getLongSchemaVersion(schema.version)) - .type(schema.schema.getType()) - .timestamp(schema.schema.getTimestamp()) - .data(new String(schema.schema.getData())) - .properties(schema.schema.getProps()) - .build() - ).build() - ); - } - } else { - response.resume(error); - } + handleGetSchemaResponse(response, schema, error); return null; }); } + private static void handleGetSchemaResponse(AsyncResponse response, + SchemaAndMetadata schema, Throwable error) { + if (isNull(error)) { + if (isNull(schema)) { + response.resume(Response.status(Response.Status.NOT_FOUND).build()); + } else if (schema.schema.isDeleted()) { + response.resume(Response.status(Response.Status.NOT_FOUND).build()); + } else { + response.resume( + Response.ok() + .encoding(MediaType.APPLICATION_JSON) + .entity(GetSchemaResponse.builder() + .version(getLongSchemaVersion(schema.version)) + .type(schema.schema.getType()) + .timestamp(schema.schema.getTimestamp()) + .data(new String(schema.schema.getData(), UTF_8)) + .properties(schema.schema.getProps()) + .build() + ).build() + ); + } + } else { + response.resume(error); + } + + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/schema") @Produces(MediaType.APPLICATION_JSON) @@ -244,7 +231,9 @@ public class SchemasResource extends AdminResource { @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"), @ApiResponse(code = 403, message = "Client is not authenticated"), @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"), + @ApiResponse(code = 409, message = "Incompatible schema"), @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"), + @ApiResponse(code = 422, message = "Invalid schema data"), }) public void postSchema( @PathParam("tenant") String tenant, @@ -287,6 +276,11 @@ public class SchemasResource extends AdminResource { ).exceptionally(error -> { if (error instanceof IncompatibleSchemaException) { response.resume(Response.status(Response.Status.CONFLICT).build()); + } else if (error instanceof InvalidSchemaDataException) { + response.resume(Response.status( + 422, /* Unprocessable Entity */ + error.getMessage() + ).build()); } else { response.resume( Response.serverError().build() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index c3f69097024..2e287002113 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -18,7 +18,8 @@ */ package org.apache.pulsar.broker.service; -import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; @@ -37,6 +38,10 @@ public class BrokerServiceException extends Exception { super(t); } + public BrokerServiceException(String message, Throwable cause) { + super(message, cause); + } + public static class ConsumerBusyException extends BrokerServiceException { public ConsumerBusyException(String msg) { super(msg); @@ -154,6 +159,10 @@ public class BrokerServiceException extends Exception { } public static PulsarApi.ServerError getClientErrorCode(Throwable t) { + return getClientErrorCode(t, true); + } + + private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean checkCauseIfUnknown) { if (t instanceof ServerMetadataException) { return PulsarApi.ServerError.MetadataError; } else if (t instanceof NamingException) { @@ -171,12 +180,19 @@ public class BrokerServiceException extends Exception { } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException || t instanceof SubscriptionFencedException) { return PulsarApi.ServerError.ServiceNotReady; - } else if (t instanceof IncompatibleSchemaException) { + } else if (t instanceof IncompatibleSchemaException + || t instanceof InvalidSchemaDataException) { + // for backward compatible with old clients, invalid schema data + // is treated as "incompatible schema". return PulsarApi.ServerError.IncompatibleSchema; } else if (t instanceof ConsumerAssignException) { return ServerError.ConsumerAssignError; } else { - return PulsarApi.ServerError.UnknownError; + if (checkCauseIfUnknown) { + return getClientErrorCode(t.getCause(), false); + } else { + return PulsarApi.ServerError.UnknownError; + } } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 927689212ef..678347ad720 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -312,7 +312,7 @@ public class Consumer { }).exceptionally(exception -> { log.warn("Unsubscribe failed for {}", subscription, exception); ctx.writeAndFlush( - Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception.getCause()), + Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage())); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a29c9a40e3d..d8de814eb47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -58,7 +58,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; -import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.PulsarClientException; @@ -738,7 +738,7 @@ public class ServerCnx extends PulsarHandler { // back to client, only if not completed already. if (consumerFuture.completeExceptionally(exception)) { ctx.writeAndFlush(Commands.newError(requestId, - BrokerServiceException.getClientErrorCode(exception.getCause()), + BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage())); } consumers.remove(consumerId, consumerFuture); @@ -927,7 +927,7 @@ public class ServerCnx extends PulsarHandler { schemaVersionFuture.exceptionally(exception -> { ctx.writeAndFlush(Commands.newError(requestId, - BrokerServiceException.getClientErrorCode(exception.getCause()), + BrokerServiceException.getClientErrorCode(exception), exception.getMessage())); producers.remove(producerId, producerFuture); return null; @@ -1455,7 +1455,7 @@ public class ServerCnx extends PulsarHandler { future.getNow(null); } catch (Exception e) { if (e.getCause() instanceof BrokerServiceException) { - error = BrokerServiceException.getClientErrorCode((BrokerServiceException) e.getCause()); + error = BrokerServiceException.getClientErrorCode(e.getCause()); } } return error; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java index 2426f3e01a2..c5fa364c442 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java @@ -39,7 +39,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec this.checkers = checkers; } - private KeyValue decodeKeyValueSchemaData(SchemaData schemaData) { + public static KeyValue decodeKeyValueSchemaData(SchemaData schemaData) { KeyValue schemaInfoKeyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo()); return new KeyValue<>( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index baa57a4ed7e..73244cfdf1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator; import org.apache.pulsar.common.schema.SchemaType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,8 @@ public interface SchemaRegistryService extends SchemaRegistry { schemaStorage.start(); - return new SchemaRegistryServiceImpl(schemaStorage, checkers); + return SchemaRegistryServiceWithSchemaDataValidator.of( + new SchemaRegistryServiceImpl(schemaStorage, checkers)); } catch (Exception e) { log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 5c38c8a1d5e..57246f0f1e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -40,6 +40,7 @@ import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java similarity index 78% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java index 975ba0d0ea3..e045deb3cd2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java @@ -16,9 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service.schema; +package org.apache.pulsar.broker.service.schema.exceptions; + +/** + * Exception is thrown when an incompatible schema is used. + */ +public class IncompatibleSchemaException extends SchemaException { + + private static final long serialVersionUID = -6013970359956508359L; -public class IncompatibleSchemaException extends Exception { public IncompatibleSchemaException() { super("Incompatible schema used"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java new file mode 100644 index 00000000000..a6a58082ff2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.exceptions; + +/** + * Exception thrown when the schema data is not in a valid form. + */ +public class InvalidSchemaDataException extends SchemaException { + + private static final long serialVersionUID = -2846364736743783766L; + + public InvalidSchemaDataException(String message) { + super(message); + } + + public InvalidSchemaDataException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java new file mode 100644 index 00000000000..54a22e92ed0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.exceptions; + +import org.apache.pulsar.broker.service.BrokerServiceException; + +/** + * Schema related exceptions. + */ +public class SchemaException extends BrokerServiceException { + + private static final long serialVersionUID = -6587520779026691815L; + + public SchemaException(String message) { + super(message); + } + + public SchemaException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java new file mode 100644 index 00000000000..346a6d03258 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; + +/** + * Validate if the primitive schema is in expected form. + */ +class PrimitiveSchemaDataValidator implements SchemaDataValidator { + + public static PrimitiveSchemaDataValidator of() { + return INSTANCE; + } + + private static final PrimitiveSchemaDataValidator INSTANCE = new PrimitiveSchemaDataValidator(); + + private PrimitiveSchemaDataValidator() {} + + @Override + public void validate(SchemaData schemaData) throws InvalidSchemaDataException { + byte[] data = schemaData.getData(); + if (null != data && data.length > 0) { + throw new InvalidSchemaDataException("Invalid schema definition data for primitive schemas :" + + "length of schema data should be zero, but " + data.length + " bytes is found"); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java new file mode 100644 index 00000000000..0c44612c0db --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.KeyValue; + +/** + * A validator to validate the schema data is well formed. + */ +public interface SchemaDataValidator { + + /** + * Validate if the schema data is well formed. + * + * @param schemaData schema data to validate + * @throws InvalidSchemaDataException if the schema data is not in a valid form. + */ + static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataException { + switch (schemaData.getType()) { + case AVRO: + case JSON: + case PROTOBUF: + StructSchemaDataValidator.of().validate(schemaData); + break; + case STRING: + StringSchemaDataValidator.of().validate(schemaData); + break; + case BOOLEAN: + case INT8: + case INT16: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case DATE: + case TIME: + case TIMESTAMP: + PrimitiveSchemaDataValidator.of().validate(schemaData); + break; + case NONE: + case BYTES: + // `NONE` and `BYTES` schema is not stored + break; + case AUTO: + case AUTO_CONSUME: + case AUTO_PUBLISH: + throw new InvalidSchemaDataException( + "Schema " + schemaData.getType() + " is a client-side schema type"); + case KEY_VALUE: + KeyValue kvSchema = + KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData); + validateSchemaData(kvSchema.getKey()); + validateSchemaData(kvSchema.getValue()); + break; + default: + throw new InvalidSchemaDataException("Unknown schema type : " + schemaData.getType()); + } + } + + /** + * Validate a schema data is in a valid form. + * + * @param schemaData schema data to validate + * @throws InvalidSchemaDataException if the schema data is not in a valid form. + */ + void validate(SchemaData schemaData) throws InvalidSchemaDataException; + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java new file mode 100644 index 00000000000..f0ca3fc0c04 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * A {@link SchemaRegistryService} wrapper that validate schema data. + */ +public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegistryService { + + public static SchemaRegistryServiceWithSchemaDataValidator of(SchemaRegistryService service) { + return new SchemaRegistryServiceWithSchemaDataValidator(service); + } + + private final SchemaRegistryService service; + + private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService service) { + this.service = service; + } + + @Override + public void close() throws Exception { + this.service.close(); + } + + @Override + public CompletableFuture getSchema(String schemaId) { + return this.service.getSchema(schemaId); + } + + @Override + public CompletableFuture getSchema(String schemaId, SchemaVersion version) { + return this.service.getSchema(schemaId, version); + } + + @Override + public CompletableFuture>> getAllSchemas(String schemaId) { + return this.service.getAllSchemas(schemaId); + } + + @Override + public CompletableFuture putSchemaIfAbsent(String schemaId, + SchemaData schema, + SchemaCompatibilityStrategy strategy) { + try { + SchemaDataValidator.validateSchemaData(schema); + } catch (InvalidSchemaDataException e) { + return FutureUtil.failedFuture(e); + } + return service.putSchemaIfAbsent(schemaId, schema, strategy); + } + + @Override + public CompletableFuture deleteSchema(String schemaId, String user) { + return service.deleteSchema(schemaId, user); + } + + @Override + public CompletableFuture isCompatible(String schemaId, + SchemaData schema, + SchemaCompatibilityStrategy strategy) { + try { + SchemaDataValidator.validateSchemaData(schema); + } catch (InvalidSchemaDataException e) { + return FutureUtil.failedFuture(e); + } + return service.isCompatible(schemaId, schema, strategy); + } + + @Override + public SchemaVersion versionFromBytes(byte[] version) { + return service.versionFromBytes(version); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java new file mode 100644 index 00000000000..4368b1406e3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; + +/** + * Validate if the string schema is in expected form. + */ +class StringSchemaDataValidator implements SchemaDataValidator { + + public static final StringSchemaDataValidator of() { + return INSTANCE; + } + + private static final StringSchemaDataValidator INSTANCE = new StringSchemaDataValidator(); + + private static final String PY_NONE_SCHEMA_INFO = "null"; + + private StringSchemaDataValidator() {} + + @Override + public void validate(SchemaData schemaData) throws InvalidSchemaDataException { + byte[] data = schemaData.getData(); + if (null != data && data.length > 0) { + // python send 'null' string as schema data + String schemaDataStr = new String(data, UTF_8); + if (!PY_NONE_SCHEMA_INFO.equals(schemaDataStr)) { + throw new InvalidSchemaDataException("Invalid schema definition data for string schema : '" + + schemaDataStr + "'"); + } + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java new file mode 100644 index 00000000000..1eeed83a000 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jsonSchema.JsonSchema; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +/** + * Validate if the struct schema is in expected form. + */ +class StructSchemaDataValidator implements SchemaDataValidator { + + public static StructSchemaDataValidator of() { + return INSTANCE; + } + + private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator(); + + private StructSchemaDataValidator() {} + + @Override + public void validate(SchemaData schemaData) throws InvalidSchemaDataException { + byte[] data = schemaData.getData(); + + try { + Schema.Parser avroSchemaParser = new Schema.Parser(); + avroSchemaParser.parse(new String(data, UTF_8)); + } catch (SchemaParseException e) { + if (schemaData.getType() == SchemaType.JSON) { + // we used JsonSchema for storing the definition of a JSON schema + // hence for backward compatibility consideration, we need to try + // to use JsonSchema to decode the schema data + ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal(); + try { + objectMapper.readValue(data, JsonSchema.class); + } catch (IOException ioe) { + throwInvalidSchemaDataException(schemaData, ioe); + } + } else { + throwInvalidSchemaDataException(schemaData, e); + } + } catch (Exception e) { + throwInvalidSchemaDataException(schemaData, e); + } + } + + private static void throwInvalidSchemaDataException(SchemaData schemaData, + Throwable cause) throws InvalidSchemaDataException { + throw new InvalidSchemaDataException("Invalid schema definition data for " + + schemaData.getType() + " schema", cause); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java new file mode 100644 index 00000000000..998d363c19f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class SchemaDataValidatorTest { + + private static class Foo { + int field; + } + + @DataProvider(name = "primitiveSchemas") + public static Object[][] primitiveSchemas() { + return new Object[][] { + { SchemaType.STRING }, + { SchemaType.BOOLEAN }, + { SchemaType.INT8 }, + { SchemaType.INT16 }, + { SchemaType.INT32 }, + { SchemaType.INT64 }, + { SchemaType.FLOAT }, + { SchemaType.DOUBLE }, + { SchemaType.DATE }, + { SchemaType.TIME }, + { SchemaType.TIMESTAMP }, + }; + } + + @DataProvider(name = "clientSchemas") + public static Object[][] clientSchemas() { + return new Object[][] { + { SchemaType.AUTO_CONSUME }, + { SchemaType.AUTO_PUBLISH }, + { SchemaType.AUTO }, + }; + } + + @DataProvider(name = "structSchemas") + public static Object[][] structSchemas() { + return new Object[][] { + { SchemaType.AVRO }, + { SchemaType.JSON }, + { SchemaType.PROTOBUF }, + }; + } + + @Test(dataProvider = "primitiveSchemas") + public void testPrimitiveValidatorSuccess(SchemaType type) throws Exception { + SchemaData data = SchemaData.builder() + .type(type) + .data(new byte[0]) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + + @Test(dataProvider = "primitiveSchemas", expectedExceptions = InvalidSchemaDataException.class) + public void testPrimitiveValidatorInvalid(SchemaType type) throws Exception { + SchemaData data = SchemaData.builder() + .type(type) + .data(new byte[10]) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + + @Test(dataProvider = "clientSchemas", expectedExceptions = InvalidSchemaDataException.class) + public void testValidateClientSchemas(SchemaType type) throws Exception { + SchemaData data = SchemaData.builder() + .type(type) + .data(new byte[0]) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + + @Test(dataProvider = "structSchemas") + public void testStructValidatorSuccess(SchemaType type) throws Exception { + Schema schema = Schema.AVRO(Foo.class); + SchemaData data = SchemaData.builder() + .type(type) + .data(schema.getSchemaInfo().getSchema()) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + + @Test(dataProvider = "structSchemas", expectedExceptions = InvalidSchemaDataException.class) + public void testStructValidatorInvalid(SchemaType type) throws Exception { + SchemaData data = SchemaData.builder() + .type(type) + .data("bad-schema".getBytes(UTF_8)) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + + @Test + public void testJsonSchemaTypeWithJsonSchemaData() throws Exception { + ObjectMapper mapper = ObjectMapperFactory.getThreadLocal(); + SchemaData data = SchemaData.builder() + .type(SchemaType.JSON) + .data( + mapper.writeValueAsBytes( + new JsonSchemaGenerator(mapper) + .generateSchema(Foo.class))) + .build(); + SchemaDataValidator.validateSchemaData(data); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java new file mode 100644 index 00000000000..aa1cfbb457a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; +import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test {@link SchemaRegistryServiceWithSchemaDataValidator}. + */ +public class SchemaRegistryServiceWithSchemaDataValidatorTest { + + private SchemaRegistryService underlyingService; + private SchemaRegistryServiceWithSchemaDataValidator service; + + @BeforeMethod + public void setup() { + this.underlyingService = mock(SchemaRegistryService.class); + this.service = SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService); + } + + @Test + public void testGetLatestSchema() { + String schemaId = "test-schema-id"; + CompletableFuture getFuture = new CompletableFuture<>(); + when(underlyingService.getSchema(eq(schemaId))).thenReturn(getFuture); + assertSame(getFuture, service.getSchema(schemaId)); + verify(underlyingService, times(1)).getSchema(eq(schemaId)); + } + + @Test + public void testGetSchemaByVersion() { + String schemaId = "test-schema-id"; + CompletableFuture getFuture = new CompletableFuture<>(); + when(underlyingService.getSchema(eq(schemaId), any(SchemaVersion.class))) + .thenReturn(getFuture); + assertSame(getFuture, service.getSchema(schemaId, SchemaVersion.Latest)); + verify(underlyingService, times(1)) + .getSchema(eq(schemaId), same(SchemaVersion.Latest)); + } + + @Test + public void testDeleteSchema() { + String schemaId = "test-schema-id"; + String user = "test-user"; + CompletableFuture deleteFuture = new CompletableFuture<>(); + when(underlyingService.deleteSchema(eq(schemaId), eq(user))) + .thenReturn(deleteFuture); + assertSame(deleteFuture, service.deleteSchema(schemaId, user)); + verify(underlyingService, times(1)) + .deleteSchema(eq(schemaId), eq(user)); + } + + @Test + public void testIsCompatibleWithGoodSchemaData() { + String schemaId = "test-schema-id"; + SchemaCompatibilityStrategy strategy = SchemaCompatibilityStrategy.FULL; + CompletableFuture future = new CompletableFuture<>(); + when(underlyingService.isCompatible(eq(schemaId), any(SchemaData.class), eq(strategy))) + .thenReturn(future); + SchemaData schemaData = SchemaData.builder() + .type(SchemaType.BOOLEAN) + .data(new byte[0]) + .build(); + assertSame(future, service.isCompatible(schemaId, schemaData, strategy)); + verify(underlyingService, times(1)) + .isCompatible(eq(schemaId), same(schemaData), eq(strategy)); + } + + @Test + public void testIsCompatibleWithBadSchemaData() { + String schemaId = "test-schema-id"; + SchemaCompatibilityStrategy strategy = SchemaCompatibilityStrategy.FULL; + CompletableFuture future = new CompletableFuture<>(); + when(underlyingService.isCompatible(eq(schemaId), any(SchemaData.class), eq(strategy))) + .thenReturn(future); + SchemaData schemaData = SchemaData.builder() + .type(SchemaType.BOOLEAN) + .data(new byte[10]) + .build(); + try { + service.isCompatible(schemaId, schemaData, strategy).get(); + fail("Should fail isCompatible check"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof InvalidSchemaDataException); + } + verify(underlyingService, times(0)) + .isCompatible(eq(schemaId), same(schemaData), eq(strategy)); + } + + @Test + public void testPutSchemaIfAbsentWithGoodSchemaData() { + String schemaId = "test-schema-id"; + SchemaCompatibilityStrategy strategy = SchemaCompatibilityStrategy.FULL; + CompletableFuture future = new CompletableFuture<>(); + when(underlyingService.putSchemaIfAbsent(eq(schemaId), any(SchemaData.class), eq(strategy))) + .thenReturn(future); + SchemaData schemaData = SchemaData.builder() + .type(SchemaType.BOOLEAN) + .data(new byte[0]) + .build(); + assertSame(future, service.putSchemaIfAbsent(schemaId, schemaData, strategy)); + verify(underlyingService, times(1)) + .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy)); + } + + @Test + public void testPutSchemaIfAbsentWithBadSchemaData() { + String schemaId = "test-schema-id"; + SchemaCompatibilityStrategy strategy = SchemaCompatibilityStrategy.FULL; + CompletableFuture future = new CompletableFuture<>(); + when(underlyingService.putSchemaIfAbsent(eq(schemaId), any(SchemaData.class), eq(strategy))) + .thenReturn(future); + SchemaData schemaData = SchemaData.builder() + .type(SchemaType.BOOLEAN) + .data(new byte[10]) + .build(); + try { + service.putSchemaIfAbsent(schemaId, schemaData, strategy).get(); + fail("Should fail putSchemaIfAbsent"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof InvalidSchemaDataException); + } + verify(underlyingService, times(0)) + .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy)); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 9aa35738dd1..163185025a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.base.MoreObjects; @@ -37,6 +38,7 @@ import lombok.Cleanup; import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; import org.apache.pulsar.broker.service.schema.SchemaRegistry; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -156,62 +158,32 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { } @Test - public void testJsonConsumerWithWrongCorruptedSchema() throws Exception { + public void testWrongCorruptedSchema() throws Exception { log.info("-- Starting {} test --", methodName); byte[] randomSchemaBytes = "hello".getBytes(); - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", - SchemaData.builder() - .type(SchemaType.JSON) - .isDeleted(false) - .timestamp(Clock.systemUTC().millis()) - .user("me") - .data(randomSchemaBytes) - .props(Collections.emptyMap()) - .build(), - SchemaCompatibilityStrategy.FULL - ).get(); - - Consumer consumer = pulsarClient - .newConsumer(JSONSchema.of(SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build())) - .topic("persistent://my-property/use/my-ns/my-topic1") - .subscriptionName("my-subscriber-name") - .subscribe(); - - log.info("-- Exiting {} test --", methodName); - } - - @Test - public void testJsonProducerWithWrongCorruptedSchema() throws Exception { - log.info("-- Starting {} test --", methodName); - - byte[] randomSchemaBytes = "hello".getBytes(); - - pulsar.getSchemaRegistryService() - .putSchemaIfAbsent("my-property/my-ns/my-topic1", - SchemaData.builder() - .type(SchemaType.JSON) - .isDeleted(false) - .timestamp(Clock.systemUTC().millis()) - .user("me") - .data(randomSchemaBytes) - .props(Collections.emptyMap()) - .build(), - SchemaCompatibilityStrategy.FULL - ).get(); - - Producer producer = pulsarClient - .newProducer(JSONSchema.of(SchemaDefinition.builder().withPojo(JsonEncodedPojo.class).build())) - .topic("persistent://my-property/use/my-ns/my-topic1") - .create(); - + try { + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.JSON) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(randomSchemaBytes) + .props(Collections.emptyMap()) + .build(), + SchemaCompatibilityStrategy.FULL + ).get(); + fail("Should fail to add corrupted schema data"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof InvalidSchemaDataException); + } log.info("-- Exiting {} test --", methodName); } - @Test public void testProtobufProducerAndConsumer() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java index abc8d16da8b..9be3c637435 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java @@ -53,6 +53,15 @@ public interface Schemas { */ void deleteSchema(String topic) throws PulsarAdminException; + /** + * Create a schema for a given topic with the provided schema info. + * + * @param topic topic name, in fully qualified fomrat + * @param schemaInfo schema info + * @throws PulsarAdminException + */ + void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException; + /** * Create a schema for a given topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java index 9d06676941f..cce5cd2fb13 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.admin.internal; +import static java.nio.charset.StandardCharsets.UTF_8; + import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -44,12 +46,7 @@ public class SchemasImpl extends BaseResource implements Schemas { try { TopicName tn = TopicName.get(topic); GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class); - SchemaInfo info = new SchemaInfo(); - info.setSchema(response.getData().getBytes()); - info.setType(response.getType()); - info.setProperties(response.getProperties()); - info.setName(tn.getLocalName()); - return info; + return convertGetSchemaResponseToSchemaInfo(tn, response); } catch (Exception e) { throw getApiException(e); } @@ -59,13 +56,9 @@ public class SchemasImpl extends BaseResource implements Schemas { public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException { try { TopicName tn = TopicName.get(topic); - GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class); - SchemaInfo info = new SchemaInfo(); - info.setSchema(response.getData().getBytes()); - info.setType(response.getType()); - info.setProperties(response.getProperties()); - info.setName(tn.getLocalName()); - return info; + GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))) + .get(GetSchemaResponse.class); + return convertGetSchemaResponseToSchemaInfo(tn, response); } catch (Exception e) { throw getApiException(e); } @@ -81,6 +74,18 @@ public class SchemasImpl extends BaseResource implements Schemas { } } + @Override + public void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException { + PostSchemaPayload payload = new PostSchemaPayload(); + payload.setType(schemaInfo.getType().name()); + payload.setProperties(schemaInfo.getProperties()); + // for backward compatibility concern, we convert `bytes` to `string` + // we can consider fixing it in a new version of rest endpoint + payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema())); + + createSchema(topic, payload); + } + @Override public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException { try { @@ -99,4 +104,25 @@ public class SchemasImpl extends BaseResource implements Schemas { .path(topicName.getEncodedLocalName()) .path("schema"); } + + // the util function converts `GetSchemaResponse` to `SchemaInfo` + static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn, + GetSchemaResponse response) { + SchemaInfo info = new SchemaInfo(); + info.setSchema(response.getData().getBytes(UTF_8)); + info.setType(response.getType()); + info.setProperties(response.getProperties()); + info.setName(tn.getLocalName()); + return info; + } + + + // the util function exists for backward compatibility concern + static String convertSchemaDataToStringLegacy(byte[] schemaData) { + if (null == schemaData) { + return ""; + } + + return new String(schemaData, UTF_8); + } } -- GitLab