未验证 提交 6c13f189 编写于 作者: S Sijie Guo 提交者: GitHub

[schema] Introduce schema data validator (#4360)

*Motivation*

Currently, schema data is validated only in the compatibility checker. If schema data is uploaded through the admin API and it is the first version of the schema, no validation is performed.

*Changes*

Add a schema data validator that validates schema data before it is stored in schema storage.
上级 e814a7ff
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.v2;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import static org.apache.pulsar.common.util.Codec.decode;
......@@ -48,9 +49,11 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
......@@ -85,7 +88,7 @@ public class SchemasResource extends AdminResource {
this.clock = clock;
}
private long getLongSchemaVersion(SchemaVersion schemaVersion) {
private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
if (schemaVersion instanceof LongSchemaVersion) {
return ((LongSchemaVersion) schemaVersion).getVersion();
} else {
......@@ -116,29 +119,7 @@ public class SchemasResource extends AdminResource {
String schemaId = buildSchemaId(tenant, namespace, topic);
pulsar().getSchemaRegistryService().getSchema(schemaId)
.handle((schema, error) -> {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.getProps())
.build()
)
.build()
);
}
} else {
response.resume(error);
}
handleGetSchemaResponse(response, schema, error);
return null;
});
}
......@@ -170,32 +151,38 @@ public class SchemasResource extends AdminResource {
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v)
.handle((schema, error) -> {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else if (schema.schema.isDeleted()) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(getLongSchemaVersion(schema.version))
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.getProps())
.build()
).build()
);
}
} else {
response.resume(error);
}
handleGetSchemaResponse(response, schema, error);
return null;
});
}
/**
 * Resumes a pending "get schema" request with the outcome of a schema lookup.
 *
 * <ul>
 *   <li>a non-null {@code error} is passed straight to the response;</li>
 *   <li>a missing schema, or one flagged as deleted, yields {@code 404 Not Found};</li>
 *   <li>otherwise the schema is rendered as a {@link GetSchemaResponse} entity.</li>
 * </ul>
 *
 * @param response the suspended response to resume
 * @param schema   the schema returned by the lookup, may be {@code null}
 * @param error    the lookup failure, or {@code null} when the lookup succeeded
 */
