diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9202ac8efe8a5b6ee771bc26115923e5b8a12601..8f325804d1cab86c530f2bdb4de5198d939609e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -18,23 +18,23 @@ */ package org.apache.pulsar.broker.service.persistent; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - +import com.carrotsearch.hppc.ObjectObjectHashMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; - import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -98,11 +98,12 @@ import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo; -import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.ReplicatorStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; @@ -116,14 +117,10 @@ import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.carrotsearch.hppc.ObjectObjectHashMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.FastThreadLocal; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { @@ -1561,13 +1558,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal Optional policies = brokerService.pulsar().getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, name.getNamespace())); // If no policies, the default is to have no retention and delete the inactive topic - return policies.map(p -> p.retention_policies).map(rp -> { - long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes()); + RetentionPolicies retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies( + brokerService.pulsar().getConfiguration().getDefaultRetentionTimeInMinutes(), + brokerService.pulsar().getConfiguration().getDefaultRetentionSizeInMB()) + ); + long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes()); - // Negative retention time means the topic should be retained indefinitely, - // because its own data has to be retained - return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime; - }).orElse(false); + // Negative retention time means the topic should be retained indefinitely, + // because its own data has to be retained + return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime; } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("[{}] Error getting policies", topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index e06b2497ff72eaff29036877ecc6e89b5e299669..c7fce99fc5469b0ba6ac93263638ed1cfeeb9810 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -24,9 +24,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Sets; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; @@ -40,7 +38,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -772,6 +769,53 @@ public class PersistentTopicE2ETest extends BrokerTestBase { assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } + /** + * Set retention policy in default configuration. + * It should be effective. + */ + @Test + public void testServiceConfigurationRetentionPolicy() throws Exception { + // set retention policy in service configuration + pulsar.getConfiguration().setDefaultRetentionSizeInMB(-1); + pulsar.getConfiguration().setDefaultRetentionTimeInMinutes(-1); + + String namespaceName = "prop/ns-default-retention-policy"; + admin.namespaces().createNamespace(namespaceName); + + // 1. Simple successful GC + String topicName = "persistent://prop/ns-abc/topic-10"; + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + producer.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + runGC(); + // Should not have been deleted, since we have retention + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // Remove retention + admin.namespaces().setRetention("prop/ns-abc", new RetentionPolicies(0, 10)); + Thread.sleep(300); + + // 2. Topic is not GCed with live connection + String subName = "sub1"; + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 3. Topic with subscription is not GCed even with no connections + consumer.close(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 4. Topic can be GCed after unsubscribe + admin.topics().deleteSubscription(topicName, subName); + + runGC(); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + } + @Test public void testMessageExpiry() throws Exception { int messageTTLSecs = 1;