未验证 提交 1632dcaf 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

Various fixes for Batch Source (#7965)

* Various fixes for Batch Source
1. Create intermediate topic/subscription prior to function running in case auto topic creation is turned off
2. Fix possible NPE in CronTriggerer when calling stop()
3. Stop all producers created in ContextImpl
4. Delete intermediate topic for batch source
Co-authored-by: Jerry Peng <jerryp@splunk.com>
上级 1030fbe8
......@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
......@@ -59,7 +60,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
/**
* This class implements the Context interface exposed to the user.
*/
class ContextImpl implements Context, SinkContext, SourceContext {
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
private InstanceConfig config;
private Logger logger;
......@@ -603,4 +604,27 @@ class ContextImpl implements Context, SinkContext, SourceContext {
this.underlyingBuilder = underlyingBuilder;
}
}
/**
 * Closes every producer tracked by this context and waits for all of the
 * asynchronous close operations to finish.
 *
 * <p>Failures are logged at WARN level and never propagated. If the calling
 * thread is interrupted while waiting, its interrupt status is restored.
 */
@Override
public void close() {
    List<CompletableFuture<?>> futures = new ArrayList<>();

    if (publishProducers != null) {
        for (Producer<?> producer : publishProducers.values()) {
            futures.add(producer.closeAsync());
        }
    }

    // ThreadLocal.get() returns null on a thread that never stored a value,
    // so the per-thread map has to be null-checked as well as the holder.
    if (tlPublishProducers != null && tlPublishProducers.get() != null) {
        for (Producer<?> producer : tlPublishProducers.get().values()) {
            futures.add(producer.closeAsync());
        }
    }

    try {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    } catch (InterruptedException e) {
        // Restore the flag so callers further up the stack can observe the interruption.
        Thread.currentThread().interrupt();
        logger.warn("Failed to close producers", e);
    } catch (ExecutionException e) {
        logger.warn("Failed to close producers", e);
    }
}
}
\ No newline at end of file
......@@ -127,6 +127,7 @@ public class JavaInstance implements AutoCloseable {
/**
 * Shuts this instance down by closing the context it holds.
 */
@Override
public void close() {
    this.context.close();
}
public Map<String, Double> getAndResetMetrics() {
......
......@@ -28,6 +28,8 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
......@@ -126,9 +128,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
private void start() throws Exception {
// This is the first thing to do to ensure that any tasks discovered during the discover
// phase are not lost
setupInstanceSubscription();
createIntermediateTopicConsumer();
batchSource.open(this.config, this.sourceContext);
if (sourceContext.getInstanceId() == 0) {
discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(),
......@@ -188,45 +188,50 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
}
private void setupInstanceSubscription() {
private void createIntermediateTopicConsumer() {
String subName = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
sourceContext.getTenant(), sourceContext.getNamespace(),
sourceContext.getSourceName());
sourceContext.getTenant(), sourceContext.getNamespace(),
sourceContext.getSourceName());
String fqfn = FunctionCommon.getFullyQualifiedName(
sourceContext.getTenant(), sourceContext.getNamespace(),
sourceContext.getSourceName());
try {
Actions.newBuilder()
.addAction(
Actions.Action.builder()
.actionName(String.format("Setting up instance consumer for BatchSource intermediate " +
"topic for function %s", FunctionCommon.getFullyQualifiedName(
sourceContext.getTenant(), sourceContext.getNamespace(),
sourceContext.getSourceName())))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.supplier(() -> {
try {
CompletableFuture<Consumer<byte[]>> cf = sourceContext.newConsumerBuilder(Schema.BYTES)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.topic(intermediateTopicName)
.subscribeAsync();
intermediateTopicConsumer = cf.join();
return Actions.ActionResult.builder()
.success(true)
.build();
} catch (Exception e) {
return Actions.ActionResult.builder()
.success(false)
.build();
}
})
.build())
.run();
.addAction(
Actions.Action.builder()
.actionName(String.format("Setting up instance consumer for BatchSource intermediate " +
"topic for function %s", fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.supplier(() -> {
try {
CompletableFuture<Consumer<byte[]>> cf = sourceContext.newConsumerBuilder(Schema.BYTES)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.topic(intermediateTopicName)
.properties(InstanceUtils.getProperties(
Function.FunctionDetails.ComponentType.SOURCE, fqfn, sourceContext.getInstanceId()))
.subscribeAsync();
intermediateTopicConsumer = cf.join();
return Actions.ActionResult.builder()
.success(true)
.build();
} catch (Exception e) {
return Actions.ActionResult.builder()
.success(false)
.errorMsg(e.getMessage())
.build();
}
})
.build())
.run();
} catch (InterruptedException e) {
log.error("Error setting up instance subscription for intermediate topic", e);
throw new RuntimeException(e);
}
}
private void retrieveNextTask() throws Exception {
currentTask = intermediateTopicConsumer.receive();
return;
......
......@@ -222,6 +222,7 @@ public class BatchSourceExecutorTest {
consumerBuilder = Mockito.mock(ConsumerBuilder.class);
Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap());
Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
discoveredTask = Mockito.mock(Message.class);
consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
......@@ -27,10 +28,13 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
......@@ -60,6 +64,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
......@@ -130,6 +135,9 @@ public class FunctionActioner {
}
}
// Setup for batch sources if necessary
setupBatchSource(functionDetails);
RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
......@@ -325,81 +333,136 @@ public class FunctionActioner {
? InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails())
: functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
deleteSubscription(topic, consumerSpec, subscriptionName, fqfn);
deleteSubscription(topic, consumerSpec, subscriptionName, String.format("Cleaning up subscriptions for function %s", fqfn));
}
});
}
if (InstanceUtils.calculateSubjectType(details) == FunctionDetails.ComponentType.SOURCE) {
// topicName -> subscriptions
Map<String, String> subscriptions =
SourceConfigUtils.computeBatchSourceIntermediateTopicSubscriptions(details,
FunctionCommon.getFullyQualifiedName(details));
if (subscriptions != null) {
subscriptions.forEach((topic, subscriptionName) -> {
Function.ConsumerSpec consumerSpec = Function.ConsumerSpec.newBuilder().setIsRegexPattern(false).build();
deleteSubscription(topic, consumerSpec, subscriptionName, fqfn);
});
}
}
// clean up done for batch sources if necessary
cleanupBatchSource(details);
}
private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String fqfn) {
private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String msg) {
try {
Actions.newBuilder()
.addAction(
Actions.Action.builder()
.actionName(String.format("Cleaning up subscriptions for function %s", fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.supplier(() -> {
try {
if (consumerSpec.getIsRegexPattern()) {
pulsarAdmin.namespaces().unsubscribeNamespace(TopicName
.get(topic).getNamespace(), subscriptionName);
} else {
pulsarAdmin.topics().deleteSubscription(topic,
subscriptionName);
}
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.NotFoundException) {
return Actions.ActionResult.builder()
.success(true)
.build();
} else {
// for debugging purposes
List<Map<String, String>> existingConsumers = Collections.emptyList();
try {
TopicStats stats = pulsarAdmin.topics().getStats(topic);
SubscriptionStats sub = stats.subscriptions.get(subscriptionName);
if (sub != null) {
existingConsumers = sub.consumers.stream()
.map(consumerStats -> consumerStats.metadata)
.collect(Collectors.toList());
}
} catch (PulsarAdminException e1) {
}
String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers))
.build();
}
}
return Actions.ActionResult.builder()
.success(true)
.build();
})
.build())
Actions.Action.builder()
.actionName(msg)
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.supplier(
getDeleteSubscriptionSupplier(topic,
consumerSpec.getIsRegexPattern(),
subscriptionName)
)
.build())
.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
 * Builds the supplier that performs one attempt at deleting a subscription.
 *
 * <p>When {@code isRegex} is true the subscription is removed from the whole
 * namespace of {@code topic}; otherwise it is removed from {@code topic} only.
 * A "not found" response counts as success. Any other admin failure yields an
 * unsuccessful result whose error message includes, when they can be fetched
 * and serialized, the current stats of the subscription.
 *
 * @param topic            topic (or a topic in the target namespace)
 * @param isRegex          whether to unsubscribe at namespace level
 * @param subscriptionName name of the subscription to delete
 * @return supplier producing the result of a single deletion attempt
 */
