diff --git a/.github/workflows/ci-integration-schema.yaml b/.github/workflows/ci-integration-schema.yaml index 944c74f4c1aa94be83734cb28eaadf605bc3160b..e608af7d376c43b2525f45593e97c91ddb9ad42a 100644 --- a/.github/workflows/ci-integration-schema.yaml +++ b/.github/workflows/ci-integration-schema.yaml @@ -72,7 +72,12 @@ jobs: - name: run install by skip tests if: steps.docs.outputs.changed_only == 'no' run: mvn -q -B -ntp clean install -DskipTests - + - name: build pulsar image + if: steps.docs.outputs.changed_only == 'no' + run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true + - name: build pulsar-all image + if: steps.docs.outputs.changed_only == 'no' + run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true - name: build artifacts and docker image if: steps.docs.outputs.changed_only == 'no' run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java index 0c44612c0db839c74d60652292f1d4f04dd88d94..5e5f846a7ea118b8f58c4143f0c5e4dd8334ed44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java @@ -54,6 +54,10 @@ public interface SchemaDataValidator { case DATE: case TIME: case TIMESTAMP: + case INSTANT: + case LOCAL_DATE: + case LOCAL_TIME: + case LOCAL_DATE_TIME: PrimitiveSchemaDataValidator.of().validate(schemaData); break; case NONE: diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto index 9d2c756983ed7a0ece3dafb494889e673aedf972..32790b2868411c2f1a0b6f8ee302c0224bfa61ad 100644 --- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto +++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto @@ -40,6 +40,10 @@ message SchemaInfo { TIME = 14; TIMESTAMP = 15; KEYVALUE = 16; + INSTANT = 17; + LOCALDATE = 18; + LOCALTIME = 19; + LOCALDATETIME = 20; } message KeyValuePair { required string key = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index 9cc81051a23a86a908cc4c1b799d4e93e5c05e4b..fa09b7df3c3619f6dea913eb79025abc74c527d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -98,6 +98,10 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { { Schema.DATE }, { Schema.TIME }, { Schema.TIMESTAMP }, + { Schema.INSTANT }, + { Schema.LOCAL_DATE}, + { Schema.LOCAL_TIME}, + { Schema.LOCAL_DATE_TIME}, { Schema.AVRO( SchemaDefinition.builder() .withPojo(Foo.class) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java index 998d363c19f00cba1d08be89a2dfeb162b8d561e..1f85655ccf29a585c53b734c84466aa0d42b8362 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java @@ -50,6 +50,10 @@ public class SchemaDataValidatorTest { { SchemaType.DATE }, { SchemaType.TIME }, { SchemaType.TIMESTAMP }, + { SchemaType.INSTANT }, + { SchemaType.LOCAL_DATE }, + { SchemaType.LOCAL_TIME }, + { SchemaType.LOCAL_DATE_TIME }, }; } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 0c871b35b03afd85aaca5a62856a39acf65f48da..e7e5bbe420ec817a6e2bfd65e901939d926c7361 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -21,6 +21,10 @@ package org.apache.pulsar.client.api; import java.nio.ByteBuffer; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Date; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; @@ -208,6 +212,23 @@ public interface Schema extends Cloneable{ */ Schema TIMESTAMP = DefaultImplementation.newTimestampSchema(); + /** + * Instant Schema. + */ + Schema INSTANT = DefaultImplementation.newInstantSchema(); + /** + * LocalDate Schema. + */ + Schema LOCAL_DATE = DefaultImplementation.newLocalDateSchema(); + /** + * LocalTime Schema. + */ + Schema LOCAL_TIME = DefaultImplementation.newLocalTimeSchema(); + /** + * LocalDateTime Schema. + */ + Schema LOCAL_DATE_TIME = DefaultImplementation.newLocalDateTimeSchema(); + // CHECKSTYLE.OFF: MethodName /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java index 0171be5b93790eb720fc8bd0b403d7e2c10ec819..589ac69cd2b374a5019e0aced7e87448f72603e1 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java @@ -29,6 +29,10 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Date; import java.util.Map; import java.util.function.Supplier; @@ -219,6 +223,30 @@ public class DefaultImplementation { "org.apache.pulsar.client.impl.schema.TimestampSchema", "of", null) .invoke(null, null)); } + public static Schema newInstantSchema() { + return catchExceptions( + () -> (Schema) getStaticMethod( + "org.apache.pulsar.client.impl.schema.InstantSchema", "of", null) + .invoke(null, null)); + } + public static Schema newLocalDateSchema() { + return catchExceptions( + () -> (Schema) getStaticMethod( + "org.apache.pulsar.client.impl.schema.LocalDateSchema", "of", null) + .invoke(null, null)); + } + public static Schema newLocalTimeSchema() { + return catchExceptions( + () -> (Schema) getStaticMethod( + "org.apache.pulsar.client.impl.schema.LocalTimeSchema", "of", null) + .invoke(null, null)); + } + public static Schema newLocalDateTimeSchema() { + return catchExceptions( + () -> (Schema) getStaticMethod( + "org.apache.pulsar.client.impl.schema.LocalDateTimeSchema", "of", null) + .invoke(null, null)); + } public static Schema newAvroSchema(SchemaDefinition schemaDefinition) { return catchExceptions( diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java index dd8fc4a16937978e380c6567cc9ea4b25b2ddeaa..3b16e73a3715a75105b2027f429861b6c088346a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -114,6 +114,26 @@ public enum SchemaType { */ KEY_VALUE(15), + /** + * Instant. + */ + INSTANT(16), + + /** + * LocalDate. + */ + LOCAL_DATE(17), + + /** + * LocalTime. + */ + LOCAL_TIME(18), + + /** + * LocalDateTime. + */ + LOCAL_DATE_TIME(19), + // // Schemas that don't have schema info. the value should be negative. // @@ -167,6 +187,10 @@ public enum SchemaType { case 13: return TIME; case 14: return TIMESTAMP; case 15: return KEY_VALUE; + case 16: return INSTANT; + case 17: return LOCAL_DATE; + case 18: return LOCAL_TIME; + case 19: return LOCAL_DATE_TIME; case -1: return BYTES; case -2: return AUTO; case -3: return AUTO_CONSUME; @@ -198,6 +222,10 @@ public enum SchemaType { case TIME: case TIMESTAMP: case BYTES: + case INSTANT: + case LOCAL_DATE: + case LOCAL_TIME: + case LOCAL_DATE_TIME: case NONE: return true; default: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 049b0f55e5b987631f08af019e4eaa65e120d420..fb3ac59c31dbef6a8890da0fbe92f207ef2094bf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -182,6 +182,14 @@ public class AutoConsumeSchema implements Schema { return TimeSchema.of(); case TIMESTAMP: return TimestampSchema.of(); + case INSTANT: + return InstantSchema.of(); + case LOCAL_DATE: + return LocalDateSchema.of(); + case LOCAL_TIME: + return LocalTimeSchema.of(); + case LOCAL_DATE_TIME: + return LocalDateTimeSchema.of(); case JSON: case AVRO: return GenericSchemaImpl.of(schemaInfo); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..5830ceaf571d4dd8546200615e4bbce4fab0c149 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import java.nio.ByteBuffer; +import java.time.Instant; + +/** + * A schema for `java.time.Instant`. + */ +public class InstantSchema extends AbstractSchema { + + private static final InstantSchema INSTANCE; + private static final SchemaInfo SCHEMA_INFO; + + static { + SCHEMA_INFO = new SchemaInfo() + .setName("Instant") + .setType(SchemaType.INSTANT) + .setSchema(new byte[0]); + INSTANCE = new InstantSchema(); + } + + public static InstantSchema of() { + return INSTANCE; + } + + @Override + public byte[] encode(Instant message) { + if (null == message) { + return null; + } + // Instant is accurate to nanoseconds and requires two value storage. + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + buffer.putLong(message.getEpochSecond()); + buffer.putInt(message.getNano()); + return buffer.array(); + } + + @Override + public Instant decode(byte[] bytes) { + if (null == bytes) { + return null; + } + ByteBuffer buffer = ByteBuffer.wrap(bytes); + long epochSecond = buffer.getLong(); + int nanos = buffer.getInt(); + return Instant.ofEpochSecond(epochSecond, nanos); + } + + @Override + public Instant decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + long epochSecond = byteBuf.readLong(); + int nanos = byteBuf.readInt(); + return Instant.ofEpochSecond(epochSecond, nanos); + } + + @Override + public SchemaInfo getSchemaInfo() { + return SCHEMA_INFO; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..add6fd28b5b12994521cd94eefd716c7bfea5de9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import java.time.LocalDate; + +/** + * A schema for `java.time.LocalDate`. + */ +public class LocalDateSchema extends AbstractSchema { + + private static final LocalDateSchema INSTANCE; + private static final SchemaInfo SCHEMA_INFO; + + static { + SCHEMA_INFO = new SchemaInfo() + .setName("LocalDate") + .setType(SchemaType.LOCAL_DATE) + .setSchema(new byte[0]); + INSTANCE = new LocalDateSchema(); + } + + public static LocalDateSchema of() { + return INSTANCE; + } + + @Override + public byte[] encode(LocalDate message) { + if (null == message) { + return null; + } + + Long epochDay = message.toEpochDay(); + return LongSchema.of().encode(epochDay); + } + + @Override + public LocalDate decode(byte[] bytes) { + if (null == bytes) { + return null; + } + + Long decode = LongSchema.of().decode(bytes); + return LocalDate.ofEpochDay(decode); + } + + @Override + public LocalDate decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + + Long decode = LongSchema.of().decode(byteBuf); + return LocalDate.ofEpochDay(decode); + } + + @Override + public SchemaInfo getSchemaInfo() { + return SCHEMA_INFO; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..8a6c4fbc8a40e64ff96febf31a28c8d223308763 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** + * A schema for `java.time.LocalDateTime`. + */ +public class LocalDateTimeSchema extends AbstractSchema { + + private static final LocalDateTimeSchema INSTANCE; + private static final SchemaInfo SCHEMA_INFO; + public static final String DELIMITER = ":"; + + static { + SCHEMA_INFO = new SchemaInfo() + .setName("LocalDateTime") + .setType(SchemaType.LOCAL_DATE_TIME) + .setSchema(new byte[0]); + INSTANCE = new LocalDateTimeSchema(); + } + + public static LocalDateTimeSchema of() { + return INSTANCE; + } + + @Override + public byte[] encode(LocalDateTime message) { + if (null == message) { + return null; + } + //LocalDateTime is accurate to nanoseconds and requires two value storage. + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2); + buffer.putLong(message.toLocalDate().toEpochDay()); + buffer.putLong(message.toLocalTime().toNanoOfDay()); + return buffer.array(); + } + + @Override + public LocalDateTime decode(byte[] bytes) { + if (null == bytes) { + return null; + } + ByteBuffer buffer = ByteBuffer.wrap(bytes); + long epochDay = buffer.getLong(); + long nanoOfDay = buffer.getLong(); + return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay)); + } + + @Override + public LocalDateTime decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + long epochDay = byteBuf.readLong(); + long nanoOfDay = byteBuf.readLong(); + return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay)); + } + + @Override + public SchemaInfo getSchemaInfo() { + return SCHEMA_INFO; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..6e2bf627006e71827a4a74ccba36cbd5b2ceb998 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import java.time.LocalTime; + +/** + * A schema for `java.time.LocalTime`. + */ +public class LocalTimeSchema extends AbstractSchema { + + private static final LocalTimeSchema INSTANCE; + private static final SchemaInfo SCHEMA_INFO; + + static { + SCHEMA_INFO = new SchemaInfo() + .setName("LocalTime") + .setType(SchemaType.LOCAL_TIME) + .setSchema(new byte[0]); + INSTANCE = new LocalTimeSchema(); + } + + public static LocalTimeSchema of() { + return INSTANCE; + } + + @Override + public byte[] encode(LocalTime message) { + if (null == message) { + return null; + } + + Long nanoOfDay = message.toNanoOfDay(); + return LongSchema.of().encode(nanoOfDay); + } + + @Override + public LocalTime decode(byte[] bytes) { + if (null == bytes) { + return null; + } + + Long decode = LongSchema.of().decode(bytes); + return LocalTime.ofNanoOfDay(decode); + } + + @Override + public LocalTime decode(ByteBuf byteBuf) { + if (null == byteBuf) { + return null; + } + + Long decode = LongSchema.of().decode(byteBuf); + return LocalTime.ofNanoOfDay(decode); + } + + @Override + public SchemaInfo getSchemaInfo() { + return SCHEMA_INFO; + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ca7856a6bae541095beaab3db721adc17b2e24ab --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/InstantSchemaTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.nio.ByteBuffer; +import java.time.Instant; + +public class InstantSchemaTest { + + @Test + public void testSchemaEncode() { + InstantSchema schema = InstantSchema.of(); + Instant instant = Instant.now(); + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + byteBuffer.putLong(instant.getEpochSecond()); + byteBuffer.putInt(instant.getNano()); + byte[] expected = byteBuffer.array(); + Assert.assertEquals(expected, schema.encode(instant)); + } + + @Test + public void testSchemaEncodeDecodeFidelity() { + InstantSchema schema = InstantSchema.of(); + Instant instant = Instant.now(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES); + byte[] bytes = schema.encode(instant); + byteBuf.writeBytes(bytes); + Assert.assertEquals(instant, schema.decode(bytes)); + Assert.assertEquals(instant, schema.decode(byteBuf)); + } + + @Test + public void testSchemaDecode() { + Instant instant = Instant.now(); + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + byteBuffer.putLong(instant.getEpochSecond()); + byteBuffer.putInt(instant.getNano()); + byte[] byteData = byteBuffer.array(); + long epochSecond = instant.getEpochSecond(); + long nano = instant.getNano(); + + InstantSchema schema = InstantSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES); + byteBuf.writeBytes(byteData); + Instant decode = schema.decode(byteData); + Assert.assertEquals(epochSecond, decode.getEpochSecond()); + Assert.assertEquals(nano, decode.getNano()); + decode = schema.decode(byteBuf); + Assert.assertEquals(epochSecond, decode.getEpochSecond()); + Assert.assertEquals(nano, decode.getNano()); + } + + @Test + public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; + + Assert.assertNull(InstantSchema.of().encode(null)); + Assert.assertNull(InstantSchema.of().decode(byteBuf)); + Assert.assertNull(InstantSchema.of().decode(bytes)); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java new file mode 100644 index 0000000000000000000000000000000000000000..31b48b2fc2e34ca4fe22a5ae886614b1512c0a5f --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateSchemaTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.time.LocalDate; + +public class LocalDateSchemaTest { + + @Test + public void testSchemaEncode() { + LocalDateSchema schema = LocalDateSchema.of(); + LocalDate localDate = LocalDate.now(); + byte[] expected = new byte[] { + (byte) (localDate.toEpochDay() >>> 56), + (byte) (localDate.toEpochDay() >>> 48), + (byte) (localDate.toEpochDay() >>> 40), + (byte) (localDate.toEpochDay() >>> 32), + (byte) (localDate.toEpochDay() >>> 24), + (byte) (localDate.toEpochDay() >>> 16), + (byte) (localDate.toEpochDay() >>> 8), + ((Long)localDate.toEpochDay()).byteValue() + }; + Assert.assertEquals(expected, schema.encode(localDate)); + } + + @Test + public void testSchemaEncodeDecodeFidelity() { + LocalDateSchema schema = LocalDateSchema.of(); + LocalDate localDate = LocalDate.now(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byte[] bytes = schema.encode(localDate); + byteBuf.writeBytes(bytes); + Assert.assertEquals(localDate, schema.decode(bytes)); + Assert.assertEquals(localDate, schema.decode(byteBuf)); + } + + @Test + public void testSchemaDecode() { + byte[] byteData = new byte[] { + 0, + 0, + 0, + 0, + 0, + 10, + 24, + 42 + }; + long expected = 10*65536 + 24*256 + 42; + + LocalDateSchema schema = LocalDateSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byteBuf.writeBytes(byteData); + Assert.assertEquals(expected, schema.decode(byteData).toEpochDay()); + Assert.assertEquals(expected, schema.decode(byteBuf).toEpochDay()); + } + + @Test + public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; + + Assert.assertNull(LocalDateSchema.of().encode(null)); + Assert.assertNull(LocalDateSchema.of().decode(byteBuf)); + Assert.assertNull(LocalDateSchema.of().decode(bytes)); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1214560b05a40725f073398329e897412894b1f6 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchemaTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; + +public class LocalDateTimeSchemaTest { + + @Test + public void testSchemaEncode() { + LocalDateTimeSchema schema = LocalDateTimeSchema.of(); + LocalDateTime localDateTime = LocalDateTime.now(); + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 2); + byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay()); + byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay()); + byte[] expected = byteBuffer.array(); + Assert.assertEquals(expected, schema.encode(localDateTime)); + } + + @Test + public void testSchemaEncodeDecodeFidelity() { + LocalDateTimeSchema schema = LocalDateTimeSchema.of(); + LocalDateTime localDateTime = LocalDateTime.now(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2); + byte[] bytes = schema.encode(localDateTime); + byteBuf.writeBytes(bytes); + Assert.assertEquals(localDateTime, schema.decode(bytes)); + Assert.assertEquals(localDateTime, schema.decode(byteBuf)); + } + + @Test + public void testSchemaDecode() { + LocalDateTime localDateTime = LocalDateTime.of(2020, 8, 22, 2, 0, 0); + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 2); + byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay()); + byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay()); + byte[] byteData = byteBuffer.array(); + long expectedEpochDay = localDateTime.toLocalDate().toEpochDay(); + long expectedNanoOfDay = localDateTime.toLocalTime().toNanoOfDay(); + + LocalDateTimeSchema schema = LocalDateTimeSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2); + byteBuf.writeBytes(byteData); + LocalDateTime decode = schema.decode(byteData); + Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay()); + Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay()); + decode = schema.decode(byteBuf); + Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay()); + Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay()); + } + + @Test + public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; + + Assert.assertNull(LocalDateSchema.of().encode(null)); + Assert.assertNull(LocalDateSchema.of().decode(byteBuf)); + Assert.assertNull(LocalDateSchema.of().decode(bytes)); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ed729dec7ab1e91c052c7636402162d60d3aec48 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/LocalTimeSchemaTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.time.LocalTime; + +public class LocalTimeSchemaTest { + + @Test + public void testSchemaEncode() { + LocalTimeSchema schema = LocalTimeSchema.of(); + LocalTime localTime = LocalTime.now(); + byte[] expected = new byte[] { + (byte) (localTime.toNanoOfDay() >>> 56), + (byte) (localTime.toNanoOfDay() >>> 48), + (byte) (localTime.toNanoOfDay() >>> 40), + (byte) (localTime.toNanoOfDay() >>> 32), + (byte) (localTime.toNanoOfDay() >>> 24), + (byte) (localTime.toNanoOfDay() >>> 16), + (byte) (localTime.toNanoOfDay() >>> 8), + ((Long)localTime.toNanoOfDay()).byteValue() + }; + Assert.assertEquals(expected, schema.encode(localTime)); + } + + @Test + public void testSchemaEncodeDecodeFidelity() { + LocalTimeSchema schema = LocalTimeSchema.of(); + LocalTime localTime = LocalTime.now(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byte[] bytes = schema.encode(localTime); + byteBuf.writeBytes(bytes); + Assert.assertEquals(localTime, schema.decode(bytes)); + Assert.assertEquals(localTime, schema.decode(byteBuf)); + } + + @Test + public void testSchemaDecode() { + byte[] byteData = new byte[] { + 0, + 0, + 0, + 0, + 0, + 10, + 24, + 42 + }; + long expected = 10*65536 + 24*256 + 42; + + LocalTimeSchema schema = LocalTimeSchema.of(); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8); + byteBuf.writeBytes(byteData); + Assert.assertEquals(expected, schema.decode(byteData).toNanoOfDay()); + Assert.assertEquals(expected, schema.decode(byteBuf).toNanoOfDay()); + } + + @Test + public void testNullEncodeDecode() { + ByteBuf byteBuf = null; + byte[] bytes = null; + + Assert.assertNull(LocalTimeSchema.of().encode(null)); + Assert.assertNull(LocalTimeSchema.of().decode(byteBuf)); + Assert.assertNull(LocalTimeSchema.of().decode(bytes)); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java index dceebbd9db6eec265bb8fb970899bae11a6ceab3..a2efb251b5cca1da45afea2bac16c40d415ed827 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java @@ -28,6 +28,10 @@ import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -60,6 +64,10 @@ public class PrimitiveSchemaTest { put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime()))); put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime()))); put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime()))); + put(InstantSchema.of(), Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L))); + put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2))); + put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2))); + put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10))); } }; @@ -78,6 +86,10 @@ public class PrimitiveSchemaTest { put(Schema.DATE, Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime()))); put(Schema.TIME, Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime()))); put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new java.util.Date().getTime() - 10000), new Timestamp(new java.util.Date().getTime()))); + put(Schema.INSTANT, Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L))); + put(Schema.LOCAL_DATE, Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2))); + put(Schema.LOCAL_TIME, Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2))); + put(Schema.LOCAL_DATE_TIME, Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10))); } }; @@ -139,6 +151,10 @@ public class PrimitiveSchemaTest { assertEquals(SchemaType.DATE, DateSchema.of().getSchemaInfo().getType()); assertEquals(SchemaType.TIME, TimeSchema.of().getSchemaInfo().getType()); assertEquals(SchemaType.TIMESTAMP, TimestampSchema.of().getSchemaInfo().getType()); + assertEquals(SchemaType.INSTANT, InstantSchema.of().getSchemaInfo().getType()); + assertEquals(SchemaType.LOCAL_DATE, LocalDateSchema.of().getSchemaInfo().getType()); + assertEquals(SchemaType.LOCAL_TIME, LocalTimeSchema.of().getSchemaInfo().getType()); + assertEquals(SchemaType.LOCAL_DATE_TIME, LocalDateTimeSchema.of().getSchemaInfo().getType()); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java index 7ee270b9aa49b3dec0ca7901f6b9faa0f7bee453..1011e0ec50d857f702caf1b36d0131d39e5e4ae5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java @@ -25,6 +25,10 @@ import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.Date; import java.util.HashMap; @@ -39,7 +43,11 @@ import org.apache.pulsar.client.impl.schema.BytesSchema; import org.apache.pulsar.client.impl.schema.DateSchema; import org.apache.pulsar.client.impl.schema.DoubleSchema; import org.apache.pulsar.client.impl.schema.FloatSchema; +import org.apache.pulsar.client.impl.schema.InstantSchema; import org.apache.pulsar.client.impl.schema.IntSchema; +import org.apache.pulsar.client.impl.schema.LocalDateSchema; +import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema; +import org.apache.pulsar.client.impl.schema.LocalTimeSchema; import org.apache.pulsar.client.impl.schema.LongSchema; import org.apache.pulsar.client.impl.schema.ShortSchema; import org.apache.pulsar.client.impl.schema.StringSchema; @@ -69,6 +77,10 @@ public class KeyValueTest { put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime()))); put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime()))); put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime()))); + put(InstantSchema.of(), Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L))); + put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2))); + put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2))); + put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10))); } }; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 4c3b88529c08fe57a5abcf13588809d8f45a815d..65c4a9352e279d9edada154f69b6b85101bc46e2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -444,6 +444,10 @@ public final class PulsarApi { Time(13, 13), Timestamp(14, 14), KeyValue(15, 15), + Instant(16, 16), + LocalDate(17, 17), + LocalTime(18, 18), + LocalDateTime(19, 19), ; public static final int None_VALUE = 0; @@ -462,6 +466,10 @@ public final class PulsarApi { public static final int Time_VALUE = 13; public static final int Timestamp_VALUE = 14; public static final int KeyValue_VALUE = 15; + public static final int Instant_VALUE = 16; + public static final int LocalDate_VALUE = 17; + public static final int LocalTime_VALUE = 18; + public static final int LocalDateTime_VALUE = 19; public final int getNumber() { return value; } @@ -484,6 +492,10 @@ public final class PulsarApi { case 13: return Time; case 14: return Timestamp; case 15: return KeyValue; + case 16: return Instant; + case 17: return LocalDate; + case 18: return LocalTime; + case 19: return LocalDateTime; default: return null; } } @@ -1578,6 +1590,15 @@ public final class PulsarApi { ackSet_.add(input.readInt64()); break; } + case 42: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + while (input.getBytesUntilLimit() > 0) { + addAckSet(input.readInt64()); + } + input.popLimit(limit); + break; + } } } } @@ -18860,6 +18881,15 @@ public final class PulsarApi { ackSet_.add(input.readInt64()); break; } + case 34: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + while (input.getBytesUntilLimit() > 0) { + addAckSet(input.readInt64()); + } + input.popLimit(limit); + break; + } } } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 63ff6eec57b3c3675bb36f7491ba053bbbc178b4..9c4b9132eb9ff6d94c54877076b74f8d3b5e2f97 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -40,6 +40,10 @@ message Schema { Time = 13; Timestamp = 14; KeyValue = 15; + Instant = 16; + LocalDate = 17; + LocalTime = 18; + LocalDateTime = 19; } required string name = 1; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java index 4b0083a76094cbec995237e7282e2c19a145b95f..6194f9527dd86da67b44e80272f7b60126e040eb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java @@ -273,6 +273,10 @@ public class SchemaTest extends PulsarTestSuite { schemas.add(Schema.DATE); schemas.add(Schema.TIME); schemas.add(Schema.TIMESTAMP); + schemas.add(Schema.INSTANT); + schemas.add(Schema.LOCAL_DATE); + schemas.add(Schema.LOCAL_TIME); + schemas.add(Schema.LOCAL_DATE_TIME); schemas.forEach(schemaProducer -> { schemas.forEach(schemaConsumer -> {