From 3ef41c454fc3f4b5b798d2ccee31977e09a99c37 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Mon, 29 Jul 2019 10:24:32 +0800 Subject: [PATCH] Improve SchemaInfoProvider to fetch schema info asynchronously (#4836) *Motivation* Currently fetching schema information is done synchronously. It is called in netty callback threads and will potentially block async operations. *Modifications* Make most of the operations asynchronously in SchemaInfoProvider. (cherry picked from commit 91c4254cae3be91774220c9fea145d3e433b6a01) --- .../client/api/schema/SchemaInfoProvider.java | 5 +- .../client/impl/MultiTopicsConsumerImpl.java | 18 ++- .../pulsar/client/impl/PulsarClientImpl.java | 58 +++---- .../pulsar/client/impl/schema/AvroSchema.java | 2 +- .../client/impl/schema/KeyValueSchema.java | 31 ++-- .../client/impl/schema/StructSchema.java | 21 +++ .../schema/generic/GenericAvroSchema.java | 2 +- .../schema/generic/GenericJsonSchema.java | 2 +- .../MultiVersionSchemaInfoProvider.java | 58 +++---- .../pulsar/client/impl/MessageImplTest.java | 13 +- .../SupportVersioningAvroSchemaTest.java | 3 +- .../SupportVersioningKeyValueSchemaTest.java | 5 +- .../schema/generic/GenericAvroSchemaTest.java | 3 +- .../schema/generic/GenericSchemaImplTest.java | 5 +- .../MultiVersionSchemaInfoProviderTest.java | 4 +- .../protocol/schema/BytesSchemaVersion.java | 147 +++++++++++++++++- 16 files changed, 279 insertions(+), 98 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java index 81e4f44b264..ac8f5da7149 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api.schema; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -31,14 +32,14 @@ public interface SchemaInfoProvider { * @param schemaVersion schema version * @return schema info of the provided schemaVersion */ - SchemaInfo getSchemaByVersion(byte[] schemaVersion); + CompletableFuture getSchemaByVersion(byte[] schemaVersion); /** * Retrieve the latest schema info. * * @return the latest schema */ - SchemaInfo getLatestSchema(); + CompletableFuture getLatestSchema(); /** * Retrieve the topic name. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e50e8bc557e..3521eb4e871 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -730,19 +730,21 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { } private void subscribeTopicPartitions(CompletableFuture subscribeResult, String topicName, int numPartitions) { + client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> { + if (null == cause) { + doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions); + } else { + subscribeResult.completeExceptionally(cause); + } + }); + } + + private void doSubscribeTopicPartitions(CompletableFuture subscribeResult, String topicName, int numPartitions) { if (log.isDebugEnabled()) { log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions); } List>> futureList; - - try { - client.preProcessSchemaBeforeSubscribe(client, schema, topicName); - } catch (Throwable t) { - subscribeResult.completeExceptionally(t); - return; - } - if (numPartitions > 1) { this.topics.putIfAbsent(topicName, numPartitions); allTopicPartitionsNumber.addAndGet(numPartitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index bceaf2c5a3c..134bb7774f1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -328,12 +328,8 @@ public class PulsarClientImpl implements PulsarClient { } private CompletableFuture> singleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { - try { - preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()); - } catch (Throwable t) { - return FutureUtil.failedFuture(t); - } - return doSingleTopicSubscribeAsync(conf, schema, interceptors); + return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()) + .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors)); } private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { @@ -444,13 +440,10 @@ public class PulsarClientImpl implements PulsarClient { } public CompletableFuture> createReaderAsync(ReaderConfigurationData conf, Schema schema) { - try { - preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName()); - } catch (Throwable t) { - return FutureUtil.failedFuture(t); - } - return doCreateReaderAsync(conf, schema); + return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName()) + .thenCompose(ignored -> doCreateReaderAsync(conf, schema)); } + CompletableFuture> doCreateReaderAsync(ReaderConfigurationData conf, Schema schema) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); @@ -734,33 +727,40 @@ public class PulsarClientImpl implements PulsarClient { return schemaProviderLoadingCache; } - protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable { + protected CompletableFuture preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, + Schema schema, + String topicName) { if (schema != null && schema.supportSchemaVersioning()) { - SchemaInfoProvider schemaInfoProvider = null; + final SchemaInfoProvider schemaInfoProvider; try { schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName); } catch (ExecutionException e) { log.error("Failed to load schema info provider for topic {}", topicName, e); - throw e.getCause(); + return FutureUtil.failedFuture(e.getCause()); } if (schema instanceof AutoConsumeSchema) { - SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema(); - if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){ - throw new RuntimeException("Currently schema detection only works for topics with avro schemas"); - } - - // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader - // to decode the messages. - GenericSchema genericSchema = GenericSchemaImpl.of( - schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/); - log.info("Auto detected schema for topic {} : {}", - topicName, schemaInfo.getSchemaDefinition()); - ((AutoConsumeSchema) schema).setSchema(genericSchema); + return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> { + if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){ + return FutureUtil.failedFuture( + new RuntimeException("Currently schema detection only works" + + " for topics with avro or json schemas")); + } + + // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader + // to decode the messages. + GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/); + log.info("Auto detected schema for topic {} : {}", + topicName, schemaInfo.getSchemaDefinition()); + ((AutoConsumeSchema) schema).setSchema(genericSchema); + schema.setSchemaInfoProvider(schemaInfoProvider); + return CompletableFuture.completedFuture(null); + }); + } else { + schema.setSchemaInfoProvider(schemaInfoProvider); } - schema.setSchemaInfoProvider(schemaInfoProvider); } - + return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index 2944e14e77f..4f4ae5a49d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -100,7 +100,7 @@ public class AvroSchema extends StructSchema { @Override protected SchemaReader loadReader(byte[] schemaVersion) { - SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion); + SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); if (schemaInfo != null) { log.info("Load schema reader for version({}), schema is : {}", SchemaUtils.getStringSchemaVersion(schemaVersion), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index 7eb47570b32..fd7dcaa1e9e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema; import static com.google.common.base.Preconditions.checkArgument; +import java.util.concurrent.CompletableFuture; import lombok.Getter; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; @@ -103,14 +104,15 @@ public class KeyValueSchema implements Schema> { if (keySchema instanceof StructSchema) { keySchema.setSchemaInfoProvider(new SchemaInfoProvider() { @Override - public SchemaInfo getSchemaByVersion(byte[] schemaVersion) { - SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion); - return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey(); + public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { + return schemaInfoProvider.getSchemaByVersion(schemaVersion) + .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey()); } @Override - public SchemaInfo getLatestSchema() { - return ((StructSchema) keySchema).schemaInfo; + public CompletableFuture getLatestSchema() { + return CompletableFuture.completedFuture( + ((StructSchema) keySchema).schemaInfo); } @Override @@ -123,14 +125,15 @@ public class KeyValueSchema implements Schema> { if (valueSchema instanceof StructSchema) { valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() { @Override - public SchemaInfo getSchemaByVersion(byte[] schemaVersion) { - SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion); - return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue(); + public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { + return schemaInfoProvider.getSchemaByVersion(schemaVersion) + .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue()); } @Override - public SchemaInfo getLatestSchema() { - return ((StructSchema) valueSchema).schemaInfo; + public CompletableFuture getLatestSchema() { + return CompletableFuture.completedFuture( + ((StructSchema) valueSchema).schemaInfo); } @Override @@ -142,13 +145,13 @@ public class KeyValueSchema implements Schema> { this.schemaInfoProvider = new SchemaInfoProvider() { @Override - public SchemaInfo getSchemaByVersion(byte[] schemaVersion) { - return schemaInfo; + public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { + return CompletableFuture.completedFuture(schemaInfo); } @Override - public SchemaInfo getLatestSchema() { - return schemaInfo; + public CompletableFuture getLatestSchema() { + return CompletableFuture.completedFuture(schemaInfo); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java index dd618ae6763..4ec9fc8910a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java @@ -29,6 +29,7 @@ import com.google.common.cache.LoadingCache; import org.apache.avro.Schema.Parser; import org.apache.avro.reflect.ReflectData; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -139,6 +140,26 @@ public abstract class StructSchema implements Schema { */ protected abstract SchemaReader loadReader(byte[] schemaVersion); + /** + * TODO: think about how to make this async + */ + protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) { + try { + return schemaInfoProvider.getSchemaByVersion(schemaVersion).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SerializationException( + "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion), + e + ); + } catch (ExecutionException e) { + throw new SerializationException( + "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion), + e.getCause() + ); + } + } + protected void setWriter(SchemaWriter writer) { this.writer = writer; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java index 2fa6829a8ec..c295a3b1ada 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java @@ -55,7 +55,7 @@ public class GenericAvroSchema extends GenericSchemaImpl { @Override protected SchemaReader loadReader(byte[] schemaVersion) { - SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion); + SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); if (schemaInfo != null) { log.info("Load schema reader for version({}), schema is : {}", SchemaUtils.getStringSchemaVersion(schemaVersion), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java index 3b825918928..307edd50649 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java @@ -48,7 +48,7 @@ class GenericJsonSchema extends GenericSchemaImpl { @Override protected SchemaReader loadReader(byte[] schemaVersion) { - SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion); + SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); if (schemaInfo != null) { log.info("Load schema reader for version({}), schema is : {}", SchemaUtils.getStringSchemaVersion(schemaVersion), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java index 12e8956ef2e..4c430be6f27 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java @@ -21,15 +21,17 @@ package org.apache.pulsar.client.impl.schema.generic; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -43,13 +45,21 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider { private final TopicName topicName; private final PulsarClientImpl pulsarClient; - private final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { - @Override - public SchemaInfo load(byte[] schemaVersion) throws Exception { - return loadSchema(schemaVersion); - } - }); + private final LoadingCache> cache = CacheBuilder.newBuilder() + .maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES) + .build(new CacheLoader>() { + @Override + public CompletableFuture load(BytesSchemaVersion schemaVersion) { + CompletableFuture siFuture = loadSchema(schemaVersion.get()); + siFuture.whenComplete((si, cause) -> { + if (null != cause) { + cache.asMap().remove(schemaVersion, siFuture); + } + }); + return siFuture; + } + }); public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) { this.topicName = topicName; @@ -57,30 +67,24 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider { } @Override - public SchemaInfo getSchemaByVersion(byte[] schemaVersion) { + public CompletableFuture getSchemaByVersion(byte[] schemaVersion) { try { if (null == schemaVersion) { - return null; + return CompletableFuture.completedFuture(null); } - return cache.get(schemaVersion); + return cache.get(BytesSchemaVersion.of(schemaVersion)); } catch (ExecutionException e) { - LOG.error("Can't get generic schema for topic {} schema version {}", + LOG.error("Can't get schema for topic {} schema version {}", topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e); - throw new RuntimeException("Can't get generic schema for topic " + topicName.toString()); + return FutureUtil.failedFuture(e.getCause()); } } @Override - public SchemaInfo getLatestSchema() { - try { - Optional optional = pulsarClient.getLookup() - .getSchema(topicName).get(); - return optional.orElse(null); - } catch (ExecutionException | InterruptedException e) { - LOG.error("Can't get current schema for topic {}", - topicName.toString(), e); - throw new RuntimeException("Can't get current schema for topic " + topicName.toString()); - } + public CompletableFuture getLatestSchema() { + return pulsarClient.getLookup() + .getSchema(topicName) + .thenApply(o -> o.orElse(null)); } @Override @@ -88,10 +92,10 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider { return topicName.getLocalName(); } - private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException { - Optional optional = pulsarClient.getLookup() - .getSchema(topicName, schemaVersion).get(); - return optional.orElse(null); + private CompletableFuture loadSchema(byte[] schemaVersion) { + return pulsarClient.getLookup() + .getSchema(topicName, schemaVersion) + .thenApply(o -> o.orElse(null)); } public PulsarClientImpl getPulsarClient() { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index e253ace5a80..dd494451284 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -184,7 +185,7 @@ public class MessageImplTest { Schema> keyValueSchema = Schema.KeyValue(fooSchema, barSchema); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); @@ -221,7 +222,7 @@ public class MessageImplTest { fooSchema, barSchema, KeyValueEncodingType.SEPARATED); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); @@ -259,7 +260,7 @@ public class MessageImplTest { Schema> keyValueSchema = Schema.KeyValue(fooSchema, barSchema); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); @@ -296,7 +297,7 @@ public class MessageImplTest { fooSchema, barSchema, KeyValueEncodingType.SEPARATED); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); @@ -334,7 +335,7 @@ public class MessageImplTest { Schema> keyValueSchema = Schema.KeyValue(fooSchema, barSchema); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); @@ -371,7 +372,7 @@ public class MessageImplTest { fooSchema, barSchema, KeyValueEncodingType.SEPARATED); keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo(); foo.setField1("field1"); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java index 1b3ef525930..21c8a0b021c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider; @@ -56,7 +57,7 @@ public class SupportVersioningAvroSchemaTest { @Test public void testDecode() { when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(genericAvroSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo())); SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2(); fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING); SchemaTestUtils.Foo foo = (SchemaTestUtils.Foo)schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java index ebcc00dbf49..40f879eeb32 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.schema; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider; @@ -44,7 +45,7 @@ public class SupportVersioningKeyValueSchemaTest { keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar(); bar.setField1(true); @@ -82,7 +83,7 @@ public class SupportVersioningKeyValueSchemaTest { keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(keyValueSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo())); SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar(); bar.setField1(true); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java index 814975de7ba..0752291144e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.schema.generic; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.AvroSchema; @@ -67,7 +68,7 @@ public class GenericAvroSchemaTest { MultiVersionSchemaInfoProvider provider = mock(MultiVersionSchemaInfoProvider.class); readerSchema.setSchemaInfoProvider(provider); when(provider.getSchemaByVersion(any(byte[].class))) - .thenReturn(writerSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(writerSchema.getSchemaInfo())); GenericRecord dataForWriter = writerSchema.newRecordBuilder() .set("field1", SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING) .set("field3", 0) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java index 74133f4cb8b..24acbeac6a0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -63,7 +64,7 @@ public class GenericSchemaImplTest { genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider); decodeSchema.setSchema(genericSchema); when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(genericSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo())); testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); } @@ -78,7 +79,7 @@ public class GenericSchemaImplTest { decodeSchema.setSchema(genericSchema); GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo()); when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) - .thenReturn(genericAvroSchema.getSchemaInfo()); + .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo())); testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java index 2e101b1e1cd..c81ff146004 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java @@ -52,7 +52,7 @@ public class MultiVersionSchemaInfoProviderTest { } @Test - public void testGetSchema() { + public void testGetSchema() throws Exception { CompletableFuture> completableFuture = new CompletableFuture<>(); SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo(); completableFuture.complete(Optional.of(schemaInfo)); @@ -61,7 +61,7 @@ public class MultiVersionSchemaInfoProviderTest { any(TopicName.class), any(byte[].class))) .thenReturn(completableFuture); - SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]); + SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]).get(); assertEquals(schemaInfoByVersion, schemaInfo); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java index 5ceacbd4460..4b3ba786565 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java @@ -18,12 +18,22 @@ */ package org.apache.pulsar.common.protocol.schema; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; + /** * Bytes schema version */ -public class BytesSchemaVersion implements SchemaVersion { +public class BytesSchemaVersion implements SchemaVersion, Comparable { + + private static final char[] HEX_CHARS_UPPER = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' + }; private final byte[] bytes; + // cache the hash code for the string, default to 0 + private int hashCode; private BytesSchemaVersion(byte[] bytes) { this.bytes = bytes; @@ -37,4 +47,139 @@ public class BytesSchemaVersion implements SchemaVersion { public static BytesSchemaVersion of(byte[] bytes) { return bytes != null ? new BytesSchemaVersion(bytes) : null; } + + /** + * Get the data from the Bytes. + * @return The underlying byte array + */ + public byte[] get() { + return this.bytes; + } + + /** + * The hashcode is cached except for the case where it is computed as 0, in which + * case we compute the hashcode on every call. + * + * @return the hashcode + */ + @Override + public int hashCode() { + if (hashCode == 0) { + hashCode = Arrays.hashCode(bytes); + } + return hashCode; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null) { + return false; + } + + // we intentionally use the function to compute hashcode here + if (this.hashCode() != other.hashCode()) { + return false; + } + + if (other instanceof BytesSchemaVersion) { + return Arrays.equals(this.bytes, ((BytesSchemaVersion) other).get()); + } + + return false; + } + + @Override + public int compareTo(BytesSchemaVersion that) { + return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes); + } + + @Override + public String toString() { + return BytesSchemaVersion.toString(bytes, 0, bytes.length); + } + + /** + * Write a printable representation of a byte array. Non-printable + * characters are hex escaped in the format \\x%02X, eg: + * \x00 \x05 etc. + * + * This function is brought from org.apache.hadoop.hbase.util.Bytes + * + * @param b array to write out + * @param off offset to start at + * @param len length to write + * @return string output + */ + private static String toString(final byte[] b, int off, int len) { + StringBuilder result = new StringBuilder(); + + if (b == null) + return result.toString(); + + // just in case we are passed a 'len' that is > buffer length... + if (off >= b.length) + return result.toString(); + + if (off + len > b.length) + len = b.length - off; + + for (int i = off; i < off + len; ++i) { + int ch = b[i] & 0xFF; + if (ch >= ' ' && ch <= '~' && ch != '\\') { + result.append((char) ch); + } else { + result.append("\\x"); + result.append(HEX_CHARS_UPPER[ch / 0x10]); + result.append(HEX_CHARS_UPPER[ch % 0x10]); + } + } + return result.toString(); + } + + /** + * A byte array comparator based on lexicograpic ordering. + */ + public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator(); + + public interface ByteArrayComparator extends Comparator, Serializable { + + int compare(final byte[] buffer1, int offset1, int length1, + final byte[] buffer2, int offset2, int length2); + } + + private static class LexicographicByteArrayComparator implements ByteArrayComparator { + + private static final long serialVersionUID = -1915703761143534937L; + + @Override + public int compare(byte[] buffer1, byte[] buffer2) { + return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length); + } + + public int compare(final byte[] buffer1, int offset1, int length1, + final byte[] buffer2, int offset2, int length2) { + + // short circuit equal case + if (buffer1 == buffer2 && + offset1 == offset2 && + length1 == length2) { + return 0; + } + + // similar to Arrays.compare() but considers offset and length + int end1 = offset1 + length1; + int end2 = offset2 + length2; + for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) { + int a = buffer1[i] & 0xff; + int b = buffer2[j] & 0xff; + if (a != b) { + return a - b; + } + } + return length1 - length2; + } + } } -- GitLab