未验证 提交 dd7cc890 编写于 作者: S Sijie Guo 提交者: GitHub

[schema] KeyValue schema support using AUTO_CONSUME as key/value schema (#4839)

*Motivation*

Currently KeyValue schema doesn't support using AUTO_CONSUME.

This PR is to add this support.

This PR is based on #4836

*Changes*

- refactor the Schema interface slightly to support fetching schema info for both AutoConsumeSchema and KeyValueSchema before subscribing
- add AUTO_CONSUME support to KeyValueSchema
- add tests
上级 3c9136b2
......@@ -24,6 +24,9 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
......@@ -33,6 +36,14 @@ import org.testng.annotations.Test;
public class SimpleSchemaTest extends ProducerConsumerBase {
/**
 * Supplies the two producer batching settings used by the tests that declare
 * {@code dataProvider = "batchingModes"}.
 *
 * @return one single-element row per mode: batching enabled, then batching disabled
 */
@DataProvider(name = "batchingModes")
public static Object[][] batchingModes() {
    final Object[] batchingEnabled = { true };
    final Object[] batchingDisabled = { false };
    return new Object[][] { batchingEnabled, batchingDisabled };
}
@DataProvider(name = "schemaValidationModes")
public static Object[][] schemaValidationModes() {
return new Object[][] { { true }, { false } };
......@@ -50,6 +61,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
@Override
protected void setup() throws Exception {
// Apply this test instance's schema-validation setting to the configuration before the base setup runs.
conf.setSchemaValidationEnforced(schemaValidationEnforced);
// Set the lookup flag ahead of the base setup.
// NOTE(review): presumably this selects binary (TCP) lookup instead of HTTP lookup — confirm in the base class.
this.isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
}
......@@ -252,4 +264,126 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertEquals(data.getValue(), new V1Data(1));
}
}
/**
 * Publishes Avro-encoded {@code V1Data} messages and checks that a consumer created with
 * {@code Schema.AUTO_CONSUME()} reads them back as generic records carrying a schema version.
 *
 * @param batching whether batching is enabled on the producer
 */
@Test(dataProvider = "batchingModes")
public void testAutoConsume(boolean batching) throws Exception {
    final int messageCount = 10;
    final String topicName = "my-property/my-ns/schema-test-auto-consume-" + batching;

    // Resources are initialized in declaration order: the producer first, then the consumer.
    try (Producer<V1Data> producer = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
             .topic(topicName)
             .enableBatching(batching)
             .create();
         Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
             .topic(topicName)
             .subscriptionName("sub1")
             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
             .subscribe()) {

        // Queue every message asynchronously, then flush once.
        int sent = 0;
        while (sent < messageCount) {
            producer.sendAsync(new V1Data(sent));
            sent++;
        }
        producer.flush();

        // Messages are expected back in publish order, each exposing field "i".
        for (int expected = 0; expected < messageCount; expected++) {
            Message<GenericRecord> received = consumer.receive();
            assertNotNull(received.getSchemaVersion());
            GenericRecord record = received.getValue();
            assertEquals(record.getField("i"), expected);
        }
    }
}
/**
 * Publishes key/value messages whose key and value are both Avro-encoded {@code V1Data}, and
 * verifies that four consumers on separate subscriptions can read them when AUTO_CONSUME is used
 * for both sides, for neither side, for the key only, and for the value only.
 *
 * @param batching whether batching is enabled on the producer
 */
@Test(dataProvider = "batchingModes")
public void testAutoKeyValueConsume(boolean batching) throws Exception {
String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + batching;
// Producer-side schema: concrete Avro schemas for key and value, SEPARATED encoding.
Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED);
// try-with-resources initializes resources in declaration order, so the producer is created
// before any of the consumers subscribes.
try (Producer<KeyValue<V1Data, V1Data>> p = pulsarClient.newProducer(pojoSchema)
.topic(topic)
.enableBatching(batching)
.create();
// c1: AUTO_CONSUME for both key and value.
Consumer<KeyValue<GenericRecord, GenericRecord>> c1 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AUTO_CONSUME(),
Schema.AUTO_CONSUME(),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
// c2: concrete Avro schemas for both key and value (same shape as the producer schema).
Consumer<KeyValue<V1Data, V1Data>> c2 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub2")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
// c3: AUTO_CONSUME key, concrete Avro value.
Consumer<KeyValue<GenericRecord, V1Data>> c3 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AUTO_CONSUME(),
Schema.AVRO(V1Data.class),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub3")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
// c4: concrete Avro key, AUTO_CONSUME value.
Consumer<KeyValue<V1Data, GenericRecord>> c4 = pulsarClient.newConsumer(
Schema.KeyValue(
Schema.AVRO(V1Data.class),
Schema.AUTO_CONSUME(),
KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("sub4")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
) {
int numMessages = 10;
// Message i carries key.i = i * 100 and value.i = i * 1000, so key and value can be told apart.
for (int i = 0; i < numMessages; i++) {
p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i * 1000)));
}
// Flush once after queuing all asynchronous sends.
p.flush();
// verify c1: key and value are both read through generic records
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, GenericRecord>> data = c1.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
}
// verify c2: key and value are both read as V1Data
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<V1Data, V1Data>> data = c2.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
}
// verify c3: generic-record key, V1Data value
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, V1Data>> data = c3.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
}
// verify c4: V1Data key, generic-record value
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<V1Data, GenericRecord>> data = c4.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
}
}
}
}
......@@ -116,6 +116,28 @@ public interface Schema<T> {
*/
SchemaInfo getSchemaInfo();
/**
 * Check if this schema requires fetching schema info to configure the schema.
 *
 * <p>The default implementation returns {@code false}. Implementations that need a schema info
 * to be supplied through {@link #configureSchemaInfo(String, String, SchemaInfo)} before they can
 * be used override this method to return {@code true}.
 *
 * @return true if the schema requires fetching schema info to configure the schema,
 * otherwise false.
 */
default boolean requireFetchingSchemaInfo() {
return false;
}
/**
 * Configure the schema to use the provided schema info.
 *
 * <p>The default implementation does nothing.
 *
 * @param topic topic name
 * @param componentName component name; a label for the part of the topic schema being configured
 *                      (callers visible in this change pass "topic", "key" or "value")
 * @param schemaInfo schema info
 */
default void configureSchemaInfo(String topic, String componentName,
SchemaInfo schemaInfo) {
// no-op
}
/**
* Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through.
*/
......
......@@ -60,7 +60,6 @@ import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
......@@ -70,7 +69,6 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
......@@ -79,7 +77,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
......@@ -727,6 +724,7 @@ public class PulsarClientImpl implements PulsarClient {
return schemaProviderLoadingCache;
}
@SuppressWarnings("unchecked")
protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
Schema schema,
String topicName) {
......@@ -739,20 +737,20 @@ public class PulsarClientImpl implements PulsarClient {
return FutureUtil.failedFuture(e.getCause());
}
if (schema instanceof AutoConsumeSchema) {
if (schema.requireFetchingSchemaInfo()) {
return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
if (null == schemaInfo) {
// no schema info is found
return FutureUtil.failedFuture(
new RuntimeException("Currently schema detection only works"
+ " for topics with avro or json schemas"));
new PulsarClientException.NotFoundException(
"No latest schema found for topic " + topicName));
}
try {
log.info("Configuring schema for topic {} : {}", topicName, schemaInfo);
schema.configureSchemaInfo(topicName, "topic", schemaInfo);
} catch (RuntimeException re) {
return FutureUtil.failedFuture(re);
}
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
schema.setSchemaInfoProvider(schemaInfoProvider);
return CompletableFuture.completedFuture(null);
});
......
......@@ -18,18 +18,22 @@
*/
package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkState;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
* Auto detect schema.
*/
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
private Schema<GenericRecord> schema;
......@@ -80,6 +84,27 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return schema.getSchemaInfo();
}
/**
 * Always returns {@code true}.
 *
 * <p>NOTE(review): presumably because the delegate schema is built from a fetched schema info in
 * {@code configureSchemaInfo} — confirm against the rest of this class.
 */
