diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ba585b95782f347c27aac8ae7c3867ad28811435..7f5966fc8ee873ca51e8c19e44b77c2a93886f6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -92,6 +92,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -1273,8 +1274,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener { topic.getSubscriptions().forEach((subName, persistentSubscription) -> { Dispatcher dispatcher = persistentSubscription.getDispatcher(); - if (dispatcher.getRateLimiter().isPresent()) { - dispatcher.getRateLimiter().get().updateDispatchRate(); + if (dispatcher != null) { + dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate); } }); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2e6b103a0f3afb506ae753a91234118760138a6e..1a088a148c7036515892ddad97e978be11bfa080 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -63,6 +63,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedExcept import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.ServerCnx; @@ -269,8 +270,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } // dispatch rate limiter for each subscription - subscriptions.forEach((name, subscription) -> - subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies)); + subscriptions.forEach((name, subscription) -> { + Dispatcher dispatcher = subscription.getDispatcher(); + if (dispatcher != null) { + dispatcher.initializeDispatchRateLimiterIfNeeded(policies); + } + }); // dispatch rate limiter for each replicator replicators.forEach((name, replicator) -> @@ -1673,8 +1678,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal }); subscriptions.forEach((subName, sub) -> { sub.getConsumers().forEach(Consumer::checkPermissions); - if (sub.getDispatcher().getRateLimiter().isPresent()) { - sub.getDispatcher().getRateLimiter().get().onPoliciesUpdate(data); + Dispatcher dispatcher = sub.getDispatcher(); + if (dispatcher != null) { + dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data)); } }); replicators.forEach((name, replicator) ->