提交 3ef41c45 编写于 作者: S Sijie Guo 提交者: Jia Zhai

Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)

*Motivation*

Currently fetching schema information is done synchronously.
It is called in netty callback threads and will potentially block
async operations.

*Modifications*

Make most of the operations asynchronously in SchemaInfoProvider.
(cherry picked from commit 91c4254c)
上级 33a7c239
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api.schema;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
......@@ -31,14 +32,14 @@ public interface SchemaInfoProvider {
* @param schemaVersion schema version
* @return schema info of the provided <tt>schemaVersion</tt>
*/
SchemaInfo getSchemaByVersion(byte[] schemaVersion);
CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
/**
* Retrieve the latest schema info.
*
* @return the latest schema
*/
SchemaInfo getLatestSchema();
CompletableFuture<SchemaInfo> getLatestSchema();
/**
* Retrieve the topic name.
......
......@@ -730,19 +730,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
if (null == cause) {
doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
} else {
subscribeResult.completeExceptionally(cause);
}
});
}
private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
}
List<CompletableFuture<Consumer<T>>> futureList;
try {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
} catch (Throwable t) {
subscribeResult.completeExceptionally(t);
return;
}
if (numPartitions > 1) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
......
......@@ -328,12 +328,8 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
try {
preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic());
} catch (Throwable t) {
return FutureUtil.failedFuture(t);
}
return doSingleTopicSubscribeAsync(conf, schema, interceptors);
return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
.thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors));
}
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
......@@ -444,13 +440,10 @@ public class PulsarClientImpl implements PulsarClient {
}
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
try {
preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName());
} catch (Throwable t) {
return FutureUtil.failedFuture(t);
}
return doCreateReaderAsync(conf, schema);
return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
.thenCompose(ignored -> doCreateReaderAsync(conf, schema));
}
<T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
......@@ -734,33 +727,40 @@ public class PulsarClientImpl implements PulsarClient {
return schemaProviderLoadingCache;
}
protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable {
protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
Schema schema,
String topicName) {
if (schema != null && schema.supportSchemaVersioning()) {
SchemaInfoProvider schemaInfoProvider = null;
final SchemaInfoProvider schemaInfoProvider;
try {
schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
} catch (ExecutionException e) {
log.error("Failed to load schema info provider for topic {}", topicName, e);
throw e.getCause();
return FutureUtil.failedFuture(e.getCause());
}
if (schema instanceof AutoConsumeSchema) {
SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
}
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
GenericSchema genericSchema = GenericSchemaImpl.of(
schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
return FutureUtil.failedFuture(
new RuntimeException("Currently schema detection only works"
+ " for topics with avro or json schemas"));
}
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
schema.setSchemaInfoProvider(schemaInfoProvider);
return CompletableFuture.completedFuture(null);
});
} else {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
schema.setSchemaInfoProvider(schemaInfoProvider);
}
return CompletableFuture.completedFuture(null);
}
}
......@@ -100,7 +100,7 @@ public class AvroSchema<T> extends StructSchema<T> {
@Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
......
......@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
......@@ -103,14 +104,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
if (keySchema instanceof StructSchema) {
keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
return schemaInfoProvider.getSchemaByVersion(schemaVersion)
.thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey());
}
@Override
public SchemaInfo getLatestSchema() {
return ((StructSchema<K>) keySchema).schemaInfo;
public CompletableFuture<SchemaInfo> getLatestSchema() {
return CompletableFuture.completedFuture(
((StructSchema<K>) keySchema).schemaInfo);
}
@Override
......@@ -123,14 +125,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
if (valueSchema instanceof StructSchema) {
valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
return schemaInfoProvider.getSchemaByVersion(schemaVersion)
.thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue());
}
@Override
public SchemaInfo getLatestSchema() {
return ((StructSchema<V>) valueSchema).schemaInfo;
public CompletableFuture<SchemaInfo> getLatestSchema() {
return CompletableFuture.completedFuture(
((StructSchema<V>) valueSchema).schemaInfo);
}
@Override
......@@ -142,13 +145,13 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.schemaInfoProvider = new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
return schemaInfo;
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
return CompletableFuture.completedFuture(schemaInfo);
}
@Override
public SchemaInfo getLatestSchema() {
return schemaInfo;
public CompletableFuture<SchemaInfo> getLatestSchema() {
return CompletableFuture.completedFuture(schemaInfo);
}
@Override
......
......@@ -29,6 +29,7 @@ import com.google.common.cache.LoadingCache;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
......@@ -139,6 +140,26 @@ public abstract class StructSchema<T> implements Schema<T> {
*/
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
/**
* TODO: think about how to make this async
*/
protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) {
try {
return schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SerializationException(
"Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
e
);
} catch (ExecutionException e) {
throw new SerializationException(
"Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
e.getCause()
);
}
}
protected void setWriter(SchemaWriter<T> writer) {
this.writer = writer;
}
......
......@@ -55,7 +55,7 @@ public class GenericAvroSchema extends GenericSchemaImpl {
@Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
......
......@@ -48,7 +48,7 @@ class GenericJsonSchema extends GenericSchemaImpl {
@Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
......
......@@ -21,15 +21,17 @@ package org.apache.pulsar.client.impl.schema.generic;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -43,13 +45,21 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
private final TopicName topicName;
private final PulsarClientImpl pulsarClient;
private final LoadingCache<byte[], SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaInfo>() {
@Override
public SchemaInfo load(byte[] schemaVersion) throws Exception {
return loadSchema(schemaVersion);
}
});
private final LoadingCache<BytesSchemaVersion, CompletableFuture<SchemaInfo>> cache = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build(new CacheLoader<BytesSchemaVersion, CompletableFuture<SchemaInfo>>() {
@Override
public CompletableFuture<SchemaInfo> load(BytesSchemaVersion schemaVersion) {
CompletableFuture<SchemaInfo> siFuture = loadSchema(schemaVersion.get());
siFuture.whenComplete((si, cause) -> {
if (null != cause) {
cache.asMap().remove(schemaVersion, siFuture);
}
});
return siFuture;
}
});
public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
this.topicName = topicName;
......@@ -57,30 +67,24 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
}
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
try {
if (null == schemaVersion) {
return null;
return CompletableFuture.completedFuture(null);
}
return cache.get(schemaVersion);
return cache.get(BytesSchemaVersion.of(schemaVersion));
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
LOG.error("Can't get schema for topic {} schema version {}",
topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
throw new RuntimeException("Can't get generic schema for topic " + topicName.toString());
return FutureUtil.failedFuture(e.getCause());
}
}
@Override
public SchemaInfo getLatestSchema() {
try {
Optional<SchemaInfo> optional = pulsarClient.getLookup()
.getSchema(topicName).get();
return optional.orElse(null);
} catch (ExecutionException | InterruptedException e) {
LOG.error("Can't get current schema for topic {}",
topicName.toString(), e);
throw new RuntimeException("Can't get current schema for topic " + topicName.toString());
}
public CompletableFuture<SchemaInfo> getLatestSchema() {
return pulsarClient.getLookup()
.getSchema(topicName)
.thenApply(o -> o.orElse(null));
}
@Override
......@@ -88,10 +92,10 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
return topicName.getLocalName();
}
private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
Optional<SchemaInfo> optional = pulsarClient.getLookup()
.getSchema(topicName, schemaVersion).get();
return optional.orElse(null);
private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
return pulsarClient.getLookup()
.getSchema(topicName, schemaVersion)
.thenApply(o -> o.orElse(null));
}
public PulsarClientImpl getPulsarClient() {
......
......@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
......@@ -184,7 +185,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......@@ -221,7 +222,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......@@ -259,7 +260,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......@@ -296,7 +297,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......@@ -334,7 +335,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......@@ -371,7 +372,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
......@@ -56,7 +57,7 @@ public class SupportVersioningAvroSchemaTest {
@Test
public void testDecode() {
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(genericAvroSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2();
fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
SchemaTestUtils.Foo foo = (SchemaTestUtils.Foo)schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
......@@ -44,7 +45,7 @@ public class SupportVersioningKeyValueSchemaTest {
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
......@@ -82,7 +83,7 @@ public class SupportVersioningKeyValueSchemaTest {
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(keyValueSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
......@@ -67,7 +68,7 @@ public class GenericAvroSchemaTest {
MultiVersionSchemaInfoProvider provider = mock(MultiVersionSchemaInfoProvider.class);
readerSchema.setSchemaInfoProvider(provider);
when(provider.getSchemaByVersion(any(byte[].class)))
.thenReturn(writerSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(writerSchema.getSchemaInfo()));
GenericRecord dataForWriter = writerSchema.newRecordBuilder()
.set("field1", SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING)
.set("field3", 0)
......
......@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
......@@ -63,7 +64,7 @@ public class GenericSchemaImplTest {
genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
decodeSchema.setSchema(genericSchema);
when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(genericSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
......@@ -78,7 +79,7 @@ public class GenericSchemaImplTest {
decodeSchema.setSchema(genericSchema);
GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(genericAvroSchema.getSchemaInfo());
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
......
......@@ -52,7 +52,7 @@ public class MultiVersionSchemaInfoProviderTest {
}
@Test
public void testGetSchema() {
public void testGetSchema() throws Exception {
CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.<SchemaTestUtils>builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo();
completableFuture.complete(Optional.of(schemaInfo));
......@@ -61,7 +61,7 @@ public class MultiVersionSchemaInfoProviderTest {
any(TopicName.class),
any(byte[].class)))
.thenReturn(completableFuture);
SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]);
SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]).get();
assertEquals(schemaInfoByVersion, schemaInfo);
}
}
......@@ -18,12 +18,22 @@
*/
package org.apache.pulsar.common.protocol.schema;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
/**
* Bytes schema version
*/
public class BytesSchemaVersion implements SchemaVersion {
public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchemaVersion> {
private static final char[] HEX_CHARS_UPPER = {
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
};
private final byte[] bytes;
// cache the hash code for the string, default to 0
private int hashCode;
private BytesSchemaVersion(byte[] bytes) {
this.bytes = bytes;
......@@ -37,4 +47,139 @@ public class BytesSchemaVersion implements SchemaVersion {
public static BytesSchemaVersion of(byte[] bytes) {
return bytes != null ? new BytesSchemaVersion(bytes) : null;
}
/**
* Get the data from the Bytes.
* @return The underlying byte array
*/
public byte[] get() {
return this.bytes;
}
/**
* The hashcode is cached except for the case where it is computed as 0, in which
* case we compute the hashcode on every call.
*
* @return the hashcode
*/
@Override
public int hashCode() {
if (hashCode == 0) {
hashCode = Arrays.hashCode(bytes);
}
return hashCode;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null) {
return false;
}
// we intentionally use the function to compute hashcode here
if (this.hashCode() != other.hashCode()) {
return false;
}
if (other instanceof BytesSchemaVersion) {
return Arrays.equals(this.bytes, ((BytesSchemaVersion) other).get());
}
return false;
}
@Override
public int compareTo(BytesSchemaVersion that) {
return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
}
@Override
public String toString() {
return BytesSchemaVersion.toString(bytes, 0, bytes.length);
}
/**
* Write a printable representation of a byte array. Non-printable
* characters are hex escaped in the format \\x%02X, eg:
* \x00 \x05 etc.
*
* This function is brought from org.apache.hadoop.hbase.util.Bytes
*
* @param b array to write out
* @param off offset to start at
* @param len length to write
* @return string output
*/
private static String toString(final byte[] b, int off, int len) {
StringBuilder result = new StringBuilder();
if (b == null)
return result.toString();
// just in case we are passed a 'len' that is > buffer length...
if (off >= b.length)
return result.toString();
if (off + len > b.length)
len = b.length - off;
for (int i = off; i < off + len; ++i) {
int ch = b[i] & 0xFF;
if (ch >= ' ' && ch <= '~' && ch != '\\') {
result.append((char) ch);
} else {
result.append("\\x");
result.append(HEX_CHARS_UPPER[ch / 0x10]);
result.append(HEX_CHARS_UPPER[ch % 0x10]);
}
}
return result.toString();
}
/**
* A byte array comparator based on lexicograpic ordering.
*/
public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
public interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
int compare(final byte[] buffer1, int offset1, int length1,
final byte[] buffer2, int offset2, int length2);
}
private static class LexicographicByteArrayComparator implements ByteArrayComparator {
private static final long serialVersionUID = -1915703761143534937L;
@Override
public int compare(byte[] buffer1, byte[] buffer2) {
return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
}
public int compare(final byte[] buffer1, int offset1, int length1,
final byte[] buffer2, int offset2, int length2) {
// short circuit equal case
if (buffer1 == buffer2 &&
offset1 == offset2 &&
length1 == length2) {
return 0;
}
// similar to Arrays.compare() but considers offset and length
int end1 = offset1 + length1;
int end2 = offset2 + length2;
for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
int a = buffer1[i] & 0xff;
int b = buffer2[j] & 0xff;
if (a != b) {
return a - b;
}
}
return length1 - length2;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册