diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index e3f169e32ec274566b66e1471de989e35456368f..2be3e6470c49044ed3bb3800445ea53f1c9e637d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; @@ -59,7 +60,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; /** * This class implements the Context interface exposed to the user. */ -class ContextImpl implements Context, SinkContext, SourceContext { +class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable { private InstanceConfig config; private Logger logger; @@ -603,4 +604,27 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.underlyingBuilder = underlyingBuilder; } } + + /** Closes the producers in publishProducers and in the map returned by tlPublishProducers.get(), then blocks until every closeAsync() future has completed. Failures are logged, not rethrown. */ @Override + public void close() { + List futures = new LinkedList<>(); + + if (publishProducers != null) { + for (Producer producer : publishProducers.values()) { + futures.add(producer.closeAsync()); + } + } + + if (tlPublishProducers != null) { + /* NOTE(review): only the map returned by get() for the calling thread is visited, assuming tlPublishProducers is a ThreadLocal; confirm producers created on other threads are closed elsewhere */ for (Producer producer : tlPublishProducers.get().values()) { + futures.add(producer.closeAsync()); + } + } + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } catch (InterruptedException | ExecutionException e) { + /* NOTE(review): an InterruptedException caught here is only logged; the thread's interrupt flag is not restored. Confirm that is intended. */ logger.warn("Failed to close producers", e); + } + } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 1e18a0763d46586762f23c6f9ddf4cd8291d5774..8c9616d7c1332fbe54d6100ca8a4427a05f0bc5b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -127,6 +127,7 @@ public class JavaInstance implements AutoCloseable { @Override public void close() { + /* also close the function context when the instance is closed */ context.close(); } public Map getAndResetMetrics() { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java index 2933b0bc02971f3da0a4b789df975df3d16812c5..c42d22cd60e3ed31fb6b7858d30fd2053bd3f79e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java @@ -28,6 +28,8 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.SourceConfigUtils; @@ -126,9 +128,7 @@ public class BatchSourceExecutor implements Source { } private void start() throws Exception { - // This is the first thing to do to ensure that any tasks discovered during the discover - // phase are not lost - setupInstanceSubscription(); + /* create the intermediate topic consumer before the batch source is opened */ createIntermediateTopicConsumer(); batchSource.open(this.config, this.sourceContext); if (sourceContext.getInstanceId() == 0) { 
discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(), @@ -188,45 +188,50 @@ public class BatchSourceExecutor implements Source { } } - private void setupInstanceSubscription() { + /** Subscribes to intermediateTopicName with a Shared subscription and stores the resulting consumer in intermediateTopicConsumer. The attempt runs as an Actions step configured with numRetries(10) and sleepBetweenInvocationsMs(1000). */ private void createIntermediateTopicConsumer() { String subName = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName( - sourceContext.getTenant(), sourceContext.getNamespace(), - sourceContext.getSourceName()); + sourceContext.getTenant(), sourceContext.getNamespace(), + sourceContext.getSourceName()); + String fqfn = FunctionCommon.getFullyQualifiedName( + sourceContext.getTenant(), sourceContext.getNamespace(), + sourceContext.getSourceName()); try { Actions.newBuilder() - .addAction( - Actions.Action.builder() - .actionName(String.format("Setting up instance consumer for BatchSource intermediate " + - "topic for function %s", FunctionCommon.getFullyQualifiedName( - sourceContext.getTenant(), sourceContext.getNamespace(), - sourceContext.getSourceName()))) - .numRetries(10) - .sleepBetweenInvocationsMs(1000) - .supplier(() -> { - try { - CompletableFuture> cf = sourceContext.newConsumerBuilder(Schema.BYTES) - .subscriptionName(subName) - .subscriptionType(SubscriptionType.Shared) - .topic(intermediateTopicName) - .subscribeAsync(); - intermediateTopicConsumer = cf.join(); - return Actions.ActionResult.builder() - .success(true) - .build(); - } catch (Exception e) { - return Actions.ActionResult.builder() - .success(false) - .build(); - } - }) - .build()) - .run(); + .addAction( + Actions.Action.builder() + .actionName(String.format("Setting up instance consumer for BatchSource intermediate " + + "topic for function %s", fqfn)) + .numRetries(10) + .sleepBetweenInvocationsMs(1000) + .supplier(() -> { + try { + CompletableFuture> cf = sourceContext.newConsumerBuilder(Schema.BYTES) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .topic(intermediateTopicName) + /* consumer properties built from the SOURCE component type, fqfn and instance id */ .properties(InstanceUtils.getProperties( 
Function.FunctionDetails.ComponentType.SOURCE, fqfn, sourceContext.getInstanceId())) + .subscribeAsync(); + intermediateTopicConsumer = cf.join(); + return Actions.ActionResult.builder() + .success(true) + .build(); + } catch (Exception e) { + return Actions.ActionResult.builder() + .success(false) + /* report why the subscribe attempt failed */ .errorMsg(e.getMessage()) + .build(); + } + }) + .build()) + .run(); } catch (InterruptedException e) { log.error("Error setting up instance subscription for intermediate topic", e); throw new RuntimeException(e); } } + private void retrieveNextTask() throws Exception { currentTask = intermediateTopicConsumer.receive(); return; diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java index 28c8ac9fa99b52cc4e4b7b72b05c516256ab58c5..5125453cbd8f8fdb3460ec2bec456fa6f3164d50 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java @@ -222,6 +222,7 @@ public class BatchSourceExecutorTest { consumerBuilder = Mockito.mock(ConsumerBuilder.class); Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any()); Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any()); + /* the executor now calls properties(...) on the builder, so the mock returns itself for that call as well */ Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap()); Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any()); discoveredTask = Mockito.mock(Message.class); consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 
b43ce0a5a3863eb6726a64f2b8eeb3bb096bdedb..a79138dce986507cea5b97924df4a1c0a5199c2b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; @@ -27,10 +28,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.auth.FunctionAuthProvider; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; @@ -60,6 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -130,6 +135,9 @@ public class FunctionActioner { } } + // Setup for batch sources if necessary + setupBatchSource(functionDetails); + RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile); functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); @@ -325,81 +333,136 @@ public class FunctionActioner { ? 
InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails()) : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName(); - deleteSubscription(topic, consumerSpec, subscriptionName, fqfn); + deleteSubscription(topic, consumerSpec, subscriptionName, String.format("Cleaning up subscriptions for function %s", fqfn)); } }); } - if (InstanceUtils.calculateSubjectType(details) == FunctionDetails.ComponentType.SOURCE) { - // topicName -> subscriptions - Map subscriptions = - SourceConfigUtils.computeBatchSourceIntermediateTopicSubscriptions(details, - FunctionCommon.getFullyQualifiedName(details)); - if (subscriptions != null) { - subscriptions.forEach((topic, subscriptionName) -> { - Function.ConsumerSpec consumerSpec = Function.ConsumerSpec.newBuilder().setIsRegexPattern(false).build(); - deleteSubscription(topic, consumerSpec, subscriptionName, fqfn); - }); - } - } + + // clean up batch source resources if necessary + cleanupBatchSource(details); } - private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String fqfn) { + /** Runs an action named msg, configured with numRetries(10) and sleepBetweenInvocationsMs(1000), that removes subscriptionName from topic; the regex and not-found handling lives in getDeleteSubscriptionSupplier. */ private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String msg) { try { Actions.newBuilder() .addAction( - Actions.Action.builder() - .actionName(String.format("Cleaning up subscriptions for function %s", fqfn)) - .numRetries(10) - .sleepBetweenInvocationsMs(1000) - .supplier(() -> { - try { - if (consumerSpec.getIsRegexPattern()) { - pulsarAdmin.namespaces().unsubscribeNamespace(TopicName - .get(topic).getNamespace(), subscriptionName); - } else { - pulsarAdmin.topics().deleteSubscription(topic, - subscriptionName); - } - } catch (PulsarAdminException e) { - if (e instanceof PulsarAdminException.NotFoundException) { - return Actions.ActionResult.builder() - .success(true) - .build(); - } else { - // for debugging purposes 
- List> existingConsumers = Collections.emptyList(); - try { - TopicStats stats = pulsarAdmin.topics().getStats(topic); - SubscriptionStats sub = stats.subscriptions.get(subscriptionName); - if (sub != null) { - existingConsumers = sub.consumers.stream() - .map(consumerStats -> consumerStats.metadata) - .collect(Collectors.toList()); - } - } catch (PulsarAdminException e1) { - - } - - String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage(); - return Actions.ActionResult.builder() - .success(false) - .errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers)) - .build(); - } - } - - return Actions.ActionResult.builder() - .success(true) - .build(); - - }) - .build()) + Actions.Action.builder() + .actionName(msg) + .numRetries(10) + .sleepBetweenInvocationsMs(1000) + .supplier( + getDeleteSubscriptionSupplier(topic, + consumerSpec.getIsRegexPattern(), + subscriptionName) + ) + .build()) .run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } + /** Returns the action body for deleting a subscription: unsubscribes the namespace of topic when isRegex is true, otherwise deletes subscriptionName on topic. NotFoundException is treated as success; any other PulsarAdminException produces a failed result. */ private Supplier getDeleteSubscriptionSupplier( + String topic, boolean isRegex, String subscriptionName) { + return () -> { + try { + if (isRegex) { + pulsarAdmin.namespaces().unsubscribeNamespace(TopicName + .get(topic).getNamespace(), subscriptionName); + } else { + pulsarAdmin.topics().deleteSubscription(topic, + subscriptionName); + } + } catch (PulsarAdminException e) { + if (e instanceof PulsarAdminException.NotFoundException) { + return Actions.ActionResult.builder() + .success(true) + .build(); + } else { + // for debugging purposes + /* NOTE(review): existingConsumers is assigned below but is not read again in this method; the error message serialises sub instead */ List> existingConsumers = Collections.emptyList(); + SubscriptionStats sub = null; + try { + TopicStats stats = pulsarAdmin.topics().getStats(topic); + sub = stats.subscriptions.get(subscriptionName); + if (sub != null) { + existingConsumers = sub.consumers.stream() + .map(consumerStats -> consumerStats.metadata) + .collect(Collectors.toList()); + } + } catch (PulsarAdminException e1) { + /* stats are only diagnostic, so a failure to fetch them is ignored */ + } + + String errorMsg = 
e.getHttpError() != null ? e.getHttpError() : e.getMessage(); + String finalErrorMsg; + if (sub != null) { + try { + finalErrorMsg = String.format("%s - existing consumers: %s", + errorMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(sub)); + } catch (JsonProcessingException jsonProcessingException) { + /* fall back to the bare error text when the stats cannot be serialised */ finalErrorMsg = errorMsg; + } + } else { + finalErrorMsg = errorMsg; + } + return Actions.ActionResult.builder() + .success(false) + .errorMsg(finalErrorMsg) + .build(); + } + } + + return Actions.ActionResult.builder() + .success(true) + .build(); + }; + } + + /** Returns the action body for deleting topic. NotFoundException is treated as success; any other PulsarAdminException produces a failed result whose message includes the topic stats when they can be fetched and serialised. */ private Supplier getDeleteTopicSupplier(String topic) { + return () -> { + try { + pulsarAdmin.topics().delete(topic); + } catch (PulsarAdminException e) { + if (e instanceof PulsarAdminException.NotFoundException) { + return Actions.ActionResult.builder() + .success(true) + .build(); + } else { + // for debugging purposes + TopicStats stats = null; + try { + stats = pulsarAdmin.topics().getStats(topic); + } catch (PulsarAdminException e1) { + /* stats are only diagnostic, so a failure to fetch them is ignored */ + } + + String errorMsg = e.getHttpError() != null ? 
e.getHttpError() : e.getMessage(); + String finalErrorMsg; + if (stats != null) { + try { + finalErrorMsg = String.format("%s - topic stats: %s", + errorMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(stats)); + } catch (JsonProcessingException jsonProcessingException) { + finalErrorMsg = errorMsg; + } + } else { + finalErrorMsg = errorMsg; + } + + return Actions.ActionResult.builder() + .success(false) + .errorMsg(finalErrorMsg) + .build(); + } + } + + return Actions.ActionResult.builder() + .success(true) + .build(); + }; + } + private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) { return StringUtils.join( new String[]{ @@ -508,4 +571,100 @@ public class FunctionActioner { throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime()); } } + + /** When functionDetails describes a batch source, creates intermediateTopicSubscription on the intermediate topic at MessageId.latest through pulsarAdmin; does nothing otherwise. A ConflictException is treated as success. */ private void setupBatchSource(Function.FunctionDetails functionDetails) { + if (isBatchSource(functionDetails)) { + + String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName( + functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()).toString(); + + String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName( + functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()); + String fqfn = FunctionCommon.getFullyQualifiedName( + functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()); + try { + Actions.newBuilder() + .addAction( + Actions.Action.builder() + .actionName(String.format("Creating intermediate topic %s with subscription %s for Batch Source %s", + intermediateTopicName, intermediateTopicSubscription, fqfn)) + .numRetries(10) + .sleepBetweenInvocationsMs(1000) + .supplier(() -> { + try { + pulsarAdmin.topics().createSubscription(intermediateTopicName, intermediateTopicSubscription, MessageId.latest); + return Actions.ActionResult.builder() + .success(true) + .build(); + } catch 
(PulsarAdminException.ConflictException e) { + // topic and subscription already exist, so just continue + return Actions.ActionResult.builder() + .success(true) + .build(); + } catch (Exception e) { + return Actions.ActionResult.builder() + .errorMsg(e.getMessage()) + .success(false) + .build(); + } + }) + .build()) + .run(); + } catch (InterruptedException e) { + log.error("Error setting up instance subscription for intermediate topic", e); + throw new RuntimeException(e); + } + } + } + + /** When functionDetails describes a batch source, removes the intermediate topic subscription and then deletes the intermediate topic; does nothing otherwise. */ private void cleanupBatchSource(Function.FunctionDetails functionDetails) { + if (isBatchSource(functionDetails)) { + // clean up intermediate topic + String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(functionDetails.getTenant(), + functionDetails.getNamespace(), functionDetails.getName()).toString(); + String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName( + functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()); + String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails); + try { + Actions.newBuilder() + .addAction( + Actions.Action.builder() + .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s", + intermediateTopicSubscription, fqfn)) + .numRetries(10) + .sleepBetweenInvocationsMs(1000) + /* NOTE(review): presumably lets the topic deletion step run even if this step fails; confirm against Actions */ .continueOn(true) + .supplier( + getDeleteSubscriptionSupplier(intermediateTopicName, + false, + intermediateTopicSubscription) + ) + .build()) + .addAction(Actions.Action.builder() + .actionName(String.format("Deleting intermediate topic %s for Batch Source %s", + intermediateTopicName, fqfn)) + .numRetries(10) + .sleepBetweenInvocationsMs(1000) + .supplier(getDeleteTopicSupplier(intermediateTopicName)) + .build()) + .run(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + /** Returns true when the component is a SOURCE whose extracted config map yields a non-null BatchSourceConfig, false otherwise. */ private static boolean isBatchSource(Function.FunctionDetails functionDetails) { + if 
(InstanceUtils.calculateSubjectType(functionDetails) == FunctionDetails.ComponentType.SOURCE) { + String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails); + Map configMap = SourceConfigUtils.extractSourceConfig(functionDetails.getSource(), fqfn); + if (configMap != null) { + BatchSourceConfig batchSourceConfig = SourceConfigUtils.extractBatchSourceConfig(configMap); + if (batchSourceConfig != null) { + return true; + } + } + } + return false; + } } diff --git a/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java b/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java index 6c35b9d68448ccd3dca8f9337dfc5715ed0e3fd4..9b166c80591d19771d7eecdde1cdc15640c92697 100644 --- a/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java +++ b/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java @@ -58,8 +58,9 @@ public class CronTriggerer implements BatchSourceTriggerer { @Override public void stop() { - scheduler.shutdown(); + /* scheduler can still be null here, presumably when stop() runs before the scheduler was created; confirm */ if (scheduler != null) { + scheduler.shutdown(); + } } - }