未验证 提交 8933d8dd 编写于 作者: F feynmanlin 提交者: GitHub

Fix dispatchRate is overwritten (#8004)

Fixes #7863

### Motivation
1) Topic level policy will be overwritten by namespace level
2) After removing the topic level policy, `DispatchLimiter` does not take effect,  the namespace level policy should be used

### Modifications
1)If there is a topic-level policy, it will not be overwritten by the namespace-level policy when the policy is updated
2)When removing topic-level policy, namespace-level policy will be used. If the namespace-level policy does not exist, the broker level will be used

### Verifying this change
unit test:
TopicPoliciesTest#testPolicyOverwrittenByNamespaceLevel
上级 ed356add
......@@ -1284,10 +1284,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return subscriptions;
}
@Override
public PersistentSubscription getSubscription(String subscriptionName) {
return subscriptions.get(subscriptionName);
}
@Override
public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
return replicators;
}
......@@ -1858,7 +1860,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
......@@ -1883,7 +1884,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
if (this.dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
if (topicPolicies == null || !topicPolicies.isDispatchRateSet()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
}
}
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
......@@ -2375,10 +2378,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);
if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
}
dispatchRateLimiter.ifPresent(limiter -> {
if (policies.isDispatchRateSet()) {
dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
} else {
dispatchRateLimiter.get().updateDispatchRate();
}
});
if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
......@@ -2387,17 +2394,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (policies.isInactiveTopicPoliciesSet()) {
inactiveTopicPolicies = policies.getInactiveTopicPolicies();
} else if (namespacePolicies.isPresent() && namespacePolicies.get().inactive_topic_policies != null) {
//topic-level policies is null , so use namespace-level
inactiveTopicPolicies = namespacePolicies.get().inactive_topic_policies;
} else {
//topic-level policies is null , so use namespace-level or broker-level
namespacePolicies.ifPresent(nsPolicies -> {
if (nsPolicies.inactive_topic_policies != null) {
inactiveTopicPolicies = nsPolicies.inactive_topic_policies;
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
});
//namespace-level policies is null , so use broker level
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
}
......
......@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
......@@ -44,6 +45,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.UUID;
@Slf4j
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
......@@ -516,6 +519,45 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test(timeOut = 20000)
public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
//wait for cache init
Thread.sleep(2000);
DispatchRate dispatchRate = new DispatchRate(200, 20000, 1, true);
admin.namespaces().setDispatchRate(myNamespace, dispatchRate);
//wait for zk
Thread.sleep(2000);
dispatchRate = new DispatchRate(100, 10000, 1, true);
admin.topics().setDispatchRate(topic, dispatchRate);
for (int i = 0; i < 10; i++) {
if (admin.topics().getDispatchRate(topic) != null) {
break;
}
Thread.sleep(500);
}
//1 Set ns level policy, topic level should not be overwritten
dispatchRate = new DispatchRate(300, 30000, 2, true);
admin.namespaces().setDispatchRate(myNamespace, dispatchRate);
//wait for zk
Thread.sleep(1000);
DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000);
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100);
admin.topics().removeDispatchRate(topic);
for (int i = 0; i < 10; i++) {
if (admin.topics().getDispatchRate(topic) == null) {
break;
}
Thread.sleep(500);
}
//2 Remove level policy ,DispatchRateLimiter should us ns level policy
limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000);
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300);
}
@Test
public void testGetSetCompactionThreshold() throws Exception {
long compactionThreshold = 100000;
......
......@@ -303,11 +303,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();
final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
//wait for cache init
Thread.sleep(2000);
InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName);
assertNull(inactiveTopicPolicies);
......@@ -316,6 +314,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
policies.setMaxInactiveDurationSeconds(10);
admin.topics().setInactiveTopicPolicies(topicName, policies);
//wait for init
Thread.sleep(3000);
for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册