@Override
public boolean requireFetchingSchemaInfo() {
return true;
}
/**
 * Builds a generic schema from the given schema info and installs it as the delegate schema.
 *
 * @param topicName     topic name, used for logging only
 * @param componentName label of the part being configured, used for logging only
 * @param schemaInfo    schema info to decode with; its type must be AVRO or JSON
 * @throws RuntimeException if the schema info type is neither AVRO nor JSON
 */
@Override
public void configureSchemaInfo(String topicName,
                                String componentName,
                                SchemaInfo schemaInfo) {
    final SchemaType detectedType = schemaInfo.getType();
    final boolean supported = detectedType == SchemaType.AVRO || detectedType == SchemaType.JSON;
    if (!supported) {
        throw new RuntimeException("Currently auto consume only works for topics with avro or json schemas");
    }

    // The second argument (useProvidedSchemaAsReaderSchema) is false: messages are decoded with
    // the schema associated with each message rather than with the provided one.
    final GenericSchema readerSchema = GenericSchemaImpl.of(schemaInfo, false);
    setSchema(readerSchema);

    log.info("Configure {} schema for topic {} : {}",
        componentName, topicName, schemaInfo.getSchemaDefinition());
}
public static Schema<?> getSchema(SchemaInfo schemaInfo) {
switch (schemaInfo.getType()) {
case INT8:
......@@ -106,11 +131,15 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return TimeSchema.of();
case TIMESTAMP:
return TimestampSchema.of();
case KEY_VALUE:
return KeyValueSchema.kvBytes();
case JSON:
case AVRO:
return GenericSchemaImpl.of(schemaInfo);
case KEY_VALUE:
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
Schema<?> keySchema = getSchema(kvSchemaInfo.getKey());
Schema<?> valueSchema = getSchema(kvSchemaInfo.getValue());
return KeyValueSchema.of(keySchema, valueSchema);
default:
throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
+ schemaInfo.getType() + "' is not supported yet");
......
......@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
......@@ -33,6 +34,7 @@ import org.apache.pulsar.common.schema.SchemaType;
/**
* [Key, Value] pair schema definition
*/
@Slf4j
public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
@Getter
......@@ -40,13 +42,12 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
@Getter
private final Schema<V> valueSchema;
// schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
// [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
private final SchemaInfo schemaInfo;
@Getter
private final KeyValueEncodingType keyValueEncodingType;
// schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
// [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
private SchemaInfo schemaInfo;
protected SchemaInfoProvider schemaInfoProvider;
/**
......@@ -97,52 +98,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.keyValueEncodingType = keyValueEncodingType;
this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
keySchema, valueSchema, keyValueEncodingType
);
if (keySchema instanceof StructSchema) {
keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
return schemaInfoProvider.getSchemaByVersion(schemaVersion)
.thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey());
}
@Override
public CompletableFuture<SchemaInfo> getLatestSchema() {
return CompletableFuture.completedFuture(
((StructSchema<K>) keySchema).schemaInfo);
}
@Override
public String getTopicName() {
return "key-schema";
}
});
}
if (valueSchema instanceof StructSchema) {
valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
return schemaInfoProvider.getSchemaByVersion(schemaVersion)
.thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue());
}
@Override
public CompletableFuture<SchemaInfo> getLatestSchema() {
return CompletableFuture.completedFuture(
((StructSchema<V>) valueSchema).schemaInfo);
}
@Override
public String getTopicName() {
return "value-schema";
}
});
}
this.schemaInfoProvider = new SchemaInfoProvider() {
@Override
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
......@@ -159,6 +114,12 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
return "key-value-schema";
}
};
// if either key schema or value schema requires fetching schema info,
// we don't need to configure the key/value schema info right now.
// defer configuring the key/value schema info until `configureSchemaInfo` is called.
if (!requireFetchingSchemaInfo()) {
configureKeyValueSchemaInfo();
}
}
// encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
......@@ -173,7 +134,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
} else {
return valueSchema.encode(message.getValue());
}
}
public KeyValue<K, V> decode(byte[] bytes) {
......@@ -212,4 +172,68 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.schemaInfoProvider = schemaInfoProvider;
}
/**
 * Reports whether either the key schema or the value schema requires fetching schema info.
 *
 * @return true if the key schema requires it; otherwise whatever the value schema reports
 */
@Override
public boolean requireFetchingSchemaInfo() {
    // The value schema is only consulted when the key schema does not require fetching.
    if (keySchema.requireFetchingSchemaInfo()) {
        return true;
    }
    return valueSchema.requireFetchingSchemaInfo();
}
/**
 * Splits the given key/value schema info into its key and value parts, configures the key and
 * value schemas with them, and then rebuilds this schema's combined schema info.
 *
 * @param topicName     topic name, forwarded to the key and value schemas
 * @param componentName component name of this schema; the sub-schemas are configured with the
 *                      fixed labels "key" and "value" instead
 * @param schemaInfo    the key/value schema info to decode
 * @throws RuntimeException if the combined schema info is still null after configuration
 */
@Override
public void configureSchemaInfo(String topicName,
                                String componentName,
                                SchemaInfo schemaInfo) {
    final KeyValue<SchemaInfo, SchemaInfo> decoded =
        KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
    final SchemaInfo keyPart = decoded.getKey();
    final SchemaInfo valuePart = decoded.getValue();

    keySchema.configureSchemaInfo(topicName, "key", keyPart);
    valueSchema.configureSchemaInfo(topicName, "value", valuePart);
    configureKeyValueSchemaInfo();

    // Note: the parameter shadows the field, so the field is always accessed through `this`.
    if (this.schemaInfo != null) {
        return;
    }
    throw new RuntimeException(
        "No key schema info or value schema info : key = " + keySchema.getSchemaInfo()
            + ", value = " + valueSchema.getSchemaInfo());
}
/**
 * Builds the combined key/value schema info and installs a schema info provider on both the key
 * schema and the value schema.
 *
 * <p>Each installed provider answers a versioned lookup by asking this schema's own
 * {@code schemaInfoProvider} for the key/value schema info of that version and returning the key
 * (or value) part decoded from it. The latest schema is answered from the sub-schema's own
 * {@link Schema#getSchemaInfo()}.
 */
private void configureKeyValueSchemaInfo() {
    this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
        keySchema, valueSchema, keyValueEncodingType
    );

    this.keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
        @Override
        public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
            return schemaInfoProvider.getSchemaByVersion(schemaVersion)
                .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey());
        }

        @Override
        public CompletableFuture<SchemaInfo> getLatestSchema() {
            // The provider is installed on every kind of key schema, not only struct schemas,
            // so read the schema info through the Schema interface instead of casting to
            // StructSchema (the cast fails for e.g. an auto-consume key schema).
            return CompletableFuture.completedFuture(keySchema.getSchemaInfo());
        }

        @Override
        public String getTopicName() {
            return "key-schema";
        }
    });

    this.valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
        @Override
        public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
            return schemaInfoProvider.getSchemaByVersion(schemaVersion)
                .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue());
        }

        @Override
        public CompletableFuture<SchemaInfo> getLatestSchema() {
            // Same reasoning as for the key schema: avoid the StructSchema cast.
            return CompletableFuture.completedFuture(valueSchema.getSchemaInfo());
        }

        @Override
        public String getTopicName() {
            return "value-schema";
        }
    });
}
}
......@@ -108,20 +108,46 @@ public final class KeyValueSchemaInfo {
Schema<K> keySchema,
Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
return encodeKeyValueSchemaInfo(
schemaName,
keySchema.getSchemaInfo(),
valueSchema.getSchemaInfo(),
keyValueEncodingType
);
}
/**
* Encode the key and value schema infos into a single KeyValue schema info.
*
* @param schemaName the final schema name
* @param keySchemaInfo the key schema info
* @param valueSchemaInfo the value schema info
* @param keyValueEncodingType the encoding type to encode and decode key value pair
* @return the final schema info
*/
public static SchemaInfo encodeKeyValueSchemaInfo(String schemaName,
SchemaInfo keySchemaInfo,
SchemaInfo valueSchemaInfo,
KeyValueEncodingType keyValueEncodingType) {
checkNotNull(keyValueEncodingType, "Null encoding type is provided");
if (keySchemaInfo == null || valueSchemaInfo == null) {
// schema is not ready
return null;
}
// process key/value schema data
byte[] schemaData = KeyValue.encode(
keySchema.getSchemaInfo(),
keySchemaInfo,
SCHEMA_INFO_WRITER,
valueSchema.getSchemaInfo(),
valueSchemaInfo,
SCHEMA_INFO_WRITER
);
// process key/value schema properties
Map<String, String> properties = new HashMap<>();
encodeSubSchemaInfoToParentSchemaProperties(
keySchema,
keySchemaInfo,
KEY_SCHEMA_NAME,
KEY_SCHEMA_TYPE,
KEY_SCHEMA_PROPS,
......@@ -129,7 +155,7 @@ public final class KeyValueSchemaInfo {
);
encodeSubSchemaInfoToParentSchemaProperties(
valueSchema,
valueSchemaInfo,
VALUE_SCHEMA_NAME,
VALUE_SCHEMA_TYPE,
VALUE_SCHEMA_PROPS,
......@@ -143,16 +169,13 @@ public final class KeyValueSchemaInfo {
.setType(SchemaType.KEY_VALUE)
.setSchema(schemaData)
.setProperties(properties);
}
private static void encodeSubSchemaInfoToParentSchemaProperties(Schema<?> schema,
private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo,
String schemaNameProperty,
String schemaTypeProperty,
String schemaPropsProperty,
Map<String, String> parentSchemaProperties) {
SchemaInfo schemaInfo = schema.getSchemaInfo();
parentSchemaProperties.put(schemaNameProperty, schemaInfo.getName());
parentSchemaProperties.put(schemaTypeProperty, String.valueOf(schemaInfo.getType()));
parentSchemaProperties.put(
......
......@@ -59,7 +59,7 @@ public class GenericAvroSchema extends GenericSchemaImpl {
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
schemaInfo);
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader(
......@@ -69,7 +69,7 @@ public class GenericAvroSchema extends GenericSchemaImpl {
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
this.schemaInfo);
return reader;
}
}
......
......@@ -25,14 +25,20 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.annotations.Test;
/**
......@@ -57,94 +63,143 @@ public class GenericSchemaImplTest {
@Test
public void testAutoAvroSchema() {
MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class);
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
// configure encode schema
Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
decodeSchema.setSchema(genericSchema);
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class);
when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo()));
.thenReturn(CompletableFuture.completedFuture(encodeSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
// configure decode schema
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
decodeSchema.configureSchemaInfo(
"test-topic", "topic", encodeSchema.getSchemaInfo()
);
decodeSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@Test
public void testAutoJsonSchema() {
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
genericSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
decodeSchema.setSchema(genericSchema);
GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
// configure encode schema
Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
// configure decode schema
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
decodeSchema.configureSchemaInfo("test-topic", "topic", encodeSchema.getSchemaInfo());
decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
public void testAUTOEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
Schema<GenericRecord> decodeSchema) {
private void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
Schema<GenericRecord> decodeSchema) {
int numRecords = 10;
for (int i = 0; i < numRecords; i++) {
Foo foo = new Foo();
foo.setField1("field-1-" + i);
foo.setField2("field-2-" + i);
foo.setField3(i);
Bar bar = new Bar();
bar.setField1(i % 2 == 0);
foo.setField4(bar);
foo.setFieldUnableNull("fieldUnableNull-1-" + i);
Foo foo = newFoo(i);
byte[] data = encodeSchema.encode(foo);
log.info("Decoding : {}", new String(data, UTF_8));
GenericRecord record = decodeSchema.decode(data, new byte[10]);
Object field1 = record.getField("field1");
assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
Object field2 = record.getField("field2");
assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass());
Object field3 = record.getField("field3");
assertEquals(i, field3, "Field 3 is " + field3.getClass());
Object field4 = record.getField("field4");
assertTrue(field4 instanceof GenericRecord);
GenericRecord field4Record = (GenericRecord) field4;
assertEquals(i % 2 == 0, field4Record.getField("field1"));
Object fieldUnableNull = record.getField("fieldUnableNull");
assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
GenericRecord record;
if (decodeSchema instanceof AutoConsumeSchema) {
record = decodeSchema.decode(data, new byte[0]);
} else {
record = decodeSchema.decode(data);
}
verifyFooRecord(record, i);
}
}
public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
Schema<GenericRecord> decodeSchema) {
/**
 * Encodes {@code KeyValue<Foo, Foo>} pairs with every JSON/AVRO combination of key and value
 * schema and checks that a key/value schema built from two AUTO_CONSUME schemas decodes them.
 */
@Test
public void testKeyValueSchema() {
    // The mocked provider answers every version lookup with an INLINE key/value schema info
    // whose key and value parts are both the generic schema built from the Avro Foo schema.
    GenericSchema avroFooSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
    MultiVersionSchemaInfoProvider provider = mock(MultiVersionSchemaInfoProvider.class);
    when(provider.getSchemaByVersion(any(byte[].class)))
        .thenReturn(CompletableFuture.completedFuture(
            KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
                avroFooSchema,
                avroFooSchema,
                KeyValueEncodingType.INLINE
            )
        ));

    List<Schema<Foo>> fooSchemas = Lists.newArrayList(
        Schema.JSON(Foo.class),
        Schema.AVRO(Foo.class)
    );

    for (Schema<Foo> keyEncoder : fooSchemas) {
        for (Schema<Foo> valueEncoder : fooSchemas) {
            // writer side
            Schema<KeyValue<Foo, Foo>> writerSchema = KeyValueSchema.of(keyEncoder, valueEncoder);

            // reader side: configured from the writer's schema info, then given the provider
            Schema<KeyValue<GenericRecord, GenericRecord>> readerSchema = KeyValueSchema.of(
                Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()
            );
            readerSchema.configureSchemaInfo("test-topic", "topic", writerSchema.getSchemaInfo());
            readerSchema.setSchemaInfoProvider(provider);

            testEncodeAndDecodeKeyValues(writerSchema, readerSchema);
        }
    }
}
private void testEncodeAndDecodeKeyValues(Schema<KeyValue<Foo, Foo>> encodeSchema,
Schema<KeyValue<GenericRecord, GenericRecord>> decodeSchema) {
int numRecords = 10;
for (int i = 0; i < numRecords; i++) {
Foo foo = new Foo();
foo.setField1("field-1-" + i);
foo.setField2("field-2-" + i);
foo.setField3(i);
Bar bar = new Bar();
bar.setField1(i % 2 == 0);
foo.setField4(bar);
foo.setFieldUnableNull("fieldUnableNull-1-" + i);
byte[] data = encodeSchema.encode(foo);
log.info("Decoding : {}", new String(data, UTF_8));
Foo foo = newFoo(i);
byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo));
GenericRecord record = decodeSchema.decode(data);
Object field1 = record.getField("field1");
assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
Object field2 = record.getField("field2");
assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass());
Object field3 = record.getField("field3");
assertEquals(i, field3, "Field 3 is " + field3.getClass());
Object field4 = record.getField("field4");
assertTrue(field4 instanceof GenericRecord);
GenericRecord field4Record = (GenericRecord) field4;
assertEquals(i % 2 == 0, field4Record.getField("field1"));
Object fieldUnableNull = record.getField("fieldUnableNull");
assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
KeyValue<GenericRecord, GenericRecord> kv = decodeSchema.decode(data, new byte[0]);
verifyFooRecord(kv.getKey(), i);
verifyFooRecord(kv.getValue(), i);
}
}
/**
 * Creates a {@code Foo} fixture whose field values are all derived from {@code i}.
 *
 * @param i seed for the field values
 * @return a populated {@code Foo}; its nested {@code Bar} flag is true when {@code i} is even
 */
