未验证 提交 66688d15 编写于 作者: H hangc0276 提交者: GitHub

support message publish rate on topic level (#7948)

Modifications
Support set publish rate on topic level.
Support get publish rate on topic level.
Support remove publish rate on topic level.
上级 5cee214a
......@@ -97,9 +97,8 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
......@@ -3128,18 +3127,60 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setCompactionThreshold(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}
topicPolicies.get().setCompactionThreshold(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}
protected Optional<PublishRate> internalGetPublishRate() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
}
protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (publishRate == null) {
return CompletableFuture.completedFuture(null);
}
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
topicPolicies.setPublishRate(publishRate);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}
protected CompletableFuture<Void> internalRemovePublishRate() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setPublishRate(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}
}
......@@ -61,6 +61,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
......@@ -1871,5 +1872,92 @@ public class PersistentTopics extends PersistentTopicsBase {
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Get publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<PublishRate> publishRate = internalGetPublishRate();
if (!publishRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(publishRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Set message publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic dispatch rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic publish rate: tenant={}, namespace={}, topic={}, publishRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(publishRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Remove message publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemovePublishRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic publish rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
......@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
......@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
......@@ -44,6 +46,7 @@ import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
......@@ -94,6 +97,8 @@ public abstract class AbstractTopic implements Topic {
protected boolean preciseTopicPublishRateLimitingEnable;
protected volatile PublishRate topicPolicyPublishRate = null;
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
......@@ -110,17 +115,28 @@ public abstract class AbstractTopic implements Topic {
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
ServiceConfiguration brokerConfig = brokerService.pulsar().getConfiguration();
if (brokerConfig.isSystemTopicEnabled() && brokerConfig.isSystemTopicEnabled()) {
topicPolicyPublishRate = Optional.ofNullable(getTopicPolicies(TopicName.get(topic)))
.map(TopicPolicies::getPublishRate)
.orElse(null);
}
if (topicPolicyPublishRate != null) {
// update topic level publish dispatcher
updateTopicPublishDispatcher();
} else {
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
updatePublishDispatcher(policies);
}
}
protected boolean isProducersExceeded() {
......@@ -466,6 +482,12 @@ public abstract class AbstractTopic implements Topic {
}
private void updatePublishDispatcher(Policies policies) {
// if topic level publish rate policy is set, skip update publish rate on namespace level
if (topicPolicyPublishRate != null) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}", this.topic);
return;
}
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
......@@ -532,4 +554,33 @@ public abstract class AbstractTopic implements Topic {
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
/**
* Get {@link TopicPolicies} for this topic.
* @param topicName
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
} catch (NullPointerException e) {
log.warn("Topic level policies are not enabled. " +
"Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf");
return null;
}
}
/**
* update topic publish dispatcher for this topic.
*/
protected void updateTopicPublishDispatcher() {
// noop
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
import java.util.concurrent.TimeUnit;
public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
}
public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
}
@Override
public void checkPublishRate() {
// No-op
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}
@Override
public boolean resetPublishCount() {
return true;
}
@Override
public boolean isPublishRateExceeded() {
return false;
}
@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
this.update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
(topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
}
}
......@@ -18,13 +18,8 @@
*/
package org.apache.pulsar.broker.service;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
public interface PublishRateLimiter {
......@@ -76,196 +71,3 @@ public interface PublishRateLimiter {
* */
boolean tryAcquire(int numbers, long bytes);
}
class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
}
@Override
public void checkPublishRate() {
// No-op
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}
@Override
public boolean resetPublishCount() {
return true;
}
@Override
public boolean isPublishRateExceeded() {
return false;
}
@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
this.update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
(topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
}
}
class PublishRateLimiterImpl implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
protected volatile boolean publishRateExceeded = false;
protected volatile LongAdder currentPublishMsgCount = new LongAdder();
protected volatile LongAdder currentPublishByteCount = new LongAdder();
public PublishRateLimiterImpl(Policies policies, String clusterName) {
update(policies, clusterName);
}
public PublishRateLimiterImpl(PublishRate maxPublishRate) {
update(maxPublishRate);
}
@Override
public void checkPublishRate() {
if (this.publishThrottlingEnabled && !publishRateExceeded) {
long currentPublishMsgRate = this.currentPublishMsgCount.sum();
long currentPublishByteRate = this.currentPublishByteCount.sum();
if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > this.publishMaxMessageRate)
|| (this.publishMaxByteRate > 0 && currentPublishByteRate > this.publishMaxByteRate)) {
publishRateExceeded = true;
}
}
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
if (this.publishThrottlingEnabled) {
this.currentPublishMsgCount.add(numOfMessages);
this.currentPublishByteCount.add(msgSizeInBytes);
}
}
@Override
public boolean resetPublishCount() {
if (this.publishThrottlingEnabled) {
this.currentPublishMsgCount.reset();
this.currentPublishByteCount.reset();
this.publishRateExceeded = false;
return true;
}
return false;
}
@Override
public boolean isPublishRateExceeded() {
return publishRateExceeded;
}
@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
}
resetPublishCount();
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
return false;
}
}
class PublishRateLimiterDisable implements PublishRateLimiter {
public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new PublishRateLimiterDisable();
@Override
public void checkPublishRate() {
// No-op
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}
@Override
public boolean resetPublishCount() {
// No-op
return false;
}
@Override
public boolean isPublishRateExceeded() {
return false;
}
@Override
public void update(Policies policies, String clusterName) {
// No-op
}
@Override
public void update(PublishRate maxPublishRate) {
// No-op
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
public class PublishRateLimiterDisable implements PublishRateLimiter {
public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new PublishRateLimiterDisable();
@Override
public void checkPublishRate() {
// No-op
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}
@Override
public boolean resetPublishCount() {
// No-op
return false;
}
@Override
public boolean isPublishRateExceeded() {
return false;
}
@Override
public void update(Policies policies, String clusterName) {
// No-op
}
@Override
public void update(PublishRate maxPublishRate) {
// No-op
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import java.util.concurrent.atomic.LongAdder;
public class PublishRateLimiterImpl implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
protected volatile boolean publishRateExceeded = false;
protected volatile LongAdder currentPublishMsgCount = new LongAdder();
protected volatile LongAdder currentPublishByteCount = new LongAdder();
public PublishRateLimiterImpl(Policies policies, String clusterName) {
update(policies, clusterName);
}
public PublishRateLimiterImpl(PublishRate maxPublishRate) {
update(maxPublishRate);
}
@Override
public void checkPublishRate() {
if (this.publishThrottlingEnabled && !publishRateExceeded) {
long currentPublishMsgRate = this.currentPublishMsgCount.sum();
long currentPublishByteRate = this.currentPublishByteCount.sum();
if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > this.publishMaxMessageRate)
|| (this.publishMaxByteRate > 0 && currentPublishByteRate > this.publishMaxByteRate)) {
publishRateExceeded = true;
}
}
}
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
if (this.publishThrottlingEnabled) {
this.currentPublishMsgCount.add(numOfMessages);
this.currentPublishByteCount.add(msgSizeInBytes);
}
}
@Override
public boolean resetPublishCount() {
if (this.publishThrottlingEnabled) {
this.currentPublishMsgCount.reset();
this.currentPublishByteCount.reset();
this.publishRateExceeded = false;
return true;
}
return false;
}
@Override
public boolean isPublishRateExceeded() {
return publishRateExceeded;
}
@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
}
resetPublishCount();
}
@Override
public boolean tryAcquire(int numbers, long bytes) {
return false;
}
}
......@@ -82,7 +82,10 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedEx
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.PrecisPublishLimiter;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
......@@ -2134,24 +2137,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
/**
* Get {@link TopicPolicies} for this topic.
* @param topicName
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
}
}
/**
* Get message TTL for this topic.
* @param topicPolicies TopicPolicies
......@@ -2366,6 +2351,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
}
if (policies != null && policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
}
}
private void initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
......@@ -2381,6 +2371,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return this;
}
@Override
protected void updateTopicPublishDispatcher() {
if (topicPolicyPublishRate != null && (topicPolicyPublishRate.publishThrottlingRateInByte > 0
|| topicPolicyPublishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling topic policy publish rate limiting {} on topic {}", topicPolicyPublishRate, this.topic);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupBrokerPublishRateLimiterMonitor();
}
if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(topicPolicyPublishRate, ()-> this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(topicPolicyPublishRate);
}
} else {
this.topicPublishRateLimiter.update(topicPolicyPublishRate);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
}
@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
......
......@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
......@@ -170,4 +171,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
@Test
public void testPublishRateDisabled() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
try {
admin.topics().setPublishRate(testTopic, publishRate);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
try {
admin.topics().getPublishRate(testTopic);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
}
......@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
......@@ -415,4 +416,42 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testGetSetPublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
admin.topics().setPublishRate(testTopic, publishRate);
log.info("Publish Rate set success on topic: {}", testTopic);
Thread.sleep(3000);
PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
log.info("Publish Rate: {} get on topic: {}", getPublishRate, testTopic);
Assert.assertEquals(getPublishRate, publishRate);
admin.topics().deletePartitionedTopic(testTopic, true);
}
@Test
public void testRemovePublishRate() throws Exception {
PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);
admin.topics().setPublishRate(testTopic, publishRate);
log.info("Publish Rate set success on topic: {}", testTopic);
Thread.sleep(3000);
PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
log.info("Publish Rate: {} get on topic: {}", getPublishRate, testTopic);
Assert.assertEquals(getPublishRate, publishRate);
admin.topics().removePublishRate(testTopic);
Thread.sleep(3000);
getPublishRate = admin.topics().getPublishRate(testTopic);
log.info("Publish Rate get on topic: {} after remove", getPublishRate, testTopic);
Assert.assertNull(getPublishRate);
admin.topics().deletePartitionedTopic(testTopic, true);
}
}
......@@ -39,9 +39,9 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
/**
* Admin interface for Topics management.
*/
......@@ -2007,4 +2007,64 @@ public interface Topics {
*/
CompletableFuture<Void> removeCompactionThresholdAsync(String topic);
/**
* Set message-publish-rate (topics can publish this many messages per second).
*
* @param topic
* @param publishMsgRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
void setPublishRate(String topic, PublishRate publishMsgRate) throws PulsarAdminException;
/**
* Set message-publish-rate (topics can publish this many messages per second) asynchronously.
*
* @param topic
* @param publishMsgRate
* number of messages per second
*/
CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishMsgRate);
/**
* Get message-publish-rate (topics can publish this many messages per second).
*
* @param topic
* @return number of messages per second
* @throws PulsarAdminException Unexpected error
*/
PublishRate getPublishRate(String topic) throws PulsarAdminException;
/**
* Get message-publish-rate (topics can publish this many messages per second) asynchronously.
*
* @param topic
* @return number of messages per second
*/
CompletableFuture<PublishRate> getPublishRateAsync(String topic);
/**
* Remove message-publish-rate.
* <p/>
* Remove topic message publish rate
*
* @param topic
* @throws PulsarAdminException
* unexpected error
*/
void removePublishRate(String topic) throws PulsarAdminException;
/**
* Remove message-publish-rate asynchronously.
* <p/>
* Remove topic message publish rate
*
* @param topic
* @throws PulsarAdminException
* unexpected error
*/
CompletableFuture<Void> removePublishRateAsync(String topic) throws PulsarAdminException;
}
......@@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
......@@ -2079,5 +2080,81 @@ public class TopicsImpl extends BaseResource implements Topics {
return asyncDeleteRequest(path);
}
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
@Override
public PublishRate getPublishRate(String topic) throws PulsarAdminException {
try {
return getPublishRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
final CompletableFuture<PublishRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PublishRate>() {
@Override
public void completed(PublishRate publishRate) {
future.complete(publishRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setPublishRate(String topic, PublishRate publishRate) throws PulsarAdminException {
try {
setPublishRateAsync(topic, publishRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishRate) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
return asyncPostRequest(path, Entity.entity(publishRate, MediaType.APPLICATION_JSON));
}
@Override
public void removePublishRate(String topic) throws PulsarAdminException {
try {
removePublishRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removePublishRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
return asyncDeleteRequest(path);
}
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
......@@ -52,6 +52,7 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
......@@ -126,6 +127,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
......@@ -1207,4 +1211,49 @@ public class CmdTopics extends CmdBase {
admin.topics().removeCompactionThreshold(persistentTopic);
}
}
@Parameters(commandDescription = "Get publish rate for a topic")
private class GetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(admin.topics().getPublishRate(persistentTopic));
}
}
@Parameters(commandDescription = "Set publish rate for a topic")
private class SetPublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Parameter(names = { "--msg-publish-rate",
"-m" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
private int msgPublishRate = -1;
@Parameter(names = { "--byte-publish-rate",
"-b" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
private long bytePublishRate = -1;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().setPublishRate(persistentTopic,
new PublishRate(msgPublishRate, bytePublishRate));
}
}
@Parameters(commandDescription = "Remove publish rate for a topic")
private class RemovePublishRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
admin.topics().removePublishRate(persistentTopic);
}
}
}
......@@ -51,6 +51,7 @@ public class TopicPolicies {
private Boolean delayedDeliveryEnabled = null;
private DispatchRate dispatchRate = null;
private Long compactionThreshold = null;
private PublishRate publishRate = null;
public boolean isMaxUnackedMessagesOnConsumerSet() {
return maxUnackedMessagesOnConsumer != null;
......@@ -107,4 +108,8 @@ public class TopicPolicies {
public boolean isCompactionThresholdSet() {
return compactionThreshold != null;
}
public boolean isPublishRateSet() {
return publishRate != null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册