Unverified commit 90c365c1, authored by Matteo Merli, committed by GitHub

Removed shading relocations for Circe-checksum and lz4 libraries (#2191)

* Removed shading relocations for Circe-checksum and lz4 libraries

* Also remove the jni shading in pulsar-broker-shaded

* Removed remaining relocations directives

* Moved Kafka wrapper tests to integration tests

* Do not shade io.netty.buffer.ByteBuf since it's passed between components (a configuration sketch follows below)

* Fixed presto dependencies license file
Parent 828e22df
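The ByteBuf change relies on the maven-shade-plugin's class-level relocation excludes: the io.netty relocation is kept in each shaded module, but io.netty.buffer.ByteBuf is excluded from it so the class keeps its original name and can be passed between shaded and unshaded components, while the com.scurrilous.circe and net.jpountz relocations are removed outright. Below is a minimal sketch of the kept relocation; the plugin coordinates and surrounding configuration are generic maven-shade-plugin boilerplate rather than an exact copy of these poms, and only the relocation element mirrors the hunks that follow.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Relocate all Netty classes into the Pulsar shade namespace... -->
        <pattern>io.netty</pattern>
        <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
        <excludes>
          <!-- ...except ByteBuf, which crosses the shaded/unshaded boundary and must
               keep its original fully-qualified name -->
          <exclude>io.netty.buffer.ByteBuf</exclude>
        </excludes>
      </relocation>
    </relocations>
  </configuration>
</plugin>

With this exclusion in place, consumers of the shaded artifacts can import io.netty.buffer.ByteBuf directly, as the TestPulsarConnector change further down does in place of org.apache.pulsar.shade.io.netty.buffer.ByteBuf.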
@@ -388,7 +388,7 @@ The Apache Software License, Version 2.0
- org.apache.distributedlog-distributedlog-core-4.7.2.jar
- org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.2.jar
* LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
* LZ4 -- org.lz4-lz4-java-1.5.0.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
- org.asynchttpclient-async-http-client-netty-utils-2.1.0-alpha26.jar
@@ -78,7 +78,6 @@
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.glassfish.jersey*:*</include>
<include>javax.ws.rs:*</include>
@@ -113,12 +112,6 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-io-core</artifact>
<includes>
@@ -152,6 +145,9 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.pulsar.policies</pattern>
@@ -161,14 +157,6 @@
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo</shadedPattern>
@@ -433,9 +433,9 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
@@ -76,8 +76,6 @@
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>javax.ws.rs:*</include>
@@ -117,12 +115,6 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-client-original</artifact>
<includes>
@@ -156,6 +148,9 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.pulsar.common</pattern>
@@ -169,14 +164,6 @@
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo.datasketches</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
@@ -77,8 +77,6 @@
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.glassfish.jersey*:*</include>
<include>javax.ws.rs:*</include>
@@ -101,12 +99,6 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-io-core</artifact>
<includes>
@@ -140,6 +132,9 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.pulsar.policies</pattern>
@@ -149,14 +144,6 @@
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo</shadedPattern>
@@ -87,8 +87,6 @@
<include>io.netty:netty</include>
<include>io.netty:netty-*</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>commons-logging:commons-logging</include>
@@ -148,26 +146,6 @@
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.pulsar.common</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.common</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.pulsar.policies</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.policies</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo.datasketches</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
@@ -206,12 +184,6 @@
</relocation>
</relocations>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-client-original</artifact>
<includes>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.tests;
import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class KafkaApiTest extends BrokerTestBase {
public KafkaApiTest() {
super.isTcpLookup = true;
}
@BeforeClass
@Override
protected void setup() throws Exception {
super.baseSetup();
}
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test(timeOut = 30000)
public void testSimpleProducerConsumer() throws Exception {
String topic = "persistent://prop/ns-abc/testSimpleProducerConsumer";
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", lookupUrl.toString());
producerProperties.put("key.serializer", IntegerSerializer.class.getName());
producerProperties.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", lookupUrl.toString());
consumerProperties.put("group.id", "my-subscription-name");
consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
consumerProperties.put("enable.auto.commit", "true");
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Arrays.asList(topic));
List<Long> offsets = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
offsets.add(md.offset());
log.info("Published message at {}", Long.toHexString(md.offset()));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
records.forEach(record -> {
assertEquals(record.key().intValue(), received.get());
assertEquals(record.value(), "hello-" + received.get());
assertEquals(record.offset(), offsets.get(received.get()).longValue());
received.incrementAndGet();
});
consumer.commitSync();
}
consumer.close();
}
private static final Logger log = LoggerFactory.getLogger(KafkaApiTest.class);
}
@@ -39,7 +39,7 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
</dependency>
@@ -52,35 +52,16 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.tests;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.PulsarKafkaProducer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class KafkaProducerTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
isTcpLookup = true;
super.baseSetup();
}
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testSimpleProducer() throws Exception {
String topic = "persistent://prop/ns-abc/testSimpleProducer";
Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
}
producer.flush();
producer.close();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "hello-" + i);
pulsarConsumer.acknowledge(msg);
}
}
@Test(timeOut = 10000)
public void testProducerCallback() throws Exception {
String topic = "persistent://prop/ns-abc/testProducerCallback";
Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic).subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new PulsarKafkaProducer<>(props);
CountDownLatch counter = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
assertEquals(metadata.topic(), topic);
assertNull(exception);
counter.countDown();
});
}
counter.await();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "hello-" + i);
pulsarConsumer.acknowledge(msg);
}
producer.close();
}
}
@@ -141,8 +141,6 @@
<include>com.fasterxml.jackson.core</include>
<include>io.netty:netty</include>
<include>io.netty:netty-all</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.glassfish.jersey*:*</include>
<include>javax.ws.rs:*</include>
@@ -173,12 +171,6 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-io-core</artifact>
<includes>
@@ -220,23 +212,14 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.pulsar.policies</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.policies</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo</shadedPattern>
@@ -88,8 +88,6 @@
<include>commons-*:*</include>
<include>org.apache.pulsar:pulsar-common</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>net.jpountz.lz4:lz4</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>commons-logging:commons-logging</include>
@@ -100,12 +98,6 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>net.jpountz.lz4:lz4</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-io-core</artifact>
<includes>
@@ -145,23 +137,14 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
<relocation>
<pattern>org.apache.pulsar.policies</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.policies</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.pulsar.checksum</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.checksum</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.shade.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.shade.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo.datasketches</pattern>
<shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
@@ -68,8 +68,8 @@
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<dependency>
@@ -158,14 +158,6 @@
<relocation>
<pattern>org.glassfish</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.glassfish</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous.circe</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous.circe</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo</pattern>
@@ -191,7 +183,7 @@
<pattern>org.eclipse</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse</shadedPattern>
</relocation>
<!--
asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
see {@link https://github.com/apache/incubator-pulsar/pull/390}
and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
@@ -96,6 +96,12 @@
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -107,7 +113,7 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
@@ -115,6 +115,9 @@
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
<excludes>
<exclude>io.netty.buffer.ByteBuf</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
@@ -238,6 +238,12 @@ The Apache Software License, Version 2.0
- commons-compress-1.15.jar
- commons-lang3-3.3.2.jar
- commons-lang3-3.4.jar
- commons-beanutils-1.7.0.jar
- commons-collections-3.2.1.jar
- commons-configuration-1.6.jar
- commons-digester-1.8.jar
- commons-lang-2.4.jar
- commons-logging-1.1.1.jar
* Netty
- netty-3.6.2.Final.jar
* Joda Time
@@ -391,6 +397,10 @@ The Apache Software License, Version 2.0
- simpleclient_common-0.0.23.jar
- simpleclient_hotspot-0.0.23.jar
- simpleclient_servlet-0.0.23.jar
* BookKeeper
- circe-checksum-4.7.2.jar
* LZ4
- lz4-java-1.5.0.jar
Protocol Buffers License
* Protocol Buffers
@@ -50,7 +50,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
@@ -603,7 +603,7 @@ public abstract class TestPulsarConnector {
Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
ByteBuf payload
= org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, payload);
@@ -851,7 +851,7 @@ public abstract class TestPulsarConnector {
Schema schema = topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
ByteBuf payload
= org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
ByteBuf byteBuf = serializeMetadataAndPayload
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.websocket.service;
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@@ -26,7 +31,6 @@ import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.net.ssl.SSLContext;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
@@ -35,7 +39,6 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.websocket.WebSocketService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -53,11 +56,6 @@ import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
public class ProxyServer {
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
@@ -28,7 +28,6 @@
<version>2.2.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>integration</artifactId>
<packaging>jar</packaging>
<name>Apache Pulsar :: Tests :: Integration</name>
@@ -74,6 +73,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
@@ -128,7 +133,7 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.tests;
package org.apache.pulsar.tests.integration.compat.kafka;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import java.util.ArrayList;
import java.util.Arrays;
@@ -27,51 +28,98 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.PulsarKafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.Test;
public class KafkaConsumerTest extends BrokerTestBase {
@Slf4j
public class KafkaApiTest extends PulsarTestSuite {
@Test(timeOut = 30000)
public void testSimpleProducerConsumer() throws Exception {
String topic = "persistent://public/default/testSimpleProducerConsumer";
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
producerProperties.put("key.serializer", IntegerSerializer.class.getName());
producerProperties.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
consumerProperties.put("group.id", "my-subscription-name");
consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
consumerProperties.put("enable.auto.commit", "true");
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Arrays.asList(topic));
List<Long> offsets = new ArrayList<>();
@BeforeClass
@Override
protected void setup() throws Exception {
isTcpLookup = true;
super.baseSetup();
}
for (int i = 0; i < 10; i++) {
RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
offsets.add(md.offset());
log.info("Published message at {}", Long.toHexString(md.offset()));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
records.forEach(record -> {
assertEquals(record.key().intValue(), received.get());
assertEquals(record.value(), "hello-" + received.get());
assertEquals(record.offset(), offsets.get(received.get()).longValue());
received.incrementAndGet();
});
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
consumer.commitSync();
}
consumer.close();
}
@Test
public void testSimpleConsumer() throws Exception {
String topic = "persistent://prop/ns-abc/testSimpleConsumer";
String topic = "testSimpleConsumer";
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
@@ -95,19 +143,21 @@ public class KafkaConsumerTest extends BrokerTestBase {
@Test
public void testConsumerAutoCommit() throws Exception {
String topic = "persistent://prop/ns-abc/testConsumerAutoCommit";
String topic = "testConsumerAutoCommit";
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
@@ -126,7 +176,7 @@ public class KafkaConsumerTest extends BrokerTestBase {
consumer.close();
// Re-open consumer and verify every message was acknowledged
Consumer<String, String> consumer2 = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer2.poll(100);
@@ -136,19 +186,21 @@ public class KafkaConsumerTest extends BrokerTestBase {
@Test
public void testConsumerManualOffsetCommit() throws Exception {
String topic = "persistent://sample/standalone/ns/testConsumerManualOffsetCommit";
String topic = "testConsumerManualOffsetCommit";
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
@@ -173,7 +225,7 @@ public class KafkaConsumerTest extends BrokerTestBase {
consumer.close();
// Re-open consumer and verify every message was acknowledged
Consumer<String, String> consumer2 = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer2.poll(100);
@@ -183,26 +235,29 @@ public class KafkaConsumerTest extends BrokerTestBase {
@Test
public void testPartitions() throws Exception {
String topic = "persistent://sample/standalone/ns/testPartitions";
String topic = "testPartitions";
// Create 8 partitions in topic
admin.tenants().createTenant("sample", new TenantInfo());
@Cleanup
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
admin.topics().createPartitionedTopic(topic, 8);
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();
        // Create 2 Kafka consumers and verify each gets half of the messages
List<Consumer<String, String>> consumers = new ArrayList<>();
for (int c = 0; c < 2; c++) {
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
consumers.add(consumer);
}
@@ -231,20 +286,22 @@ public class KafkaConsumerTest extends BrokerTestBase {
@Test
public void testConsumerSeek() throws Exception {
String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
String topic = "testSimpleConsumer";
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
@@ -286,20 +343,22 @@ public class KafkaConsumerTest extends BrokerTestBase {
@Test
public void testConsumerSeekToEnd() throws Exception {
String topic = "persistent://sample/standalone/ns/testSimpleConsumer";
String topic = "testSimpleConsumer";
Properties props = new Properties();
props.put("bootstrap.servers", lookupUrl.toString());
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
Consumer<String, String> consumer = new PulsarKafkaConsumer<>(props);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
@@ -324,7 +383,7 @@ public class KafkaConsumerTest extends BrokerTestBase {
consumer.close();
// Recreate the consumer
consumer = new PulsarKafkaConsumer<>(props);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords<String, String> records = consumer.poll(100);
@@ -334,4 +393,76 @@ public class KafkaConsumerTest extends BrokerTestBase {
consumer.close();
}
@Test
public void testSimpleProducer() throws Exception {
String topic = "testSimpleProducer";
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
}
producer.flush();
producer.close();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "hello-" + i);
pulsarConsumer.acknowledge(msg);
}
}
@Test(timeOut = 10000)
public void testProducerCallback() throws Exception {
String topic = "testProducerCallback";
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
CountDownLatch counter = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
assertEquals(metadata.topic(), topic);
assertNull(exception);
counter.countDown();
});
}
counter.await();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "hello-" + i);
pulsarConsumer.acknowledge(msg);
}
producer.close();
}
}