未验证 提交 7d9319d8 编写于 作者: A Aaron Robert 提交者: GitHub

[Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)

Motivation
Add auto partitions update interval setting for producer and consumer.

Modifications
add autoUpdatePartitionsInterval to partitioned producer and consumer
上级 7b60e2d4
......@@ -552,6 +552,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
* Set the interval of updating partitions <i>(default: 1 minute)</i>. This only works if autoUpdatePartitions is
* enabled.
*
* @param interval
* the interval of updating partitions
* @param unit
* the time unit of the interval.
* @return the consumer builder instance
*/
ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
/**
* Set KeyShared subscription policy for consumer.
*
......
......@@ -503,6 +503,18 @@ public interface ProducerBuilder<T> extends Cloneable {
*/
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
* Set the interval of updating partitions <i>(default: 1 minute)</i>. This only works if autoUpdatePartitions is
* enabled.
*
* @param interval
* the interval of updating partitions
* @param unit
* the time unit of the interval.
* @return the producer builder instance
*/
ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
/**
* Control whether enable the multiple schema mode for producer.
* If enabled, producer can send a message with different schema from that specified just when it is created,
......
......@@ -383,6 +383,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
@Override
public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
return this;
}
@Override
public ConsumerBuilder<T> startMessageIdInclusive() {
......
......@@ -138,7 +138,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
if (conf.getTopicNames().isEmpty()) {
......
......@@ -81,7 +81,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
}
......
......@@ -294,6 +294,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
return this;
}
@Override
public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
return this;
}
@Override
public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
conf.setMultiSchema(multiSchema);
......
......@@ -119,6 +119,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
private long autoUpdatePartitionsIntervalSeconds = 60;
private boolean replicateSubscriptionState = false;
private boolean resetIncludeHead = false;
......@@ -127,6 +129,11 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean batchIndexAckEnabled = false;
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
}
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
......
......@@ -95,6 +95,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
private long autoUpdatePartitionsIntervalSeconds = 60;
private boolean multiSchema = true;
private SortedMap<String, String> properties = new TreeMap<>();
......@@ -163,4 +165,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
}
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
}
}
......@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
......@@ -62,6 +63,7 @@ public class ConfigurationDataUtilsTest {
confData.setProducerName("unset");
confData.setBatchingEnabled(true);
confData.setBatchingMaxMessages(1234);
confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("producerName", "test-producer");
config.put("batchingEnabled", false);
......@@ -70,6 +72,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-producer", confData.getProducerName());
assertFalse(confData.isBatchingEnabled());
assertEquals(1234, confData.getBatchingMaxMessages());
assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
......@@ -78,6 +81,7 @@ public class ConfigurationDataUtilsTest {
confData.setSubscriptionName("unknown-subscription");
confData.setPriorityLevel(10000);
confData.setConsumerName("unknown-consumer");
confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("subscriptionName", "test-subscription");
config.put("priorityLevel", 100);
......@@ -85,6 +89,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-subscription", confData.getSubscriptionName());
assertEquals(100, confData.getPriorityLevel());
assertEquals("unknown-consumer", confData.getConsumerName());
assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册