From dd7cc890bc1a950f4200c55bcbbee657c44b0e28 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 30 Jul 2019 22:41:06 +0800 Subject: [PATCH] [schema] KeyValue schema support using AUTO_CONSUME as key/value schema (#4839) *Motivation* Currently KeyValue schema doesn't support using AUTO_CONSUME. This PR is to add this support. This PR is based on #4836 *Changes* - refactor a bit on Schema interface to support fetching schema info for both AutoConsumeSchema and KeyValueSchema before subscribing - add AUTO_CONSUME support to KeyValueSchema - add tests --- .../pulsar/client/api/SimpleSchemaTest.java | 134 +++++++++++++ .../org/apache/pulsar/client/api/Schema.java | 22 +++ .../pulsar/client/impl/PulsarClientImpl.java | 26 ++- .../client/impl/schema/AutoConsumeSchema.java | 35 +++- .../client/impl/schema/KeyValueSchema.java | 126 +++++++----- .../impl/schema/KeyValueSchemaInfo.java | 39 +++- .../schema/generic/GenericAvroSchema.java | 4 +- .../schema/generic/GenericSchemaImplTest.java | 179 ++++++++++++------ 8 files changed, 425 insertions(+), 140 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 5d55bfd1aff..2e1d5f82e03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -24,6 +24,9 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -33,6 +36,14 @@ import org.testng.annotations.Test; public 
class SimpleSchemaTest extends ProducerConsumerBase { + @DataProvider(name = "batchingModes") + public static Object[][] batchingModes() { + return new Object[][] { + { true }, + { false } + }; + } + @DataProvider(name = "schemaValidationModes") public static Object[][] schemaValidationModes() { return new Object[][] { { true }, { false } }; @@ -50,6 +61,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase { @Override protected void setup() throws Exception { conf.setSchemaValidationEnforced(schemaValidationEnforced); + this.isTcpLookup = true; super.internalSetup(); super.producerBaseSetup(); } @@ -252,4 +264,126 @@ public class SimpleSchemaTest extends ProducerConsumerBase { assertEquals(data.getValue(), new V1Data(1)); } } + + @Test(dataProvider = "batchingModes") + public void testAutoConsume(boolean batching) throws Exception { + String topic = "my-property/my-ns/schema-test-auto-consume-" + batching; + + try (Producer p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)) + .topic(topic) + .enableBatching(batching) + .create(); + Consumer c = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .topic(topic) + .subscriptionName("sub1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + + int numMessages = 10; + + for (int i = 0; i < numMessages; i++) { + p.sendAsync(new V1Data(i)); + } + p.flush(); + + for (int i = 0; i < numMessages; i++) { + Message data = c.receive(); + assertNotNull(data.getSchemaVersion()); + assertEquals(data.getValue().getField("i"), i); + } + } + } + + @Test(dataProvider = "batchingModes") + public void testAutoKeyValueConsume(boolean batching) throws Exception { + String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + batching; + + Schema> pojoSchema = Schema.KeyValue( + Schema.AVRO(V1Data.class), + Schema.AVRO(V1Data.class), + KeyValueEncodingType.SEPARATED); + + try (Producer> p = pulsarClient.newProducer(pojoSchema) + .topic(topic) + .enableBatching(batching) + 
.create(); + Consumer> c1 = pulsarClient.newConsumer( + Schema.KeyValue( + Schema.AUTO_CONSUME(), + Schema.AUTO_CONSUME(), + KeyValueEncodingType.SEPARATED)) + .topic(topic) + .subscriptionName("sub1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Consumer> c2 = pulsarClient.newConsumer( + Schema.KeyValue( + Schema.AVRO(V1Data.class), + Schema.AVRO(V1Data.class), + KeyValueEncodingType.SEPARATED)) + .topic(topic) + .subscriptionName("sub2") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Consumer> c3 = pulsarClient.newConsumer( + Schema.KeyValue( + Schema.AUTO_CONSUME(), + Schema.AVRO(V1Data.class), + KeyValueEncodingType.SEPARATED)) + .topic(topic) + .subscriptionName("sub3") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Consumer> c4 = pulsarClient.newConsumer( + Schema.KeyValue( + Schema.AVRO(V1Data.class), + Schema.AUTO_CONSUME(), + KeyValueEncodingType.SEPARATED)) + .topic(topic) + .subscriptionName("sub4") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe() + ) { + + int numMessages = 10; + + for (int i = 0; i < numMessages; i++) { + p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i * 1000))); + } + p.flush(); + + // verify c1 + for (int i = 0; i < numMessages; i++) { + Message> data = c1.receive(); + assertNotNull(data.getSchemaVersion()); + assertEquals(data.getValue().getKey().getField("i"), i * 100); + assertEquals(data.getValue().getValue().getField("i"), i * 1000); + } + + // verify c2 + for (int i = 0; i < numMessages; i++) { + Message> data = c2.receive(); + assertNotNull(data.getSchemaVersion()); + assertEquals(data.getValue().getKey().i, i * 100); + assertEquals(data.getValue().getValue().i, i * 1000); + } + + // verify c3 + for (int i = 0; i < numMessages; i++) { + Message> data = c3.receive(); + assertNotNull(data.getSchemaVersion()); + 
assertEquals(data.getValue().getKey().getField("i"), i * 100); + assertEquals(data.getValue().getValue().i, i * 1000); + } + + // verify c4 + for (int i = 0; i < numMessages; i++) { + Message> data = c4.receive(); + assertNotNull(data.getSchemaVersion()); + assertEquals(data.getValue().getKey().i, i * 100); + assertEquals(data.getValue().getValue().getField("i"), i * 1000); + } + } + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index b55cdbc0103..a5d4467e6bd 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -116,6 +116,28 @@ public interface Schema { */ SchemaInfo getSchemaInfo(); + /** + * Check if this schema requires fetching schema info to configure the schema. + * + * @return true if the schema requires fetching schema info to configure the schema, + * otherwise false. + */ + default boolean requireFetchingSchemaInfo() { + return false; + } + + /** + * Configure the schema to use the provided schema info. + * + * @param topic topic name + * @param componentName component name + * @param schemaInfo schema info + */ + default void configureSchemaInfo(String topic, String componentName, + SchemaInfo schemaInfo) { + // no-op + } + /** * Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through. 
*/ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 134bb7774f1..a3923b7fc19 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -60,7 +60,6 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode; @@ -70,7 +69,6 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; -import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; @@ -79,7 +77,6 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; @@ -727,6 +724,7 @@ public class PulsarClientImpl implements PulsarClient { return schemaProviderLoadingCache; } + 
@SuppressWarnings("unchecked") protected CompletableFuture preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) { @@ -739,20 +737,20 @@ public class PulsarClientImpl implements PulsarClient { return FutureUtil.failedFuture(e.getCause()); } - if (schema instanceof AutoConsumeSchema) { + if (schema.requireFetchingSchemaInfo()) { return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> { - if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){ + if (null == schemaInfo) { + // no schema info is found return FutureUtil.failedFuture( - new RuntimeException("Currently schema detection only works" - + " for topics with avro or json schemas")); + new PulsarClientException.NotFoundException( + "No latest schema found for topic " + topicName)); + } + try { + log.info("Configuring schema for topic {} : {}", topicName, schemaInfo); + schema.configureSchemaInfo(topicName, "topic", schemaInfo); + } catch (RuntimeException re) { + return FutureUtil.failedFuture(re); } - - // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader - // to decode the messages. 
- GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/); - log.info("Auto detected schema for topic {} : {}", - topicName, schemaInfo.getSchemaDefinition()); - ((AutoConsumeSchema) schema).setSchema(genericSchema); schema.setSchemaInfoProvider(schemaInfoProvider); return CompletableFuture.completedFuture(null); }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 845af6ae2b8..00f1e6b4cc6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -18,18 +18,22 @@ */ package org.apache.pulsar.client.impl.schema; - import static com.google.common.base.Preconditions.checkState; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; /** * Auto detect schema. 
*/ +@Slf4j public class AutoConsumeSchema implements Schema { private Schema schema; @@ -80,6 +84,27 @@ public class AutoConsumeSchema implements Schema { return schema.getSchemaInfo(); } + @Override + public boolean requireFetchingSchemaInfo() { + return true; + } + + @Override + public void configureSchemaInfo(String topicName, + String componentName, + SchemaInfo schemaInfo) { + if (schemaInfo.getType() != SchemaType.AVRO + && schemaInfo.getType() != SchemaType.JSON) { + throw new RuntimeException("Currently auto consume only works for topics with avro or json schemas"); + } + // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader + // to decode the messages. + GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/); + setSchema(genericSchema); + log.info("Configure {} schema for topic {} : {}", + componentName, topicName, schemaInfo.getSchemaDefinition()); + } + public static Schema getSchema(SchemaInfo schemaInfo) { switch (schemaInfo.getType()) { case INT8: @@ -106,11 +131,15 @@ public class AutoConsumeSchema implements Schema { return TimeSchema.of(); case TIMESTAMP: return TimestampSchema.of(); - case KEY_VALUE: - return KeyValueSchema.kvBytes(); case JSON: case AVRO: return GenericSchemaImpl.of(schemaInfo); + case KEY_VALUE: + KeyValue kvSchemaInfo = + KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); + Schema keySchema = getSchema(kvSchemaInfo.getKey()); + Schema valueSchema = getSchema(kvSchemaInfo.getValue()); + return KeyValueSchema.of(keySchema, valueSchema); default: throw new IllegalArgumentException("Retrieve schema instance from schema info for type '" + schemaInfo.getType() + "' is not supported yet"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index fd7dcaa1e9e..0e173d21d15 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.CompletableFuture; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; @@ -33,6 +34,7 @@ import org.apache.pulsar.common.schema.SchemaType; /** * [Key, Value] pair schema definition */ +@Slf4j public class KeyValueSchema implements Schema> { @Getter @@ -40,13 +42,12 @@ public class KeyValueSchema implements Schema> { @Getter private final Schema valueSchema; - // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo: - // [keyInfo.length][keyInfo][valueInfo.length][ValueInfo] - private final SchemaInfo schemaInfo; - @Getter private final KeyValueEncodingType keyValueEncodingType; + // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo: + // [keyInfo.length][keyInfo][valueInfo.length][ValueInfo] + private SchemaInfo schemaInfo; protected SchemaInfoProvider schemaInfoProvider; /** @@ -97,52 +98,6 @@ public class KeyValueSchema implements Schema> { this.keySchema = keySchema; this.valueSchema = valueSchema; this.keyValueEncodingType = keyValueEncodingType; - this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( - keySchema, valueSchema, keyValueEncodingType - ); - - if (keySchema instanceof StructSchema) { - keySchema.setSchemaInfoProvider(new SchemaInfoProvider() { - @Override - public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { - return schemaInfoProvider.getSchemaByVersion(schemaVersion) - .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey()); - } - - @Override - public CompletableFuture getLatestSchema() { - return CompletableFuture.completedFuture( 
- ((StructSchema) keySchema).schemaInfo); - } - - @Override - public String getTopicName() { - return "key-schema"; - } - }); - } - - if (valueSchema instanceof StructSchema) { - valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() { - @Override - public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { - return schemaInfoProvider.getSchemaByVersion(schemaVersion) - .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue()); - } - - @Override - public CompletableFuture getLatestSchema() { - return CompletableFuture.completedFuture( - ((StructSchema) valueSchema).schemaInfo); - } - - @Override - public String getTopicName() { - return "value-schema"; - } - }); - } - this.schemaInfoProvider = new SchemaInfoProvider() { @Override public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { @@ -159,6 +114,12 @@ public class KeyValueSchema implements Schema> { return "key-value-schema"; } }; + // if either key schema or value schema requires fetching schema info, + // we don't need to configure the key/value schema info right now. + // defer configuring the key/value schema info until `configureSchemaInfo` is called. 
+ if (!requireFetchingSchemaInfo()) { + configureKeyValueSchemaInfo(); + } } // encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes] @@ -173,7 +134,6 @@ public class KeyValueSchema implements Schema> { } else { return valueSchema.encode(message.getValue()); } - } public KeyValue decode(byte[] bytes) { @@ -212,4 +172,68 @@ public class KeyValueSchema implements Schema> { this.schemaInfoProvider = schemaInfoProvider; } + @Override + public boolean requireFetchingSchemaInfo() { + return keySchema.requireFetchingSchemaInfo() || valueSchema.requireFetchingSchemaInfo(); + } + + @Override + public void configureSchemaInfo(String topicName, + String componentName, + SchemaInfo schemaInfo) { + KeyValue kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); + keySchema.configureSchemaInfo(topicName, "key", kvSchemaInfo.getKey()); + valueSchema.configureSchemaInfo(topicName, "value", kvSchemaInfo.getValue()); + configureKeyValueSchemaInfo(); + + if (null == this.schemaInfo) { + throw new RuntimeException( + "No key schema info or value schema info : key = " + keySchema.getSchemaInfo() + + ", value = " + valueSchema.getSchemaInfo()); + } + } + + private void configureKeyValueSchemaInfo() { + this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( + keySchema, valueSchema, keyValueEncodingType + ); + + this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() { + @Override + public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { + return schemaInfoProvider.getSchemaByVersion(schemaVersion) + .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey()); + } + + @Override + public CompletableFuture getLatestSchema() { + return CompletableFuture.completedFuture( + ((StructSchema) keySchema).schemaInfo); + } + + @Override + public String getTopicName() { + return "key-schema"; + } + }); + + this.valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() { + @Override + public CompletableFuture 
getSchemaByVersion(byte[] schemaVersion) { + return schemaInfoProvider.getSchemaByVersion(schemaVersion) + .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue()); + } + + @Override + public CompletableFuture getLatestSchema() { + return CompletableFuture.completedFuture( + ((StructSchema) valueSchema).schemaInfo); + } + + @Override + public String getTopicName() { + return "value-schema"; + } + }); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java index 6d54d572044..120f5a75bb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java @@ -108,20 +108,46 @@ public final class KeyValueSchemaInfo { Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType) { + return encodeKeyValueSchemaInfo( + schemaName, + keySchema.getSchemaInfo(), + valueSchema.getSchemaInfo(), + keyValueEncodingType + ); + } + + /** + * Encode key & value schema infos into a KeyValue schema info. 
+ * + * @param schemaName the final schema name + * @param keySchemaInfo the key schema info + * @param valueSchemaInfo the value schema info + * @param keyValueEncodingType the encoding type to encode and decode key value pair + * @return the final schema info + */ + public static SchemaInfo encodeKeyValueSchemaInfo(String schemaName, + SchemaInfo keySchemaInfo, + SchemaInfo valueSchemaInfo, + KeyValueEncodingType keyValueEncodingType) { checkNotNull(keyValueEncodingType, "Null encoding type is provided"); + if (keySchemaInfo == null || valueSchemaInfo == null) { + // schema is not ready + return null; + } + // process key/value schema data byte[] schemaData = KeyValue.encode( - keySchema.getSchemaInfo(), + keySchemaInfo, SCHEMA_INFO_WRITER, - valueSchema.getSchemaInfo(), + valueSchemaInfo, SCHEMA_INFO_WRITER ); // process key/value schema properties Map properties = new HashMap<>(); encodeSubSchemaInfoToParentSchemaProperties( - keySchema, + keySchemaInfo, KEY_SCHEMA_NAME, KEY_SCHEMA_TYPE, KEY_SCHEMA_PROPS, @@ -129,7 +155,7 @@ public final class KeyValueSchemaInfo { ); encodeSubSchemaInfoToParentSchemaProperties( - valueSchema, + valueSchemaInfo, VALUE_SCHEMA_NAME, VALUE_SCHEMA_TYPE, VALUE_SCHEMA_PROPS, @@ -143,16 +169,13 @@ public final class KeyValueSchemaInfo { .setType(SchemaType.KEY_VALUE) .setSchema(schemaData) .setProperties(properties); - } - private static void encodeSubSchemaInfoToParentSchemaProperties(Schema schema, + private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo, String schemaNameProperty, String schemaTypeProperty, String schemaPropsProperty, Map parentSchemaProperties) { - SchemaInfo schemaInfo = schema.getSchemaInfo(); - parentSchemaProperties.put(schemaNameProperty, schemaInfo.getName()); parentSchemaProperties.put(schemaTypeProperty, String.valueOf(schemaInfo.getType())); parentSchemaProperties.put( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java index c295a3b1ada..fab70d43531 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java @@ -59,7 +59,7 @@ public class GenericAvroSchema extends GenericSchemaImpl { if (schemaInfo != null) { log.info("Load schema reader for version({}), schema is : {}", SchemaUtils.getStringSchemaVersion(schemaVersion), - schemaInfo.getSchemaDefinition()); + schemaInfo); Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition()); Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema; return new GenericAvroReader( @@ -69,7 +69,7 @@ public class GenericAvroSchema extends GenericSchemaImpl { } else { log.warn("No schema found for version({}), use latest schema : {}", SchemaUtils.getStringSchemaVersion(schemaVersion), - this.schemaInfo.getSchemaDefinition()); + this.schemaInfo); return reader; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java index 24acbeac6a0..477e3d808ef 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java @@ -25,14 +25,20 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Lists; +import java.util.List; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; 
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar; import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.testng.annotations.Test; /** @@ -57,94 +63,143 @@ public class GenericSchemaImplTest { @Test public void testAutoAvroSchema() { - MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class); - AutoConsumeSchema decodeSchema = new AutoConsumeSchema(); + // configure encode schema Schema encodeSchema = Schema.AVRO(Foo.class); - GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo()); - genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider); - decodeSchema.setSchema(genericSchema); + + // configure the schema info provider + MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class); when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo())); + .thenReturn(CompletableFuture.completedFuture(encodeSchema.getSchemaInfo())); - testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + // configure decode schema + AutoConsumeSchema decodeSchema = new AutoConsumeSchema(); + decodeSchema.configureSchemaInfo( + "test-topic", "topic", encodeSchema.getSchemaInfo() + ); + decodeSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider); + + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); } @Test public void testAutoJsonSchema() { + // configure the schema info provider MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class); - 
Schema encodeSchema = Schema.JSON(Foo.class); - GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo()); - genericSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); - AutoConsumeSchema decodeSchema = new AutoConsumeSchema(); - decodeSchema.setSchema(genericSchema); GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo()); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo())); - testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + + // configure encode schema + Schema encodeSchema = Schema.JSON(Foo.class); + + // configure decode schema + AutoConsumeSchema decodeSchema = new AutoConsumeSchema(); + decodeSchema.configureSchemaInfo("test-topic", "topic", encodeSchema.getSchemaInfo()); + decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); + + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); } - public void testAUTOEncodeAndDecodeGenericRecord(Schema encodeSchema, - Schema decodeSchema) { + private void testEncodeAndDecodeGenericRecord(Schema encodeSchema, + Schema decodeSchema) { int numRecords = 10; for (int i = 0; i < numRecords; i++) { - Foo foo = new Foo(); - foo.setField1("field-1-" + i); - foo.setField2("field-2-" + i); - foo.setField3(i); - Bar bar = new Bar(); - bar.setField1(i % 2 == 0); - foo.setField4(bar); - foo.setFieldUnableNull("fieldUnableNull-1-" + i); + Foo foo = newFoo(i); byte[] data = encodeSchema.encode(foo); log.info("Decoding : {}", new String(data, UTF_8)); - GenericRecord record = decodeSchema.decode(data, new byte[10]); - Object field1 = record.getField("field1"); - assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); - Object field2 = record.getField("field2"); - assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass()); - Object field3 = record.getField("field3"); - assertEquals(i, field3, 
"Field 3 is " + field3.getClass()); - Object field4 = record.getField("field4"); - assertTrue(field4 instanceof GenericRecord); - GenericRecord field4Record = (GenericRecord) field4; - assertEquals(i % 2 == 0, field4Record.getField("field1")); - Object fieldUnableNull = record.getField("fieldUnableNull"); - assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass()); + GenericRecord record; + if (decodeSchema instanceof AutoConsumeSchema) { + record = decodeSchema.decode(data, new byte[0]); + } else { + record = decodeSchema.decode(data); + } + verifyFooRecord(record, i); } } - public void testEncodeAndDecodeGenericRecord(Schema encodeSchema, - Schema decodeSchema) { + @Test + public void testKeyValueSchema() { + // configure the schema info provider + MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class); + GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo()); + when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) + .thenReturn(CompletableFuture.completedFuture( + KeyValueSchemaInfo.encodeKeyValueSchemaInfo( + genericAvroSchema, + genericAvroSchema, + KeyValueEncodingType.INLINE + ) + )); + + List> encodeSchemas = Lists.newArrayList( + Schema.JSON(Foo.class), + Schema.AVRO(Foo.class) + ); + + for (Schema keySchema : encodeSchemas) { + for (Schema valueSchema : encodeSchemas) { + // configure encode schema + Schema> kvSchema = KeyValueSchema.of( + keySchema, valueSchema + ); + + // configure decode schema + Schema> decodeSchema = KeyValueSchema.of( + Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME() + ); + decodeSchema.configureSchemaInfo( + "test-topic", "topic",kvSchema.getSchemaInfo() + ); + decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); + + testEncodeAndDecodeKeyValues(kvSchema, decodeSchema); + } + } + + } + + private void testEncodeAndDecodeKeyValues(Schema> encodeSchema, + 
Schema> decodeSchema) { int numRecords = 10; for (int i = 0; i < numRecords; i++) { - Foo foo = new Foo(); - foo.setField1("field-1-" + i); - foo.setField2("field-2-" + i); - foo.setField3(i); - Bar bar = new Bar(); - bar.setField1(i % 2 == 0); - foo.setField4(bar); - foo.setFieldUnableNull("fieldUnableNull-1-" + i); - byte[] data = encodeSchema.encode(foo); - - log.info("Decoding : {}", new String(data, UTF_8)); + Foo foo = newFoo(i); + byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo)); - GenericRecord record = decodeSchema.decode(data); - Object field1 = record.getField("field1"); - assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); - Object field2 = record.getField("field2"); - assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass()); - Object field3 = record.getField("field3"); - assertEquals(i, field3, "Field 3 is " + field3.getClass()); - Object field4 = record.getField("field4"); - assertTrue(field4 instanceof GenericRecord); - GenericRecord field4Record = (GenericRecord) field4; - assertEquals(i % 2 == 0, field4Record.getField("field1")); - Object fieldUnableNull = record.getField("fieldUnableNull"); - assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass()); + KeyValue kv = decodeSchema.decode(data, new byte[0]); + verifyFooRecord(kv.getKey(), i); + verifyFooRecord(kv.getValue(), i); } } + private static Foo newFoo(int i) { + Foo foo = new Foo(); + foo.setField1("field-1-" + i); + foo.setField2("field-2-" + i); + foo.setField3(i); + Bar bar = new Bar(); + bar.setField1(i % 2 == 0); + foo.setField4(bar); + foo.setFieldUnableNull("fieldUnableNull-1-" + i); + + return foo; + } + + private static void verifyFooRecord(GenericRecord record, int i) { + Object field1 = record.getField("field1"); + assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); + Object field2 = record.getField("field2"); + assertEquals("field-2-" + i, field2, "Field 2 is " 
+ field2.getClass()); + Object field3 = record.getField("field3"); + assertEquals(i, field3, "Field 3 is " + field3.getClass()); + Object field4 = record.getField("field4"); + assertTrue(field4 instanceof GenericRecord); + GenericRecord field4Record = (GenericRecord) field4; + assertEquals(i % 2 == 0, field4Record.getField("field1")); + Object fieldUnableNull = record.getField("fieldUnableNull"); + assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, + "fieldUnableNull 1 is " + fieldUnableNull.getClass()); + } + } -- GitLab