提交 33a7c239 编写于 作者: X Xiaobing Fang 提交者: Jia Zhai

Fix : default retention policy in configuration do nothing (#4810)

fix #4755
Retention policy is used in Ledger GC and Topic GC.
Default retention policy is not uploaded to zookeeper, but it is getted from zookeeper when it is used.
So, if zookeeper doesn't have retention policy, we should load it from default config file.
In Ledger GC configuration, it's OK.
https://github.com/apache/pulsar/blob/075f28b71c8fd9259ce8e136dbb81c0629c3f271/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L739-L742
But in Topic GC, do nothing.
(cherry picked from commit 1dcac8cc)
上级 2b5bb183
......@@ -18,23 +18,23 @@
*/
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
......@@ -98,11 +98,12 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
......@@ -116,14 +117,10 @@ import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
......@@ -1561,13 +1558,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()));
// If no policies, the default is to have no retention and delete the inactive topic
return policies.map(p -> p.retention_policies).map(rp -> {
long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());
RetentionPolicies retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(
brokerService.pulsar().getConfiguration().getDefaultRetentionTimeInMinutes(),
brokerService.pulsar().getConfiguration().getDefaultRetentionSizeInMB())
);
long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
}).orElse(false);
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
......
......@@ -24,9 +24,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
......@@ -40,7 +38,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
......@@ -772,6 +769,53 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
/**
* Set retention policy in default configuration.
* It should be effective.
*/
@Test
public void testServiceConfigurationRetentionPolicy() throws Exception {
// set retention policy in service configuration
pulsar.getConfiguration().setDefaultRetentionSizeInMB(-1);
pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(-1);
String namespaceName = "prop/ns-default-retention-policy";
admin.namespaces().createNamespace(namespaceName);
// 1. Simple successful GC
String topicName = "persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
runGC();
// Should not have been deleted, since we have retention
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// Remove retention
admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(0, 10));
Thread.sleep(300);
// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 3. Topic with subscription is not GCed even with no connections
consumer.close();
runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
@Test
public void testMessageExpiry() throws Exception {
int messageTTLSecs = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册