提交 9c275a64 编写于 作者: 冉小龙 提交者: xiaolong.ran

Add subscribe position param for consumer of sink (#5532)

* Add subscribe position param for consumer of sink
Signed-off-by: Nxiaolong.ran <ranxiaolong716@gmail.com>

(cherry picked from commit 39af4777)
上级 96710908
......@@ -46,6 +46,7 @@ import org.apache.commons.lang3.text.WordUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
......@@ -263,6 +264,9 @@ public class CmdSinks extends CmdBase {
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
protected String subsName;
@Parameter(names = "--subs-position", description = "Pulsar source subscription position if user wants to consume messages from the specified location")
protected SubscriptionInitialPosition subsPosition;
@Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true)
protected String DEPRECATED_customSerdeInputString;
@Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)")
......@@ -377,6 +381,10 @@ public class CmdSinks extends CmdBase {
sinkConfig.setSourceSubscriptionName(subsName);
}
if (null != subsPosition) {
sinkConfig.setSourceSubscriptionPosition(subsPosition);
}
if (null != topicsPattern) {
sinkConfig.setTopicsPattern(topicsPattern);
}
......
......@@ -29,6 +29,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
......@@ -51,6 +52,7 @@ public class SinkConfig {
private String name;
private String className;
private String sourceSubscriptionName;
private SubscriptionInitialPosition sourceSubscriptionPosition;
private Collection<String> inputs;
......
......@@ -48,10 +48,12 @@ import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.protocol.schema.LatestVersion;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
......@@ -679,6 +681,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
switch (sourceSpec.getSubscriptionPosition()) {
case EARLIEST:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
break;
default:
pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
break;
}
switch (sourceSpec.getSubscriptionType()) {
case FAILOVER:
pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
......
......@@ -69,10 +69,12 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema());
ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
// consume message even if can't decrypt and deliver it along with encryption-ctx
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(pulsarSourceConfig.getSubscriptionName())
.subscriptionInitialPosition(pulsarSourceConfig.getSubscriptionPosition())
.subscriptionType(pulsarSourceConfig.getSubscriptionType())
.messageListener(this);
......
......@@ -26,6 +26,7 @@ import java.util.TreeMap;
import lombok.Data;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.functions.ConsumerConfig;
......@@ -37,6 +38,7 @@ public class PulsarSourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
SubscriptionType subscriptionType;
private String subscriptionName;
private SubscriptionInitialPosition subscriptionPosition;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Integer maxMessageRetries = -1;
private String deadLetterTopic;
......
......@@ -42,6 +42,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
......@@ -80,6 +82,7 @@ public class PulsarSourceTest {
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionInitialPosition(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
......@@ -96,6 +99,8 @@ public class PulsarSourceTest {
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
pulsarConfig.setTopicSchema(consumerConfigs);
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
pulsarConfig.setSubscriptionType(SubscriptionType.Shared);
return pulsarConfig;
}
......
......@@ -34,6 +34,11 @@ enum SubscriptionType {
FAILOVER = 1;
}
enum SubscriptionPosition {
LATEST = 0;
EARLIEST = 1;
}
message Resources {
double cpu = 1;
int64 ram = 2;
......@@ -112,6 +117,7 @@ message SourceSpec {
string builtin = 8;
string subscriptionName = 9;
bool cleanupSubscription = 11;
SubscriptionPosition subscriptionPosition = 12;
}
message SinkSpec {
......
......@@ -27,6 +27,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
......@@ -170,6 +171,12 @@ public class SinkConfigUtils {
sourceSpecBuilder.setCleanupSubscription(true);
}
if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.LATEST);
}
functionDetailsBuilder.setSource(sourceSpecBuilder);
// set up sink spec
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册