未验证 提交 e36458c3 编写于 作者: J jianyun 提交者: GitHub

add java8 date and time type to pulsar's primitive schemas (#7874)

### Motivation

*Compatible with flink 1.11 need to use java8 date api in pulsar's primitive schemas.*

### Modifications

*Add Instant, LocalDate, LocalTime, LocalDateTime to pulsar's primitive schemas*

### Verifying this change

Add Instant, LocalDate, LocalTime, LocalDateTime types to the Schema type test
上级 2f44c926
......@@ -72,7 +72,12 @@ jobs:
- name: run install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -q -B -ntp clean install -DskipTests
- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
......
......@@ -54,6 +54,10 @@ public interface SchemaDataValidator {
case DATE:
case TIME:
case TIMESTAMP:
case INSTANT:
case LOCAL_DATE:
case LOCAL_TIME:
case LOCAL_DATE_TIME:
PrimitiveSchemaDataValidator.of().validate(schemaData);
break;
case NONE:
......
......@@ -40,6 +40,10 @@ message SchemaInfo {
TIME = 14;
TIMESTAMP = 15;
KEYVALUE = 16;
INSTANT = 17;
LOCALDATE = 18;
LOCALTIME = 19;
LOCALDATETIME = 20;
}
message KeyValuePair {
required string key = 1;
......
......@@ -98,6 +98,10 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
{ Schema.DATE },
{ Schema.TIME },
{ Schema.TIMESTAMP },
{ Schema.INSTANT },
{ Schema.LOCAL_DATE},
{ Schema.LOCAL_TIME},
{ Schema.LOCAL_DATE_TIME},
{ Schema.AVRO(
SchemaDefinition.builder()
.withPojo(Foo.class)
......
......@@ -50,6 +50,10 @@ public class SchemaDataValidatorTest {
{ SchemaType.DATE },
{ SchemaType.TIME },
{ SchemaType.TIMESTAMP },
{ SchemaType.INSTANT },
{ SchemaType.LOCAL_DATE },
{ SchemaType.LOCAL_TIME },
{ SchemaType.LOCAL_DATE_TIME },
};
}
......
......@@ -21,6 +21,10 @@ package org.apache.pulsar.client.api;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
......@@ -208,6 +212,23 @@ public interface Schema<T> extends Cloneable{
*/
Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();
/**
* Instant Schema.
*/
Schema<Instant> INSTANT = DefaultImplementation.newInstantSchema();
/**
* LocalDate Schema.
*/
Schema<LocalDate> LOCAL_DATE = DefaultImplementation.newLocalDateSchema();
/**
* LocalTime Schema.
*/
Schema<LocalTime> LOCAL_TIME = DefaultImplementation.newLocalTimeSchema();
/**
* LocalDateTime Schema.
*/
Schema<LocalDateTime> LOCAL_DATE_TIME = DefaultImplementation.newLocalDateTimeSchema();
// CHECKSTYLE.OFF: MethodName
/**
......
......@@ -29,6 +29,10 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import java.util.Map;
import java.util.function.Supplier;
......@@ -219,6 +223,30 @@ public class DefaultImplementation {
"org.apache.pulsar.client.impl.schema.TimestampSchema", "of", null)
.invoke(null, null));
}
public static Schema<Instant> newInstantSchema() {
return catchExceptions(
() -> (Schema<Instant>) getStaticMethod(
"org.apache.pulsar.client.impl.schema.InstantSchema", "of", null)
.invoke(null, null));
}
public static Schema<LocalDate> newLocalDateSchema() {
return catchExceptions(
() -> (Schema<LocalDate>) getStaticMethod(
"org.apache.pulsar.client.impl.schema.LocalDateSchema", "of", null)
.invoke(null, null));
}
public static Schema<LocalTime> newLocalTimeSchema() {
return catchExceptions(
() -> (Schema<LocalTime>) getStaticMethod(
"org.apache.pulsar.client.impl.schema.LocalTimeSchema", "of", null)
.invoke(null, null));
}
public static Schema<LocalDateTime> newLocalDateTimeSchema() {
return catchExceptions(
() -> (Schema<LocalDateTime>) getStaticMethod(
"org.apache.pulsar.client.impl.schema.LocalDateTimeSchema", "of", null)
.invoke(null, null));
}
public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
......
......@@ -114,6 +114,26 @@ public enum SchemaType {
*/
KEY_VALUE(15),
/**
* Instant.
*/
INSTANT(16),
/**
* LocalDate.
*/
LOCAL_DATE(17),
/**
* LocalTime.
*/
LOCAL_TIME(18),
/**
* LocalDateTime.
*/
LOCAL_DATE_TIME(19),
//
// Schemas that don't have schema info. the value should be negative.
//
......@@ -167,6 +187,10 @@ public enum SchemaType {
case 13: return TIME;
case 14: return TIMESTAMP;
case 15: return KEY_VALUE;
case 16: return INSTANT;
case 17: return LOCAL_DATE;
case 18: return LOCAL_TIME;
case 19: return LOCAL_DATE_TIME;
case -1: return BYTES;
case -2: return AUTO;
case -3: return AUTO_CONSUME;
......@@ -198,6 +222,10 @@ public enum SchemaType {
case TIME:
case TIMESTAMP:
case BYTES:
case INSTANT:
case LOCAL_DATE:
case LOCAL_TIME:
case LOCAL_DATE_TIME:
case NONE:
return true;
default:
......
......@@ -182,6 +182,14 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return TimeSchema.of();
case TIMESTAMP:
return TimestampSchema.of();
case INSTANT:
return InstantSchema.of();
case LOCAL_DATE:
return LocalDateSchema.of();
case LOCAL_TIME:
return LocalTimeSchema.of();
case LOCAL_DATE_TIME:
return LocalDateTimeSchema.of();
case JSON:
case AVRO:
return GenericSchemaImpl.of(schemaInfo);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.ByteBuffer;
import java.time.Instant;
/**
* A schema for `java.time.Instant`.
*/
public class InstantSchema extends AbstractSchema<Instant> {
private static final InstantSchema INSTANCE;
private static final SchemaInfo SCHEMA_INFO;
static {
SCHEMA_INFO = new SchemaInfo()
.setName("Instant")
.setType(SchemaType.INSTANT)
.setSchema(new byte[0]);
INSTANCE = new InstantSchema();
}
public static InstantSchema of() {
return INSTANCE;
}
@Override
public byte[] encode(Instant message) {
if (null == message) {
return null;
}
// Instant is accurate to nanoseconds and requires two value storage.
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
buffer.putLong(message.getEpochSecond());
buffer.putInt(message.getNano());
return buffer.array();
}
@Override
public Instant decode(byte[] bytes) {
if (null == bytes) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
long epochSecond = buffer.getLong();
int nanos = buffer.getInt();
return Instant.ofEpochSecond(epochSecond, nanos);
}
@Override
public Instant decode(ByteBuf byteBuf) {
if (null == byteBuf) {
return null;
}
long epochSecond = byteBuf.readLong();
int nanos = byteBuf.readInt();
return Instant.ofEpochSecond(epochSecond, nanos);
}
@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.time.LocalDate;
/**
* A schema for `java.time.LocalDate`.
*/
public class LocalDateSchema extends AbstractSchema<LocalDate> {
private static final LocalDateSchema INSTANCE;
private static final SchemaInfo SCHEMA_INFO;
static {
SCHEMA_INFO = new SchemaInfo()
.setName("LocalDate")
.setType(SchemaType.LOCAL_DATE)
.setSchema(new byte[0]);
INSTANCE = new LocalDateSchema();
}
public static LocalDateSchema of() {
return INSTANCE;
}
@Override
public byte[] encode(LocalDate message) {
if (null == message) {
return null;
}
Long epochDay = message.toEpochDay();
return LongSchema.of().encode(epochDay);
}
@Override
public LocalDate decode(byte[] bytes) {
if (null == bytes) {
return null;
}
Long decode = LongSchema.of().decode(bytes);
return LocalDate.ofEpochDay(decode);
}
@Override
public LocalDate decode(ByteBuf byteBuf) {
if (null == byteBuf) {
return null;
}
Long decode = LongSchema.of().decode(byteBuf);
return LocalDate.ofEpochDay(decode);
}
@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
/**
* A schema for `java.time.LocalDateTime`.
*/
public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
private static final LocalDateTimeSchema INSTANCE;
private static final SchemaInfo SCHEMA_INFO;
public static final String DELIMITER = ":";
static {
SCHEMA_INFO = new SchemaInfo()
.setName("LocalDateTime")
.setType(SchemaType.LOCAL_DATE_TIME)
.setSchema(new byte[0]);
INSTANCE = new LocalDateTimeSchema();
}
public static LocalDateTimeSchema of() {
return INSTANCE;
}
@Override
public byte[] encode(LocalDateTime message) {
if (null == message) {
return null;
}
//LocalDateTime is accurate to nanoseconds and requires two value storage.
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * 2);
buffer.putLong(message.toLocalDate().toEpochDay());
buffer.putLong(message.toLocalTime().toNanoOfDay());
return buffer.array();
}
@Override
public LocalDateTime decode(byte[] bytes) {
if (null == bytes) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
long epochDay = buffer.getLong();
long nanoOfDay = buffer.getLong();
return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
}
@Override
public LocalDateTime decode(ByteBuf byteBuf) {
if (null == byteBuf) {
return null;
}
long epochDay = byteBuf.readLong();
long nanoOfDay = byteBuf.readLong();
return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
}
@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.time.LocalTime;
/**
* A schema for `java.time.LocalTime`.
*/
public class LocalTimeSchema extends AbstractSchema<LocalTime> {
private static final LocalTimeSchema INSTANCE;
private static final SchemaInfo SCHEMA_INFO;
static {
SCHEMA_INFO = new SchemaInfo()
.setName("LocalTime")
.setType(SchemaType.LOCAL_TIME)
.setSchema(new byte[0]);
INSTANCE = new LocalTimeSchema();
}
public static LocalTimeSchema of() {
return INSTANCE;
}
@Override
public byte[] encode(LocalTime message) {
if (null == message) {
return null;
}
Long nanoOfDay = message.toNanoOfDay();
return LongSchema.of().encode(nanoOfDay);
}
@Override
public LocalTime decode(byte[] bytes) {
if (null == bytes) {
return null;
}
Long decode = LongSchema.of().decode(bytes);
return LocalTime.ofNanoOfDay(decode);
}
@Override
public LocalTime decode(ByteBuf byteBuf) {
if (null == byteBuf) {
return null;
}
Long decode = LongSchema.of().decode(byteBuf);
return LocalTime.ofNanoOfDay(decode);
}
@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
import java.time.Instant;
public class InstantSchemaTest {
@Test
public void testSchemaEncode() {
InstantSchema schema = InstantSchema.of();
Instant instant = Instant.now();
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
byteBuffer.putLong(instant.getEpochSecond());
byteBuffer.putInt(instant.getNano());
byte[] expected = byteBuffer.array();
Assert.assertEquals(expected, schema.encode(instant));
}
@Test
public void testSchemaEncodeDecodeFidelity() {
InstantSchema schema = InstantSchema.of();
Instant instant = Instant.now();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES);
byte[] bytes = schema.encode(instant);
byteBuf.writeBytes(bytes);
Assert.assertEquals(instant, schema.decode(bytes));
Assert.assertEquals(instant, schema.decode(byteBuf));
}
@Test
public void testSchemaDecode() {
Instant instant = Instant.now();
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
byteBuffer.putLong(instant.getEpochSecond());
byteBuffer.putInt(instant.getNano());
byte[] byteData = byteBuffer.array();
long epochSecond = instant.getEpochSecond();
long nano = instant.getNano();
InstantSchema schema = InstantSchema.of();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES);
byteBuf.writeBytes(byteData);
Instant decode = schema.decode(byteData);
Assert.assertEquals(epochSecond, decode.getEpochSecond());
Assert.assertEquals(nano, decode.getNano());
decode = schema.decode(byteBuf);
Assert.assertEquals(epochSecond, decode.getEpochSecond());
Assert.assertEquals(nano, decode.getNano());
}
@Test
public void testNullEncodeDecode() {
ByteBuf byteBuf = null;
byte[] bytes = null;
Assert.assertNull(InstantSchema.of().encode(null));
Assert.assertNull(InstantSchema.of().decode(byteBuf));
Assert.assertNull(InstantSchema.of().decode(bytes));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.time.LocalDate;
public class LocalDateSchemaTest {
@Test
public void testSchemaEncode() {
LocalDateSchema schema = LocalDateSchema.of();
LocalDate localDate = LocalDate.now();
byte[] expected = new byte[] {
(byte) (localDate.toEpochDay() >>> 56),
(byte) (localDate.toEpochDay() >>> 48),
(byte) (localDate.toEpochDay() >>> 40),
(byte) (localDate.toEpochDay() >>> 32),
(byte) (localDate.toEpochDay() >>> 24),
(byte) (localDate.toEpochDay() >>> 16),
(byte) (localDate.toEpochDay() >>> 8),
((Long)localDate.toEpochDay()).byteValue()
};
Assert.assertEquals(expected, schema.encode(localDate));
}
@Test
public void testSchemaEncodeDecodeFidelity() {
LocalDateSchema schema = LocalDateSchema.of();
LocalDate localDate = LocalDate.now();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
byte[] bytes = schema.encode(localDate);
byteBuf.writeBytes(bytes);
Assert.assertEquals(localDate, schema.decode(bytes));
Assert.assertEquals(localDate, schema.decode(byteBuf));
}
@Test
public void testSchemaDecode() {
byte[] byteData = new byte[] {
0,
0,
0,
0,
0,
10,
24,
42
};
long expected = 10*65536 + 24*256 + 42;
LocalDateSchema schema = LocalDateSchema.of();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
byteBuf.writeBytes(byteData);
Assert.assertEquals(expected, schema.decode(byteData).toEpochDay());
Assert.assertEquals(expected, schema.decode(byteBuf).toEpochDay());
}
@Test
public void testNullEncodeDecode() {
ByteBuf byteBuf = null;
byte[] bytes = null;
Assert.assertNull(LocalDateSchema.of().encode(null));
Assert.assertNull(LocalDateSchema.of().decode(byteBuf));
Assert.assertNull(LocalDateSchema.of().decode(bytes));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
public class LocalDateTimeSchemaTest {
@Test
public void testSchemaEncode() {
LocalDateTimeSchema schema = LocalDateTimeSchema.of();
LocalDateTime localDateTime = LocalDateTime.now();
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 2);
byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay());
byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay());
byte[] expected = byteBuffer.array();
Assert.assertEquals(expected, schema.encode(localDateTime));
}
@Test
public void testSchemaEncodeDecodeFidelity() {
LocalDateTimeSchema schema = LocalDateTimeSchema.of();
LocalDateTime localDateTime = LocalDateTime.now();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2);
byte[] bytes = schema.encode(localDateTime);
byteBuf.writeBytes(bytes);
Assert.assertEquals(localDateTime, schema.decode(bytes));
Assert.assertEquals(localDateTime, schema.decode(byteBuf));
}
@Test
public void testSchemaDecode() {
LocalDateTime localDateTime = LocalDateTime.of(2020, 8, 22, 2, 0, 0);
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 2);
byteBuffer.putLong(localDateTime.toLocalDate().toEpochDay());
byteBuffer.putLong(localDateTime.toLocalTime().toNanoOfDay());
byte[] byteData = byteBuffer.array();
long expectedEpochDay = localDateTime.toLocalDate().toEpochDay();
long expectedNanoOfDay = localDateTime.toLocalTime().toNanoOfDay();
LocalDateTimeSchema schema = LocalDateTimeSchema.of();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES * 2);
byteBuf.writeBytes(byteData);
LocalDateTime decode = schema.decode(byteData);
Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay());
Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay());
decode = schema.decode(byteBuf);
Assert.assertEquals(expectedEpochDay, decode.toLocalDate().toEpochDay());
Assert.assertEquals(expectedNanoOfDay, decode.toLocalTime().toNanoOfDay());
}
@Test
public void testNullEncodeDecode() {
ByteBuf byteBuf = null;
byte[] bytes = null;
Assert.assertNull(LocalDateSchema.of().encode(null));
Assert.assertNull(LocalDateSchema.of().decode(byteBuf));
Assert.assertNull(LocalDateSchema.of().decode(bytes));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.time.LocalTime;
public class LocalTimeSchemaTest {
@Test
public void testSchemaEncode() {
LocalTimeSchema schema = LocalTimeSchema.of();
LocalTime localTime = LocalTime.now();
byte[] expected = new byte[] {
(byte) (localTime.toNanoOfDay() >>> 56),
(byte) (localTime.toNanoOfDay() >>> 48),
(byte) (localTime.toNanoOfDay() >>> 40),
(byte) (localTime.toNanoOfDay() >>> 32),
(byte) (localTime.toNanoOfDay() >>> 24),
(byte) (localTime.toNanoOfDay() >>> 16),
(byte) (localTime.toNanoOfDay() >>> 8),
((Long)localTime.toNanoOfDay()).byteValue()
};
Assert.assertEquals(expected, schema.encode(localTime));
}
@Test
public void testSchemaEncodeDecodeFidelity() {
LocalTimeSchema schema = LocalTimeSchema.of();
LocalTime localTime = LocalTime.now();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
byte[] bytes = schema.encode(localTime);
byteBuf.writeBytes(bytes);
Assert.assertEquals(localTime, schema.decode(bytes));
Assert.assertEquals(localTime, schema.decode(byteBuf));
}
@Test
public void testSchemaDecode() {
byte[] byteData = new byte[] {
0,
0,
0,
0,
0,
10,
24,
42
};
long expected = 10*65536 + 24*256 + 42;
LocalTimeSchema schema = LocalTimeSchema.of();
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(8);
byteBuf.writeBytes(byteData);
Assert.assertEquals(expected, schema.decode(byteData).toNanoOfDay());
Assert.assertEquals(expected, schema.decode(byteBuf).toNanoOfDay());
}
@Test
public void testNullEncodeDecode() {
ByteBuf byteBuf = null;
byte[] bytes = null;
Assert.assertNull(LocalTimeSchema.of().encode(null));
Assert.assertNull(LocalTimeSchema.of().decode(byteBuf));
Assert.assertNull(LocalTimeSchema.of().decode(bytes));
}
}
......@@ -28,6 +28,10 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
......@@ -60,6 +64,10 @@ public class PrimitiveSchemaTest {
put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
put(InstantSchema.of(), Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L)));
put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2)));
put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2)));
put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
......@@ -78,6 +86,10 @@ public class PrimitiveSchemaTest {
put(Schema.DATE, Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(Schema.TIME, Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new java.util.Date().getTime() - 10000), new Timestamp(new java.util.Date().getTime())));
put(Schema.INSTANT, Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L)));
put(Schema.LOCAL_DATE, Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2)));
put(Schema.LOCAL_TIME, Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2)));
put(Schema.LOCAL_DATE_TIME, Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
......@@ -139,6 +151,10 @@ public class PrimitiveSchemaTest {
assertEquals(SchemaType.DATE, DateSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.TIME, TimeSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.TIMESTAMP, TimestampSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.INSTANT, InstantSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.LOCAL_DATE, LocalDateSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.LOCAL_TIME, LocalTimeSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.LOCAL_DATE_TIME, LocalDateTimeSchema.of().getSchemaInfo().getType());
}
......
......@@ -25,6 +25,10 @@ import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
......@@ -39,7 +43,11 @@ import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.DateSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.InstantSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.LocalDateSchema;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.apache.pulsar.client.impl.schema.LocalTimeSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
......@@ -69,6 +77,10 @@ public class KeyValueTest {
put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
put(InstantSchema.of(), Arrays.asList(Instant.now(), Instant.now().minusSeconds(60*23L)));
put(LocalDateSchema.of(), Arrays.asList(LocalDate.now(), LocalDate.now().minusDays(2)));
put(LocalTimeSchema.of(), Arrays.asList(LocalTime.now(), LocalTime.now().minusHours(2)));
put(LocalDateTimeSchema.of(), Arrays.asList(LocalDateTime.now(), LocalDateTime.now().minusDays(2), LocalDateTime.now().minusWeeks(10)));
}
};
......
......@@ -444,6 +444,10 @@ public final class PulsarApi {
Time(13, 13),
Timestamp(14, 14),
KeyValue(15, 15),
Instant(16, 16),
LocalDate(17, 17),
LocalTime(18, 18),
LocalDateTime(19, 19),
;
public static final int None_VALUE = 0;
......@@ -462,6 +466,10 @@ public final class PulsarApi {
public static final int Time_VALUE = 13;
public static final int Timestamp_VALUE = 14;
public static final int KeyValue_VALUE = 15;
public static final int Instant_VALUE = 16;
public static final int LocalDate_VALUE = 17;
public static final int LocalTime_VALUE = 18;
public static final int LocalDateTime_VALUE = 19;
public final int getNumber() { return value; }
......@@ -484,6 +492,10 @@ public final class PulsarApi {
case 13: return Time;
case 14: return Timestamp;
case 15: return KeyValue;
case 16: return Instant;
case 17: return LocalDate;
case 18: return LocalTime;
case 19: return LocalDateTime;
default: return null;
}
}
......@@ -1578,6 +1590,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
case 42: {
int length = input.readRawVarint32();
int limit = input.pushLimit(length);
while (input.getBytesUntilLimit() > 0) {
addAckSet(input.readInt64());
}
input.popLimit(limit);
break;
}
}
}
}
......@@ -18860,6 +18881,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
case 34: {
int length = input.readRawVarint32();
int limit = input.pushLimit(length);
while (input.getBytesUntilLimit() > 0) {
addAckSet(input.readInt64());
}
input.popLimit(limit);
break;
}
}
}
}
......@@ -40,6 +40,10 @@ message Schema {
Time = 13;
Timestamp = 14;
KeyValue = 15;
Instant = 16;
LocalDate = 17;
LocalTime = 18;
LocalDateTime = 19;
}
required string name = 1;
......
......@@ -273,6 +273,10 @@ public class SchemaTest extends PulsarTestSuite {
schemas.add(Schema.DATE);
schemas.add(Schema.TIME);
schemas.add(Schema.TIMESTAMP);
schemas.add(Schema.INSTANT);
schemas.add(Schema.LOCAL_DATE);
schemas.add(Schema.LOCAL_TIME);
schemas.add(Schema.LOCAL_DATE_TIME);
schemas.forEach(schemaProducer -> {
schemas.forEach(schemaConsumer -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册