未验证 提交 83773962 编写于 作者: F feynmanlin 提交者: GitHub

Fix PR #7818 #7802 does not support admin cli (#7940)

### Motivation
PR #7818 #7802 supports topic-level policies.
But the pulsar admin cli java doc is not supported accordingly.
上级 7e4ace21
......@@ -726,6 +726,20 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
......
......@@ -127,6 +127,12 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer());
jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer());
jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription());
jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
......@@ -1170,6 +1176,84 @@ public class CmdTopics extends CmdBase {
}
}
@Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic")
private class GetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic")
private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic")
private class SetMaxUnackedMessagesOnConsumer extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
private int maxNum;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic")
private class GetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic));
}
}
@Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic")
private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic);
}
}
@Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
private class SetMaxUnackedMessagesOnSubscription extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
private int maxNum;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
}
}
@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册