未验证 提交 d45ee07f 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

Improve batch source intermediate topic cleanup (#7985)

Co-authored-by: NJerry Peng <jerryp@splunk.com>
上级 bd4a8301
......@@ -422,7 +422,7 @@ public class FunctionActioner {
private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
return () -> {
try {
pulsarAdmin.topics().delete(topic);
pulsarAdmin.topics().delete(topic, true);
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.NotFoundException) {
return Actions.ActionResult.builder()
......@@ -628,19 +628,21 @@ public class FunctionActioner {
try {
Actions.newBuilder()
.addAction(
// Unsubscribe and allow time for consumers to close
Actions.Action.builder()
.actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
intermediateTopicSubscription, fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.continueOn(true)
.supplier(
getDeleteSubscriptionSupplier(intermediateTopicName,
false,
intermediateTopicSubscription)
)
.build())
.addAction(Actions.Action.builder()
.addAction(
// Delete topic forcibly regardless whether unsubscribe succeeded or not
Actions.Action.builder()
.actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
intermediateTopicName, fqfn))
.numRetries(10)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册