private static void handleGetSchemaResponse(AsyncResponse response,
                                            SchemaAndMetadata schema, Throwable error) {
    if (error != null) {
        response.resume(error);
        return;
    }

    if (schema == null || schema.schema.isDeleted()) {
        response.resume(Response.status(Response.Status.NOT_FOUND).build());
        return;
    }

    // NOTE(review): ResponseBuilder.encoding(...) sets the content encoding of the entity,
    // not its media type -- confirm that this is the intended header.
    Response found = Response.ok()
        .encoding(MediaType.APPLICATION_JSON)
        .entity(GetSchemaResponse.builder()
            .version(getLongSchemaVersion(schema.version))
            .type(schema.schema.getType())
            .timestamp(schema.schema.getTimestamp())
            // decode explicitly as UTF-8 instead of relying on the platform charset
            .data(new String(schema.schema.getData(), UTF_8))
            .properties(schema.schema.getProps())
            .build())
        .build();
    response.resume(found);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
......@@ -244,7 +231,9 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
@ApiResponse(code = 409, message = "Incompatible schema"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
@ApiResponse(code = 422, message = "Invalid schema data"),
})
public void postSchema(
@PathParam("tenant") String tenant,
......@@ -287,6 +276,11 @@ public class SchemasResource extends AdminResource {
).exceptionally(error -> {
if (error instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT).build());
} else if (error instanceof InvalidSchemaDataException) {
response.resume(Response.status(
422, /* Unprocessable Entity */
error.getMessage()
).build());
} else {
response.resume(
Response.serverError().build()
......
......@@ -18,7 +18,8 @@
*/
package org.apache.pulsar.broker.service;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
......@@ -37,6 +38,10 @@ public class BrokerServiceException extends Exception {
super(t);
}
/**
 * Creates a broker service exception with a detail message and an underlying cause.
 *
 * @param message the detail message
 * @param cause the throwable that caused this exception
 */
public BrokerServiceException(String message, Throwable cause) {
super(message, cause);
}
public static class ConsumerBusyException extends BrokerServiceException {
public ConsumerBusyException(String msg) {
super(msg);
......@@ -154,6 +159,10 @@ public class BrokerServiceException extends Exception {
}
/**
 * Resolves the error code to report to a client for the given throwable.
 *
 * <p>Delegates to the two-argument overload with {@code checkCauseIfUnknown} set to
 * {@code true}.
 *
 * @param t the throwable to map
 * @return the error code produced by the two-argument overload
 */
public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return getClientErrorCode(t, true);
}
private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean checkCauseIfUnknown) {
if (t instanceof ServerMetadataException) {
return PulsarApi.ServerError.MetadataError;
} else if (t instanceof NamingException) {
......@@ -171,12 +180,19 @@ public class BrokerServiceException extends Exception {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
} else if (t instanceof IncompatibleSchemaException) {
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
// is treated as "incompatible schema".
return PulsarApi.ServerError.IncompatibleSchema;
} else if (t instanceof ConsumerAssignException) {
return ServerError.ConsumerAssignError;
} else {
return PulsarApi.ServerError.UnknownError;
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
} else {
return PulsarApi.ServerError.UnknownError;
}
}
}
}
......@@ -312,7 +312,7 @@ public class Consumer {
}).exceptionally(exception -> {
log.warn("Unsubscribe failed for {}", subscription, exception);
ctx.writeAndFlush(
Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception.getCause()),
Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage()));
return null;
});
......
......@@ -58,7 +58,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
......@@ -738,7 +738,7 @@ public class ServerCnx extends PulsarHandler {
// back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage()));
}
consumers.remove(consumerId, consumerFuture);
......@@ -927,7 +927,7 @@ public class ServerCnx extends PulsarHandler {
schemaVersionFuture.exceptionally(exception -> {
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
BrokerServiceException.getClientErrorCode(exception),
exception.getMessage()));
producers.remove(producerId, producerFuture);
return null;
......@@ -1455,7 +1455,7 @@ public class ServerCnx extends PulsarHandler {
future.getNow(null);
} catch (Exception e) {
if (e.getCause() instanceof BrokerServiceException) {
error = BrokerServiceException.getClientErrorCode((BrokerServiceException) e.getCause());
error = BrokerServiceException.getClientErrorCode(e.getCause());
}
}
return error;
......
......@@ -39,7 +39,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
this.checkers = checkers;
}
private KeyValue<SchemaData, SchemaData> decodeKeyValueSchemaData(SchemaData schemaData) {
public static KeyValue<SchemaData, SchemaData> decodeKeyValueSchemaData(SchemaData schemaData) {
KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo());
return new KeyValue<>(
......
......@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -58,7 +59,8 @@ public interface SchemaRegistryService extends SchemaRegistry {
schemaStorage.start();
return new SchemaRegistryServiceImpl(schemaStorage, checkers);
return SchemaRegistryServiceWithSchemaDataValidator.of(
new SchemaRegistryServiceImpl(schemaStorage, checkers));
} catch (Exception e) {
log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e);
}
......
......@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
......
......@@ -16,9 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
package org.apache.pulsar.broker.service.schema.exceptions;
/**
* Exception is thrown when an incompatible schema is used.
*/
public class IncompatibleSchemaException extends SchemaException {
private static final long serialVersionUID = -6013970359956508359L;
public class IncompatibleSchemaException extends Exception {
public IncompatibleSchemaException() {
super("Incompatible schema used");
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.exceptions;
/**
 * Signals that schema data is not in a valid form.
 */
public class InvalidSchemaDataException extends SchemaException {

    private static final long serialVersionUID = -2846364736743783766L;

    /**
     * Creates the exception with a description of what is wrong with the schema data.
     *
     * @param message the detail message
     */
    public InvalidSchemaDataException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the failure that triggered it.
     *
     * @param message the detail message
     * @param cause the underlying failure
     */
    public InvalidSchemaDataException(String message, Throwable cause) {
        super(message, cause);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.exceptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
/**
 * Base type for schema related exceptions.
 */
public class SchemaException extends BrokerServiceException {

    private static final long serialVersionUID = -6587520779026691815L;

    /**
     * Creates a schema exception carrying only a detail message.
     *
     * @param message the detail message
     */
    public SchemaException(String message) {
        super(message);
    }

    /**
     * Creates a schema exception carrying a detail message and its cause.
     *
     * @param message the detail message
     * @param cause the underlying failure
     */
    public SchemaException(String message, Throwable cause) {
        super(message, cause);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
/**
 * Validator for primitive schemas: the schema data must be absent or empty.
 */
class PrimitiveSchemaDataValidator implements SchemaDataValidator {

    private static final PrimitiveSchemaDataValidator INSTANCE = new PrimitiveSchemaDataValidator();

    /** Singleton; obtain the instance through {@link #of()}. */
    private PrimitiveSchemaDataValidator() {}

    /**
     * Returns the shared validator instance.
     *
     * @return the singleton validator
     */
    public static PrimitiveSchemaDataValidator of() {
        return INSTANCE;
    }

    /**
     * Accepts {@code null} or zero-length schema data and rejects anything else.
     *
     * @param schemaData schema data to validate
     * @throws InvalidSchemaDataException if the schema data contains at least one byte
     */
    @Override
    public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
        byte[] payload = schemaData.getData();
        if (payload == null || payload.length == 0) {
            return;
        }
        throw new InvalidSchemaDataException("Invalid schema definition data for primitive schemas :"
            + "length of schema data should be zero, but " + payload.length + " bytes is found");
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.KeyValue;
/**
 * A validator to validate the schema data is well formed.
 */
public interface SchemaDataValidator {

    /**
     * Validate if the schema data is well formed, dispatching on the schema type.
     *
     * <ul>
     *   <li>{@code AVRO}, {@code JSON} and {@code PROTOBUF} go to the struct validator;</li>
     *   <li>{@code STRING} goes to the string validator;</li>
     *   <li>the remaining primitive types go to the primitive validator;</li>
     *   <li>{@code NONE} and {@code BYTES} are accepted without inspection;</li>
     *   <li>{@code AUTO}, {@code AUTO_CONSUME} and {@code AUTO_PUBLISH} are always rejected;</li>
     *   <li>{@code KEY_VALUE} is decoded and its key and value schemas are validated
     *       recursively.</li>
     * </ul>
     *
     * @param schemaData schema data to validate
     * @throws InvalidSchemaDataException if the schema data is not in a valid form.
     */
    static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataException {
        switch (schemaData.getType()) {
            case AVRO:
            case JSON:
            case PROTOBUF:
                StructSchemaDataValidator.of().validate(schemaData);
                break;
            case STRING:
                StringSchemaDataValidator.of().validate(schemaData);
                break;
            case BOOLEAN:
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT:
            case DOUBLE:
            case DATE:
            case TIME:
            case TIMESTAMP:
                PrimitiveSchemaDataValidator.of().validate(schemaData);
                break;
            case NONE:
            case BYTES:
                // `NONE` and `BYTES` schema is not stored
                break;
            case AUTO:
            case AUTO_CONSUME:
            case AUTO_PUBLISH:
                throw new InvalidSchemaDataException(
                    "Schema " + schemaData.getType() + " is a client-side schema type");
            case KEY_VALUE:
                KeyValue<SchemaData, SchemaData> kvSchema;
                try {
                    kvSchema = KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
                } catch (RuntimeException e) {
                    // Decoding malformed key/value bytes is presumed to fail with an unchecked
                    // exception. Report it as invalid schema data, like the struct validator
                    // does, so callers that only handle InvalidSchemaDataException still see it.
                    throw new InvalidSchemaDataException("Invalid schema definition data for "
                        + schemaData.getType() + " schema", e);
                }
                validateSchemaData(kvSchema.getKey());
                validateSchemaData(kvSchema.getValue());
                break;
            default:
                throw new InvalidSchemaDataException("Unknown schema type : " + schemaData.getType());
        }
    }

    /**
     * Validate a schema data is in a valid form.
     *
     * @param schemaData schema data to validate
     * @throws InvalidSchemaDataException if the schema data is not in a valid form.
     */
    void validate(SchemaData schemaData) throws InvalidSchemaDataException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
/**
 * A {@link SchemaRegistryService} decorator that validates schema data before handing
 * write and compatibility requests to the wrapped service.
 *
 * <p>Only {@link #putSchemaIfAbsent} and {@link #isCompatible} validate; every other
 * operation is forwarded unchanged.
 */
public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegistryService {

    private final SchemaRegistryService service;

    private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService service) {
        this.service = service;
    }

    /**
     * Wraps the given service with schema data validation.
     *
     * @param service the service to delegate to
     * @return a new validating wrapper around {@code service}
     */
    public static SchemaRegistryServiceWithSchemaDataValidator of(SchemaRegistryService service) {
        return new SchemaRegistryServiceWithSchemaDataValidator(service);
    }

    /**
     * Validates {@code schema} and, only when it is valid, invokes the delegate call.
     *
     * @param schema the schema data to validate
     * @param delegateCall the call to the wrapped service, made only after validation passes
     * @return a future failed with the {@link InvalidSchemaDataException} when validation
     *         fails, otherwise the future produced by {@code delegateCall}
     */
    private static <T> CompletableFuture<T> validateThen(
            SchemaData schema,
            java.util.function.Supplier<CompletableFuture<T>> delegateCall) {
        try {
            SchemaDataValidator.validateSchemaData(schema);
        } catch (InvalidSchemaDataException invalid) {
            return FutureUtil.failedFuture(invalid);
        }
        return delegateCall.get();
    }

    @Override
    public void close() throws Exception {
        service.close();
    }

    @Override
    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
        return service.getSchema(schemaId);
    }

    @Override
    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
        return service.getSchema(schemaId, version);
    }

    @Override
    public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> getAllSchemas(String schemaId) {
        return service.getAllSchemas(schemaId);
    }

    /**
     * Validates the schema data, then stores it through the wrapped service.
     */
    @Override
    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
                                                              SchemaData schema,
                                                              SchemaCompatibilityStrategy strategy) {
        return validateThen(schema, () -> service.putSchemaIfAbsent(schemaId, schema, strategy));
    }

    @Override
    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
        return service.deleteSchema(schemaId, user);
    }

    /**
     * Validates the schema data, then asks the wrapped service for the compatibility verdict.
     */
    @Override
    public CompletableFuture<Boolean> isCompatible(String schemaId,
                                                   SchemaData schema,
                                                   SchemaCompatibilityStrategy strategy) {
        return validateThen(schema, () -> service.isCompatible(schemaId, schema, strategy));
    }

    @Override
    public SchemaVersion versionFromBytes(byte[] version) {
        return service.versionFromBytes(version);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
/**
 * Validator for string schemas: the schema data must be absent, empty, or the literal
 * text {@code null}.
 */
class StringSchemaDataValidator implements SchemaDataValidator {

    /** Schema data that the Python client sends for a string schema. */
    private static final String PY_NONE_SCHEMA_INFO = "null";

    private static final StringSchemaDataValidator INSTANCE = new StringSchemaDataValidator();

    /** Singleton; obtain the instance through {@link #of()}. */
    private StringSchemaDataValidator() {}

    /**
     * Returns the shared validator instance.
     *
     * @return the singleton validator
     */
    public static final StringSchemaDataValidator of() {
        return INSTANCE;
    }

    /**
     * Accepts {@code null} or empty schema data, as well as the UTF-8 text {@code null}.
     *
     * @param schemaData schema data to validate
     * @throws InvalidSchemaDataException if the schema data holds any other content
     */
    @Override
    public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
        byte[] payload = schemaData.getData();
        if (payload == null || payload.length == 0) {
            return;
        }

        // python send 'null' string as schema data
        String text = new String(payload, UTF_8);
        if (PY_NONE_SCHEMA_INFO.equals(text)) {
            return;
        }

        throw new InvalidSchemaDataException("Invalid schema definition data for string schema : '"
            + text + "'");
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
 * Validator for struct schemas: the schema data must parse as an Avro schema definition
 * or, for {@code JSON} schemas only, deserialize as a Jackson {@link JsonSchema}.
 */
class StructSchemaDataValidator implements SchemaDataValidator {

    private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator();

    /** Singleton; obtain the instance through {@link #of()}. */
    private StructSchemaDataValidator() {}

    /**
     * Returns the shared validator instance.
     *
     * @return the singleton validator
     */
    public static StructSchemaDataValidator of() {
        return INSTANCE;
    }

    /**
     * Parses the schema data with the Avro schema parser. When Avro rejects the definition
     * and the schema type is {@code JSON}, the data is tried as a {@link JsonSchema} instead.
     *
     * @param schemaData schema data to validate
     * @throws InvalidSchemaDataException if the data cannot be parsed in any accepted form
     */
    @Override
    public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
        byte[] definition = schemaData.getData();
        try {
            new Schema.Parser().parse(new String(definition, UTF_8));
        } catch (SchemaParseException notAvro) {
            if (schemaData.getType() != SchemaType.JSON) {
                throwInvalidSchemaDataException(schemaData, notAvro);
            } else {
                validateAsJsonSchema(schemaData, definition);
            }
        } catch (Exception unexpected) {
            throwInvalidSchemaDataException(schemaData, unexpected);
        }
    }

    /**
     * Backward-compatibility path: JsonSchema was used for storing the definition of a JSON
     * schema, so such data is decoded with JsonSchema before being rejected.
     */
    private static void validateAsJsonSchema(SchemaData schemaData,
                                             byte[] definition) throws InvalidSchemaDataException {
        ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
        try {
            mapper.readValue(definition, JsonSchema.class);
        } catch (IOException unreadable) {
            throwInvalidSchemaDataException(schemaData, unreadable);
        }
    }

    /**
     * Always throws an {@link InvalidSchemaDataException} naming the schema type and
     * carrying {@code cause}.
     */
    private static void throwInvalidSchemaDataException(SchemaData schemaData,
                                                        Throwable cause) throws InvalidSchemaDataException {
        throw new InvalidSchemaDataException("Invalid schema definition data for "
            + schemaData.getType() + " schema", cause);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
 * Unit tests for {@link SchemaDataValidator#validateSchemaData(SchemaData)}.
 */
public class SchemaDataValidatorTest {

    /** Minimal POJO used to generate struct schema definitions. */
    private static class Foo {
        int field;
    }

    /** Turns a list of schema types into single-column data provider rows. */
    private static Object[][] rows(SchemaType... types) {
        Object[][] table = new Object[types.length][];
        for (int i = 0; i < types.length; i++) {
            table[i] = new Object[] { types[i] };
        }
        return table;
    }

    /** Builds schema data of the given type and payload and runs it through the validator. */
    private static void validate(SchemaType type, byte[] payload) throws InvalidSchemaDataException {
        SchemaData schemaData = SchemaData.builder()
            .type(type)
            .data(payload)
            .build();
        SchemaDataValidator.validateSchemaData(schemaData);
    }

    @DataProvider(name = "primitiveSchemas")
    public static Object[][] primitiveSchemas() {
        return rows(
            SchemaType.STRING,
            SchemaType.BOOLEAN,
            SchemaType.INT8,
            SchemaType.INT16,
            SchemaType.INT32,
            SchemaType.INT64,
            SchemaType.FLOAT,
            SchemaType.DOUBLE,
            SchemaType.DATE,
            SchemaType.TIME,
            SchemaType.TIMESTAMP);
    }

    @DataProvider(name = "clientSchemas")
    public static Object[][] clientSchemas() {
        return rows(
            SchemaType.AUTO_CONSUME,
            SchemaType.AUTO_PUBLISH,
            SchemaType.AUTO);
    }

    @DataProvider(name = "structSchemas")
    public static Object[][] structSchemas() {
        return rows(
            SchemaType.AVRO,
            SchemaType.JSON,
            SchemaType.PROTOBUF);
    }

    /** Empty schema data is expected to pass for every primitive schema type. */
    @Test(dataProvider = "primitiveSchemas")
    public void testPrimitiveValidatorSuccess(SchemaType type) throws Exception {
        validate(type, new byte[0]);
    }

    /** Ten zero bytes of schema data are expected to be rejected for every primitive schema type. */
    @Test(dataProvider = "primitiveSchemas", expectedExceptions = InvalidSchemaDataException.class)
    public void testPrimitiveValidatorInvalid(SchemaType type) throws Exception {
        validate(type, new byte[10]);
    }

    /** Client-side schema types are expected to be rejected even with empty data. */
    @Test(dataProvider = "clientSchemas", expectedExceptions = InvalidSchemaDataException.class)
    public void testValidateClientSchemas(SchemaType type) throws Exception {
        validate(type, new byte[0]);
    }

    /** A definition generated by the AVRO schema is expected to pass for every struct type. */
    @Test(dataProvider = "structSchemas")
    public void testStructValidatorSuccess(SchemaType type) throws Exception {
        Schema<Foo> avroSchema = Schema.AVRO(Foo.class);
        validate(type, avroSchema.getSchemaInfo().getSchema());
    }

    /** Arbitrary text is expected to be rejected for every struct type. */
    @Test(dataProvider = "structSchemas", expectedExceptions = InvalidSchemaDataException.class)
    public void testStructValidatorInvalid(SchemaType type) throws Exception {
        validate(type, "bad-schema".getBytes(UTF_8));
    }

    /** A definition produced by the Jackson JsonSchema generator is expected to pass for JSON. */
    @Test
    public void testJsonSchemaTypeWithJsonSchemaData() throws Exception {
        ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
        byte[] jsonSchemaBytes = mapper.writeValueAsBytes(
            new JsonSchemaGenerator(mapper).generateSchema(Foo.class));
        validate(SchemaType.JSON, jsonSchemaBytes);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.validator;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Unit tests for {@link SchemaRegistryServiceWithSchemaDataValidator}.
 *
 * <p>Every test wraps a mocked {@link SchemaRegistryService} and then checks one of two things:
 * either the call is handed to the mock and the mock's future is returned as-is, or the call
 * fails with an {@link InvalidSchemaDataException} as its cause and the mock is never invoked.
 */
public class SchemaRegistryServiceWithSchemaDataValidatorTest {

    private static final String SCHEMA_ID = "test-schema-id";
    private static final String USER = "test-user";
    private static final SchemaCompatibilityStrategy STRATEGY = SchemaCompatibilityStrategy.FULL;

    private SchemaRegistryService underlyingService;
    private SchemaRegistryServiceWithSchemaDataValidator service;

    /** Builds a BOOLEAN schema whose payload is a zero-filled array of {@code payloadLength} bytes. */
    private static SchemaData booleanSchema(int payloadLength) {
        return SchemaData.builder()
            .type(SchemaType.BOOLEAN)
            .data(new byte[payloadLength])
            .build();
    }

    /** Creates a fresh mock and a fresh validating wrapper around it before each test. */
    @BeforeMethod
    public void setup() {
        underlyingService = mock(SchemaRegistryService.class);
        service = SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
    }

    /** Fetching the latest schema returns the very future produced by the wrapped service. */
    @Test
    public void testGetLatestSchema() {
        CompletableFuture<SchemaAndMetadata> stubbed = new CompletableFuture<>();
        when(underlyingService.getSchema(eq(SCHEMA_ID))).thenReturn(stubbed);

        assertSame(stubbed, service.getSchema(SCHEMA_ID));

        verify(underlyingService, times(1)).getSchema(eq(SCHEMA_ID));
    }

    /** Fetching a schema by version forwards both the id and the same version instance. */
    @Test
    public void testGetSchemaByVersion() {
        CompletableFuture<SchemaAndMetadata> stubbed = new CompletableFuture<>();
        when(underlyingService.getSchema(eq(SCHEMA_ID), any(SchemaVersion.class)))
            .thenReturn(stubbed);

        assertSame(stubbed, service.getSchema(SCHEMA_ID, SchemaVersion.Latest));

        verify(underlyingService, times(1))
            .getSchema(eq(SCHEMA_ID), same(SchemaVersion.Latest));
    }

    /** Deleting a schema returns the very future produced by the wrapped service. */
    @Test
    public void testDeleteSchema() {
        CompletableFuture<SchemaVersion> stubbed = new CompletableFuture<>();
        when(underlyingService.deleteSchema(eq(SCHEMA_ID), eq(USER))).thenReturn(stubbed);

        assertSame(stubbed, service.deleteSchema(SCHEMA_ID, USER));

        verify(underlyingService, times(1)).deleteSchema(eq(SCHEMA_ID), eq(USER));
    }

    /** A BOOLEAN schema with an empty payload reaches the wrapped service's compatibility check. */
    @Test
    public void testIsCompatibleWithGoodSchemaData() {
        CompletableFuture<Boolean> stubbed = new CompletableFuture<>();
        when(underlyingService.isCompatible(eq(SCHEMA_ID), any(SchemaData.class), eq(STRATEGY)))
            .thenReturn(stubbed);
        SchemaData accepted = booleanSchema(0);

        assertSame(stubbed, service.isCompatible(SCHEMA_ID, accepted, STRATEGY));

        verify(underlyingService, times(1))
            .isCompatible(eq(SCHEMA_ID), same(accepted), eq(STRATEGY));
    }

    /** A BOOLEAN schema with a 10-byte payload fails before the wrapped service is called. */
    @Test
    public void testIsCompatibleWithBadSchemaData() {
        CompletableFuture<Boolean> stubbed = new CompletableFuture<>();
        when(underlyingService.isCompatible(eq(SCHEMA_ID), any(SchemaData.class), eq(STRATEGY)))
            .thenReturn(stubbed);
        SchemaData rejected = booleanSchema(10);

        try {
            service.isCompatible(SCHEMA_ID, rejected, STRATEGY).get();
            fail("Should fail isCompatible check");
        } catch (Exception e) {
            Throwable cause = e.getCause();
            assertTrue(cause instanceof InvalidSchemaDataException);
        }

        verify(underlyingService, times(0))
            .isCompatible(eq(SCHEMA_ID), same(rejected), eq(STRATEGY));
    }

    /** A BOOLEAN schema with an empty payload reaches the wrapped service's put operation. */
    @Test
    public void testPutSchemaIfAbsentWithGoodSchemaData() {
        CompletableFuture<SchemaVersion> stubbed = new CompletableFuture<>();
        when(underlyingService.putSchemaIfAbsent(eq(SCHEMA_ID), any(SchemaData.class), eq(STRATEGY)))
            .thenReturn(stubbed);
        SchemaData accepted = booleanSchema(0);

        assertSame(stubbed, service.putSchemaIfAbsent(SCHEMA_ID, accepted, STRATEGY));

        verify(underlyingService, times(1))
            .putSchemaIfAbsent(eq(SCHEMA_ID), same(accepted), eq(STRATEGY));
    }

    /** A BOOLEAN schema with a 10-byte payload fails before the wrapped service stores it. */
    @Test
    public void testPutSchemaIfAbsentWithBadSchemaData() {
        CompletableFuture<SchemaVersion> stubbed = new CompletableFuture<>();
        when(underlyingService.putSchemaIfAbsent(eq(SCHEMA_ID), any(SchemaData.class), eq(STRATEGY)))
            .thenReturn(stubbed);
        SchemaData rejected = booleanSchema(10);

        try {
            service.putSchemaIfAbsent(SCHEMA_ID, rejected, STRATEGY).get();
            fail("Should fail putSchemaIfAbsent");
        } catch (Exception e) {
            Throwable cause = e.getCause();
            assertTrue(cause instanceof InvalidSchemaDataException);
        }

        verify(underlyingService, times(0))
            .putSchemaIfAbsent(eq(SCHEMA_ID), same(rejected), eq(STRATEGY));
    }
}
......@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
......@@ -37,6 +38,7 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
......@@ -156,62 +158,32 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
}
@Test
public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
public void testWrongCorruptedSchema() throws Exception {
log.info("-- Starting {} test --", methodName);
byte[] randomSchemaBytes = "hello".getBytes();
pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
SchemaData.builder()
.type(SchemaType.JSON)
.isDeleted(false)
.timestamp(Clock.systemUTC().millis())
.user("me")
.data(randomSchemaBytes)
.props(Collections.emptyMap())
.build(),
SchemaCompatibilityStrategy.FULL
).get();
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
log.info("-- Starting {} test --", methodName);
byte[] randomSchemaBytes = "hello".getBytes();
pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
SchemaData.builder()
.type(SchemaType.JSON)
.isDeleted(false)
.timestamp(Clock.systemUTC().millis())
.user("me")
.data(randomSchemaBytes)
.props(Collections.emptyMap())
.build(),
SchemaCompatibilityStrategy.FULL
).get();
Producer<JsonEncodedPojo> producer = pulsarClient
.newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.create();
try {
pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
SchemaData.builder()
.type(SchemaType.JSON)
.isDeleted(false)
.timestamp(Clock.systemUTC().millis())
.user("me")
.data(randomSchemaBytes)
.props(Collections.emptyMap())
.build(),
SchemaCompatibilityStrategy.FULL
).get();
fail("Should fail to add corrupted schema data");
} catch (Exception e) {
assertTrue(e.getCause() instanceof InvalidSchemaDataException);
}
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testProtobufProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
......
......@@ -53,6 +53,15 @@ public interface Schemas {
*/
void deleteSchema(String topic) throws PulsarAdminException;
/**
 * Create a schema for a given <tt>topic</tt> with the provided schema info.
 *
 * @param topic topic name, in fully qualified format
 * @param schemaInfo schema info describing the schema to create
 * @throws PulsarAdminException if the schema cannot be created
 */
void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;
/**
* Create a schema for a given <tt>topic</tt>.
*
......
......@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.admin.internal;
import static java.nio.charset.StandardCharsets.UTF_8;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.PulsarAdminException;
......@@ -44,12 +46,7 @@ public class SchemasImpl extends BaseResource implements Schemas {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn)).get(GetSchemaResponse.class);
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes());
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
......@@ -59,13 +56,9 @@ public class SchemasImpl extends BaseResource implements Schemas {
public SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
SchemaInfo info = new SchemaInfo();
info.setSchema(response.getData().getBytes());
info.setType(response.getType());
info.setProperties(response.getProperties());
info.setName(tn.getLocalName());
return info;
GetSchemaResponse response = request(schemaPath(tn).path(Long.toString(version)))
.get(GetSchemaResponse.class);
return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
......@@ -81,6 +74,18 @@ public class SchemasImpl extends BaseResource implements Schemas {
}
}
/**
 * Creates a schema for {@code topic} by translating {@code schemaInfo} into a
 * {@link PostSchemaPayload} and delegating to {@link #createSchema(String, PostSchemaPayload)}.
 */
@Override
public void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
    String typeName = schemaInfo.getType().name();
    // The payload carries the schema as a string, so the raw bytes are converted here to stay
    // backward compatible; a future version of the REST endpoint may carry bytes directly.
    String legacySchema = convertSchemaDataToStringLegacy(schemaInfo.getSchema());

    PostSchemaPayload request = new PostSchemaPayload();
    request.setType(typeName);
    request.setProperties(schemaInfo.getProperties());
    request.setSchema(legacySchema);
    createSchema(topic, request);
}
@Override
public void createSchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
try {
......@@ -99,4 +104,25 @@ public class SchemasImpl extends BaseResource implements Schemas {
.path(topicName.getEncodedLocalName())
.path("schema");
}
/**
 * Converts a {@code GetSchemaResponse} into a {@code SchemaInfo}.
 *
 * <p>The schema bytes are the UTF-8 encoding of the response data, the type and properties are
 * copied from the response, and the name is the local name of the topic.
 */
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
                                                       GetSchemaResponse response) {
    byte[] schemaBytes = response.getData().getBytes(UTF_8);

    SchemaInfo converted = new SchemaInfo();
    converted.setName(tn.getLocalName());
    converted.setType(response.getType());
    converted.setProperties(response.getProperties());
    converted.setSchema(schemaBytes);
    return converted;
}
/**
 * Decodes raw schema bytes as a UTF-8 string; kept for backward compatibility.
 *
 * @param schemaData raw schema bytes, may be {@code null}
 * @return the decoded string, or the empty string when {@code schemaData} is {@code null}
 */
static String convertSchemaDataToStringLegacy(byte[] schemaData) {
    return schemaData == null ? "" : new String(schemaData, UTF_8);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册