private static Foo newFoo(int i) {
    final Bar nested = new Bar();
    nested.setField1(i % 2 == 0);

    final Foo fixture = new Foo();
    fixture.setField1("field-1-" + i);
    fixture.setField2("field-2-" + i);
    fixture.setField3(i);
    fixture.setField4(nested);
    fixture.setFieldUnableNull("fieldUnableNull-1-" + i);
    return fixture;
}
/**
 * Asserts that a decoded generic record carries the field values that {@code newFoo(i)} sets.
 *
 * @param record the decoded record to check
 * @param i      the seed the original {@code Foo} was built from
 */
private static void verifyFooRecord(GenericRecord record, int i) {
    final Object first = record.getField("field1");
    assertEquals("field-1-" + i, first, "Field 1 is " + first.getClass());

    final Object second = record.getField("field2");
    assertEquals("field-2-" + i, second, "Field 2 is " + second.getClass());

    final Object third = record.getField("field3");
    assertEquals(i, third, "Field 3 is " + third.getClass());

    // field4 is a nested record whose own "field1" holds the even/odd flag.
    final Object nested = record.getField("field4");
    assertTrue(nested instanceof GenericRecord);
    final GenericRecord nestedRecord = (GenericRecord) nested;
    assertEquals(i % 2 == 0, nestedRecord.getField("field1"));

    final Object nonNullable = record.getField("fieldUnableNull");
    assertEquals("fieldUnableNull-1-" + i, nonNullable,
        "fieldUnableNull 1 is " + nonNullable.getClass());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册