提交 84a519fa 编写于 作者: M Masahiro Sakamoto 提交者: nkurihar

[pulsar-broker] Fix bug that namespace policies does not take effect due to NPE (#5408)

* Fix bug that namespace policies does not take effect due to NPE

* Prevent NPE if Dispatcher and DispatchRateLimiter return to null
上级 ddf5429e
......@@ -99,6 +99,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
......@@ -1372,8 +1373,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
forEachTopic(topic -> {
topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
Dispatcher dispatcher = persistentSubscription.getDispatcher();
if (dispatcher.getRateLimiter().isPresent()) {
dispatcher.getRateLimiter().get().updateDispatchRate();
if (dispatcher != null) {
dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
}
});
});
......
......@@ -63,6 +63,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedExcept
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
......@@ -269,8 +270,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
// dispatch rate limiter for each subscription
subscriptions.forEach((name, subscription) ->
subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies));
subscriptions.forEach((name, subscription) -> {
Dispatcher dispatcher = subscription.getDispatcher();
if (dispatcher != null) {
dispatcher.initializeDispatchRateLimiterIfNeeded(policies);
}
});
// dispatch rate limiter for each replicator
replicators.forEach((name, replicator) ->
......@@ -1673,8 +1678,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
});
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
if (sub.getDispatcher().getRateLimiter().isPresent()) {
sub.getDispatcher().getRateLimiter().get().onPoliciesUpdate(data);
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
replicators.forEach((name, replicator) ->
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册