diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a46ee7c71917136966fbd1ba91fcc29c669c8949..c6843293998475599450fd66599d47e2616b183b 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -726,6 +726,20 @@ public class PulsarAdminToolTest { cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3")); verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3); + cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999")); + verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999); + + cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99")); + verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a // range of +/- 1 second of the expected timestamp class TimestampMatcher implements ArgumentMatcher { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index df0d3423d47f9febb2b4fb6b79bf3a3451b7c18e..6d18feaf6637deb814ae8f132f288c703824af12 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -127,6 +127,12 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold()); + jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer()); + jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription()); + jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription()); + jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription()); jcommander.addCommand("get-publish-rate", new GetPublishRate()); jcommander.addCommand("set-publish-rate", new SetPublishRate()); jcommander.addCommand("remove-publish-rate", new RemovePublishRate()); @@ -1170,6 +1176,84 @@ public class CmdTopics extends CmdBase { } } + @Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic") + private class GetMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic") + private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic") + private class SetMaxUnackedMessagesOnConsumer extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true) + private int maxNum; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum); + } + } + + @Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic") + private class GetMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic)); + } + } + + @Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic") + private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic); + } + } + + @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic") + private class SetMaxUnackedMessagesOnSubscription extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true) + private int maxNum; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum); + } + } + @Parameters(commandDescription = "Get compaction threshold for a topic") private class GetCompactionThreshold extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true)