private Supplier<Actions.ActionResult> getDeleteSubscriptionSupplier(
        String topic, boolean isRegex, String subscriptionName) {
    return () -> {
        try {
            if (isRegex) {
                pulsarAdmin.namespaces().unsubscribeNamespace(
                        TopicName.get(topic).getNamespace(), subscriptionName);
            } else {
                pulsarAdmin.topics().deleteSubscription(topic, subscriptionName);
            }
            return Actions.ActionResult.builder()
                    .success(true)
                    .build();
        } catch (PulsarAdminException.NotFoundException e) {
            // Nothing left to delete.
            return Actions.ActionResult.builder()
                    .success(true)
                    .build();
        } catch (PulsarAdminException e) {
            // For debugging purposes: try to attach the subscription's stats to the error.
            SubscriptionStats sub = null;
            try {
                TopicStats stats = pulsarAdmin.topics().getStats(topic);
                sub = stats.subscriptions.get(subscriptionName);
            } catch (PulsarAdminException ignored) {
                // Best effort only; fall back to the plain error message.
            }

            String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
            String finalErrorMsg = errorMsg;
            if (sub != null) {
                try {
                    finalErrorMsg = String.format("%s - existing consumers: %s",
                            errorMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(sub));
                } catch (JsonProcessingException ignored) {
                    // Stats could not be serialized; keep the plain error message.
                }
            }
            return Actions.ActionResult.builder()
                    .success(false)
                    .errorMsg(finalErrorMsg)
                    .build();
        }
    };
}
/**
 * Builds the supplier that performs one attempt at deleting {@code topic}.
 *
 * <p>A "not found" response counts as success. Any other admin failure yields
 * an unsuccessful result whose error message includes the topic stats when
 * they can be fetched and serialized.
 *
 * @param topic name of the topic to delete
 * @return supplier producing the result of a single deletion attempt
 */
