提交 daa951e3 编写于 作者: C congbo 提交者: Jia Zhai

Fix StructSchema reader cache loading logic

### Motivation
StructSchema LoadingCache for cache reader, but key is byte[], it will compare with the address's  hashcode. So every decode will generate a new reader so we should change the type of the key for LoadingCache

(cherry picked from commit 0367f5f0)
上级 d5f46ad9
...@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; ...@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.AvroReader; import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -99,16 +100,16 @@ public class AvroSchema<T> extends StructSchema<T> { ...@@ -99,16 +100,16 @@ public class AvroSchema<T> extends StructSchema<T> {
} }
@Override @Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) { protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) { if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}", log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition()); schemaInfo.getSchemaDefinition());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema); return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
} else { } else {
log.warn("No schema found for version({}), use latest schema : {}", log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo.getSchemaDefinition()); this.schemaInfo.getSchemaDefinition());
return reader; return reader;
} }
......
...@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; ...@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.JsonReader; import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter; import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
...@@ -58,7 +59,7 @@ public class JSONSchema<T> extends StructSchema<T> { ...@@ -58,7 +59,7 @@ public class JSONSchema<T> extends StructSchema<T> {
} }
@Override @Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) { protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
throw new RuntimeException("JSONSchema don't support schema versioning"); throw new RuntimeException("JSONSchema don't support schema versioning");
} }
......
...@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; ...@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader; import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter; import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
...@@ -94,7 +95,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex ...@@ -94,7 +95,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
} }
@Override @Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) { protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
throw new RuntimeException("ProtobufSchema don't support schema versioning"); throw new RuntimeException("ProtobufSchema don't support schema versioning");
} }
......
...@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; ...@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter; import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -60,10 +61,10 @@ public abstract class StructSchema<T> implements Schema<T> { ...@@ -60,10 +61,10 @@ public abstract class StructSchema<T> implements Schema<T> {
protected SchemaReader<T> reader; protected SchemaReader<T> reader;
protected SchemaWriter<T> writer; protected SchemaWriter<T> writer;
protected SchemaInfoProvider schemaInfoProvider; protected SchemaInfoProvider schemaInfoProvider;
private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000) private final LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() { .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
@Override @Override
public SchemaReader<T> load(byte[] schemaVersion) { public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
return loadReader(schemaVersion); return loadReader(schemaVersion);
} }
}); });
...@@ -90,7 +91,7 @@ public abstract class StructSchema<T> implements Schema<T> { ...@@ -90,7 +91,7 @@ public abstract class StructSchema<T> implements Schema<T> {
@Override @Override
public T decode(byte[] bytes, byte[] schemaVersion) { public T decode(byte[] bytes, byte[] schemaVersion) {
try { try {
return readerCache.get(schemaVersion).read(bytes); return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
} catch (ExecutionException e) { } catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}", LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e); schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
...@@ -138,7 +139,7 @@ public abstract class StructSchema<T> implements Schema<T> { ...@@ -138,7 +139,7 @@ public abstract class StructSchema<T> implements Schema<T> {
* @param schemaVersion the provided schema version * @param schemaVersion the provided schema version
* @return the schema reader for decoding messages encoded by the provided schema version. * @return the schema reader for decoding messages encoded by the provided schema version.
*/ */
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion); protected abstract SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion);
/** /**
* TODO: think about how to make this async * TODO: think about how to make this async
......
...@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; ...@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder; import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
/** /**
...@@ -54,21 +55,21 @@ public class GenericAvroSchema extends GenericSchemaImpl { ...@@ -54,21 +55,21 @@ public class GenericAvroSchema extends GenericSchemaImpl {
} }
@Override @Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) { protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) { if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}", log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo); schemaInfo);
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition()); Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema; Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader( return new GenericAvroReader(
writerSchema, writerSchema,
readerSchema, readerSchema,
schemaVersion); schemaVersion.get());
} else { } else {
log.warn("No schema found for version({}), use latest schema : {}", log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo); this.schemaInfo);
return reader; return reader;
} }
......
...@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; ...@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder; import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
/** /**
...@@ -47,11 +48,11 @@ class GenericJsonSchema extends GenericSchemaImpl { ...@@ -47,11 +48,11 @@ class GenericJsonSchema extends GenericSchemaImpl {
} }
@Override @Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) { protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion); SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) { if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}", log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition()); schemaInfo.getSchemaDefinition());
Schema readerSchema; Schema readerSchema;
if (useProvidedSchemaAsReaderSchema) { if (useProvidedSchemaAsReaderSchema) {
...@@ -59,14 +60,14 @@ class GenericJsonSchema extends GenericSchemaImpl { ...@@ -59,14 +60,14 @@ class GenericJsonSchema extends GenericSchemaImpl {
} else { } else {
readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition()); readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
} }
return new GenericJsonReader(schemaVersion, return new GenericJsonReader(schemaVersion.get(),
readerSchema.getFields() readerSchema.getFields()
.stream() .stream()
.map(f -> new Field(f.name(), f.pos())) .map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} else { } else {
log.warn("No schema found for version({}), use latest schema : {}", log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion), SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo.getSchemaDefinition()); this.schemaInfo.getSchemaDefinition());
return reader; return reader;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册