private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
    return () -> {
        try {
            pulsarAdmin.topics().delete(topic);
            return Actions.ActionResult.builder().success(true).build();
        } catch (PulsarAdminException.NotFoundException notFound) {
            // Topic is already gone.
            return Actions.ActionResult.builder().success(true).build();
        } catch (PulsarAdminException failure) {
            // For debugging purposes: try to attach the topic stats to the error.
            TopicStats topicStats = null;
            try {
                topicStats = pulsarAdmin.topics().getStats(topic);
            } catch (PulsarAdminException ignored) {
                // Best effort only.
            }

            String baseMsg = failure.getHttpError();
            if (baseMsg == null) {
                baseMsg = failure.getMessage();
            }

            String detailedMsg = baseMsg;
            if (topicStats != null) {
                try {
                    detailedMsg = String.format("%s - topic stats: %s",
                            baseMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(topicStats));
                } catch (JsonProcessingException ignored) {
                    // Keep the plain error message.
                }
            }
            return Actions.ActionResult.builder()
                    .success(false)
                    .errorMsg(detailedMsg)
                    .build();
        }
    };
}
private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
return StringUtils.join(
new String[]{
......@@ -508,4 +571,100 @@ public class FunctionActioner {
throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
}
}
/**
 * Creates the intermediate topic subscription for a batch source before the
 * function is started. Does nothing when {@code functionDetails} does not
 * describe a batch source.
 *
 * <p>The creation is attempted through a retrying action (up to 10 tries,
 * 1000 ms apart). A conflict response is treated as success.
 *
 * @param functionDetails details of the function being started
 * @throws RuntimeException wrapping the {@link InterruptedException} if the
 *         thread is interrupted while the action is running
 */
private void setupBatchSource(Function.FunctionDetails functionDetails) {
    if (!isBatchSource(functionDetails)) {
        return;
    }

    String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(
            functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()).toString();
    String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
            functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
    String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);

    try {
        Actions.newBuilder()
                .addAction(
                        Actions.Action.builder()
                                .actionName(String.format("Creating intermediate topic %s with subscription %s for Batch Source %s",
                                        intermediateTopicName, intermediateTopicSubscription, fqfn))
                                .numRetries(10)
                                .sleepBetweenInvocationsMs(1000)
                                .supplier(() -> {
                                    try {
                                        pulsarAdmin.topics().createSubscription(intermediateTopicName, intermediateTopicSubscription, MessageId.latest);
                                        return Actions.ActionResult.builder()
                                                .success(true)
                                                .build();
                                    } catch (PulsarAdminException.ConflictException e) {
                                        // topic and subscription already exist, so just continue
                                        return Actions.ActionResult.builder()
                                                .success(true)
                                                .build();
                                    } catch (Exception e) {
                                        return Actions.ActionResult.builder()
                                                .errorMsg(e.getMessage())
                                                .success(false)
                                                .build();
                                    }
                                })
                                .build())
                .run();
    } catch (InterruptedException e) {
        log.error("Error setting up instance subscription for intermediate topic", e);
        // Restore the interrupt flag before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Removes the intermediate topic subscription and then the intermediate topic
 * itself for a batch source. Does nothing when {@code functionDetails} does
 * not describe a batch source.
 *
 * <p>Both steps run as retrying actions (up to 10 tries, 1000 ms apart).
 *
 * @param functionDetails details of the function being cleaned up
 * @throws RuntimeException wrapping the {@link InterruptedException} if the
 *         thread is interrupted while the actions are running
 */
private void cleanupBatchSource(Function.FunctionDetails functionDetails) {
    if (!isBatchSource(functionDetails)) {
        return;
    }

    // clean up intermediate topic
    String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(functionDetails.getTenant(),
            functionDetails.getNamespace(), functionDetails.getName()).toString();
    String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
            functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
    String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);

    try {
        Actions.newBuilder()
                .addAction(
                        Actions.Action.builder()
                                .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
                                        intermediateTopicSubscription, fqfn))
                                .numRetries(10)
                                .sleepBetweenInvocationsMs(1000)
                                // NOTE(review): presumably lets the chain move on to the topic
                                // deletion even if this step fails; confirm against Actions.
                                .continueOn(true)
                                .supplier(
                                        getDeleteSubscriptionSupplier(intermediateTopicName,
                                                false,
                                                intermediateTopicSubscription)
                                )
                                .build())
                .addAction(Actions.Action.builder()
                        .actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
                                intermediateTopicName, fqfn))
                        .numRetries(10)
                        .sleepBetweenInvocationsMs(1000)
                        .supplier(getDeleteTopicSupplier(intermediateTopicName))
                        .build())
                .run();
    } catch (InterruptedException e) {
        // Restore the interrupt flag before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Tells whether the given function is a batch source: a component of type
 * SOURCE whose source config can be extracted and yields a non-null batch
 * source config.
 *
 * @param functionDetails details of the function to inspect
 * @return {@code true} if a batch source config is present, {@code false} otherwise
 */
private static boolean isBatchSource(Function.FunctionDetails functionDetails) {
    if (InstanceUtils.calculateSubjectType(functionDetails) != FunctionDetails.ComponentType.SOURCE) {
        return false;
    }

    String qualifiedName = FunctionCommon.getFullyQualifiedName(functionDetails);
    Map<String, Object> sourceConfig =
            SourceConfigUtils.extractSourceConfig(functionDetails.getSource(), qualifiedName);
    if (sourceConfig == null) {
        return false;
    }

    return SourceConfigUtils.extractBatchSourceConfig(sourceConfig) != null;
}
}
......@@ -58,8 +58,9 @@ public class CronTriggerer implements BatchSourceTriggerer {
/**
 * Shuts the scheduler down if one exists; otherwise does nothing.
 */
@Override
public void stop() {
    // The null check must come before any use of scheduler, otherwise it cannot
    // prevent the NullPointerException it is there for.
    if (scheduler != null) {
        scheduler.shutdown();
    }
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册