diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c82d1363e34782..b6a0b2dd026555 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -97,9 +97,8 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -843,6 +842,21 @@ protected CompletableFuture internalSetOffloadPolicies(OffloadPolicies off return completableFuture; } + protected CompletableFuture internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) { + TopicPolicies topicPolicies = null; + try { + topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { + log.error("Topic {} policies cache have not init.", topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init")); + } + if (topicPolicies == null) { + topicPolicies = new TopicPolicies(); + } + topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + private CompletableFuture internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) { return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) .thenAccept(optionalTopic -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c897124cfcfe3a..936a7df7fcbd14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; @@ -377,6 +378,69 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null); } + @GET + @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies") + @ApiOperation(value = "Get inactive topic policies on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), + @ApiResponse(code = 500, message = "Internal server error"),}) + public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); + if (topicPolicies.isInactiveTopicPoliciesSet()) { + asyncResponse.resume(topicPolicies.getInactiveTopicPolicies()); + } else { + asyncResponse.resume(Response.noContent().build()); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies") + @ApiOperation(value = "Set inactive topic policies on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),}) + public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "inactive topic policies for the specified topic") + InactiveTopicPolicies inactiveTopicPolicies) { + validateTopicName(tenant, namespace, encodedTopic); + validateAdminAccessForTenant(tenant); + validatePoliciesReadOnlyAccess(); + checkTopicLevelPolicyEnable(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) -> { + if (ex instanceof RestException) { + log.error("Failed set InactiveTopicPolicies", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed set InactiveTopicPolicies", ex); + asyncResponse.resume(new RestException(ex)); + } else { + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies") + @ApiOperation(value = "Delete inactive topic policies on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),}) + public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + setInactiveTopicPolicies(asyncResponse, tenant, namespace, encodedTopic, null); + } + @GET @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription") @ApiOperation(value = "Get max unacked messages per subscription config on a topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6f9860b77cb706..8ff51f6c5017fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1825,14 +1825,19 @@ public CompletableFuture onPoliciesUpdate(Policies data) { delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime(); delayedDeliveryEnabled = data.delayed_delivery_policies.isActive(); } + //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy. + TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); if (data.inactive_topic_policies != null) { - this.inactiveTopicPolicies = data.inactive_topic_policies; + if (topicPolicies == null || !topicPolicies.isInactiveTopicPoliciesSet()) { + this.inactiveTopicPolicies = data.inactive_topic_policies; + } } else { ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration(); resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode() , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled()); } + initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data)); this.updateMaxPublishRate(data); @@ -2344,24 +2349,50 @@ public void onUpdate(TopicPolicies policies) { if (log.isDebugEnabled()) { log.debug("[{}] update topic policy: {}", topic, policies); } - - initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies)); - if (this.dispatchRateLimiter.isPresent() && policies != null - && policies.getDispatchRate() != null) { + if (policies == null) { + return; + } + Optional namespacePolicies = getNamespacePolicies(); + initializeTopicDispatchRateLimiterIfNeeded(policies); + if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) { dispatchRateLimiter.ifPresent(dispatchRateLimiter -> dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate())); } - if (policies != null && policies.getPublishRate() != null) { + if (policies.getPublishRate() != null) { topicPolicyPublishRate = policies.getPublishRate(); updateTopicPublishDispatcher(); } + + if (policies.isInactiveTopicPoliciesSet()) { + inactiveTopicPolicies = policies.getInactiveTopicPolicies(); + } else { + //topic-level policies is null , so use namespace-level or broker-level + namespacePolicies.ifPresent(nsPolicies -> { + if (nsPolicies.inactive_topic_policies != null) { + inactiveTopicPolicies = nsPolicies.inactive_topic_policies; + } else { + ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration(); + resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode() + , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled()); + } + }); + } + } + + private Optional getNamespacePolicies(){ + try { + return Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces() + .getPolicies(TopicName.get(topic).getNamespace())); + } catch (Exception e) { + log.error("get namespace policies fail", e); + } + return Optional.empty(); } - private void initializeTopicDispatchRateLimiterIfNeeded(Optional policies) { + private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) { synchronized (dispatchRateLimiter) { - if (!dispatchRateLimiter.isPresent() && policies.isPresent() && - policies.get().getDispatchRate() != null) { + if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) { this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 143a4ff1b9af62..79a296e87823f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import com.google.common.collect.Sets; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -34,6 +35,9 @@ import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + public class InactiveTopicDeleteTest extends BrokerTestBase { protected void setup() throws Exception { @@ -125,9 +129,9 @@ public void testTopicPolicyUpdateAndClean() throws Exception { } Assert.assertTrue(policies.isDeleteWhileInactive()); - Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions); - Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1); - Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace)); + assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions); + assertEquals(policies.getMaxInactiveDurationSeconds(), 1); + assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace)); admin.namespaces().removeInactiveTopicPolicies(namespace); while (true) { @@ -137,14 +141,14 @@ public void testTopicPolicyUpdateAndClean() throws Exception { break; } } - Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies + assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies , defaultPolicy); policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies; Assert.assertTrue(policies.isDeleteWhileInactive()); - Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); - Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1); - Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2)); + assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + assertEquals(policies.getMaxInactiveDurationSeconds(), 1); + assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2)); admin.namespaces().removeInactiveTopicPolicies(namespace2); while (true) { @@ -154,7 +158,7 @@ public void testTopicPolicyUpdateAndClean() throws Exception { break; } } - Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies + assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies , defaultPolicy); super.internalCleanup(); @@ -166,7 +170,6 @@ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception { final String namespace2 = "prop/ns-abc2"; final String namespace3 = "prop/ns-abc3"; List namespaceList = Arrays.asList(namespace2, namespace3); - conf.setBrokerDeleteInactiveTopicsEnabled(true); conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); super.baseSetup(); @@ -293,4 +296,190 @@ public void testMaxInactiveDuration() throws Exception { super.internalCleanup(); } + + @Test(timeOut = 20000) + public void testTopicLevelInActiveTopicApi() throws Exception { + super.resetConfig(); + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + super.baseSetup(); + + final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(topicName, 3); + //wait for cache init + Thread.sleep(2000); + InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName); + assertNull(inactiveTopicPolicies); + + InactiveTopicPolicies policies = new InactiveTopicPolicies(); + policies.setDeleteWhileInactive(true); + policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + policies.setMaxInactiveDurationSeconds(10); + admin.topics().setInactiveTopicPolicies(topicName, policies); + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topicName) != null) { + break; + } + Thread.sleep(100); + } + assertEquals(admin.topics().getInactiveTopicPolicies(topicName), policies); + admin.topics().removeInactiveTopicPolicies(topicName); + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topicName) == null) { + break; + } + Thread.sleep(100); + } + assertNull(admin.topics().getInactiveTopicPolicies(topicName)); + + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception { + super.resetConfig(); + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + conf.setBrokerDeleteInactiveTopicsEnabled(true); + conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000); + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions + , 1000, true); + + super.baseSetup(); + //wait for cache init + Thread.sleep(2000); + final String namespace = "prop/ns-abc"; + final String topic = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString(); + final String topic2 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString(); + final String topic3 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString(); + List topics = Arrays.asList(topic, topic2, topic3); + + for (String tp : topics) { + admin.topics().createNonPartitionedTopic(tp); + } + + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true); + admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies); + + //wait for cache + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topic) != null) { + break; + } + Thread.sleep(100); + } + InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get()).inactiveTopicPolicies; + Assert.assertTrue(policies.isDeleteWhileInactive()); + assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions); + assertEquals(policies.getMaxInactiveDurationSeconds(), 1); + assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic)); + + admin.topics().removeInactiveTopicPolicies(topic); + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topic) == null) { + break; + } + Thread.sleep(100); + } + //Only the broker-level policies is set, so after removing the topic-level policies + // , the topic will use the broker-level policies + assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies + , defaultPolicy); + + policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies; + Assert.assertTrue(policies.isDeleteWhileInactive()); + assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + assertEquals(policies.getMaxInactiveDurationSeconds(), 1); + assertEquals(policies, admin.topics().getInactiveTopicPolicies(topic2)); + inactiveTopicPolicies.setMaxInactiveDurationSeconds(999); + //Both broker level and namespace level policies are set, so after removing the topic level policies + // , the topic will use the namespace level policies + admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies); + //wait for zk + Thread.sleep(1000); + admin.topics().removeInactiveTopicPolicies(topic2); + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topic2) == null) { + break; + } + Thread.sleep(100); + } + InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService() + .getTopic(topic2, false).get().get()).inactiveTopicPolicies; + assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999); + + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Exception { + final String namespace = "prop/ns-abc"; + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + conf.setBrokerDeleteInactiveTopicsEnabled(true); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + super.baseSetup(); + //wait for cache init + Thread.sleep(2000); + final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID(); + final String topic2 = "persistent://prop/ns-abc/test-" + UUID.randomUUID(); + final String topic3 = "persistent://prop/ns-abc/test-" + UUID.randomUUID(); + List topics = Arrays.asList(topic, topic2, topic3); + //create producer/consumer and close + Map topicToSub = new HashMap<>(); + for (String tp : topics) { + Producer producer = pulsarClient.newProducer().topic(tp).create(); + String subName = "sub" + System.currentTimeMillis(); + topicToSub.put(tp, subName); + Consumer consumer = pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe(); + for (int i = 0; i < 10; i++) { + producer.send("Pulsar".getBytes()); + } + consumer.close(); + producer.close(); + Thread.sleep(1); + } + // "topic" use delete_when_no_subscriptions, "topic2" use delete_when_subscriptions_caught_up + // "topic3" use default:delete_when_no_subscriptions + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true); + admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies); + + //wait for update + for (int i = 0; i < 50; i++) { + if (admin.topics().getInactiveTopicPolicies(topic2) != null) { + break; + } + Thread.sleep(100); + } + + // topic should still exist + Thread.sleep(2000); + Assert.assertTrue(admin.topics().getList(namespace).contains(topic)); + Assert.assertTrue(admin.topics().getList(namespace).contains(topic2)); + Assert.assertTrue(admin.topics().getList(namespace).contains(topic3)); + + // no backlog, trigger delete_when_subscriptions_caught_up + admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2)); + Thread.sleep(2000); + Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)); + // delete subscription, trigger delete_when_no_subscriptions + for (Map.Entry entry : topicToSub.entrySet()) { + admin.topics().deleteSubscription(entry.getKey(), entry.getValue()); + } + Thread.sleep(2000); + Assert.assertFalse(admin.topics().getList(namespace).contains(topic)); + Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)); + + super.internalCleanup(); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 89d4ba2d285fe2..468f2222954b80 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -1710,6 +1711,52 @@ CompletableFuture setDelayedDeliveryPolicyAsync(String topic */ CompletableFuture removeMaxUnackedMessagesOnConsumerAsync(String topic); + /** + * get inactive topic policies of a topic. + * @param topic + * @return + * @throws PulsarAdminException + */ + InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException; + + /** + * get inactive topic policies of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture getInactiveTopicPoliciesAsync(String topic); + + /** + * set inactive topic policies of a topic. + * @param topic + * @param maxNum + * @throws PulsarAdminException + */ + void setInactiveTopicPolicies(String topic + , InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException; + + /** + * set inactive topic policies of a topic asynchronously. + * @param topic + * @param maxNum + * @return + */ + CompletableFuture setInactiveTopicPoliciesAsync(String topic, InactiveTopicPolicies inactiveTopicPolicies); + + /** + * remove inactive topic policies of a topic. + * @param topic + * @throws PulsarAdminException + */ + void removeInactiveTopicPolicies(String topic) throws PulsarAdminException; + + /** + * remove inactive topic policies of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture removeInactiveTopicPoliciesAsync(String topic); + /** * get offload policies of a topic. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index f5e0a5684a9355..b4da586aea2ec8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -71,6 +71,7 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -1508,6 +1509,85 @@ public void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminE } } + @Override + public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException { + try { + return getInactiveTopicPoliciesAsync(topic). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getInactiveTopicPoliciesAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "inactiveTopicPolicies"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, new InvocationCallback() { + @Override + public void completed(InactiveTopicPolicies inactiveTopicPolicies) { + future.complete(inactiveTopicPolicies); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public CompletableFuture setInactiveTopicPoliciesAsync(String topic + , InactiveTopicPolicies inactiveTopicPolicies) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "inactiveTopicPolicies"); + return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies, MediaType.APPLICATION_JSON)); + } + + @Override + public void setInactiveTopicPolicies(String topic + , InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException { + try { + setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeInactiveTopicPoliciesAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "inactiveTopicPolicies"); + return asyncDeleteRequest(path); + } + + @Override + public void removeInactiveTopicPolicies(String topic) throws PulsarAdminException { + try { + removeInactiveTopicPoliciesAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 74bf4175dde0e3..965887d196be0d 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -760,6 +760,14 @@ public void topics() throws Exception { cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99")); verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99); + cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions")); + verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1" + , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); + // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a // range of +/- 1 second of the expected timestamp class TimestampMatcher implements ArgumentMatcher { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 7835372a0bcd71..f5ccaa16897fc2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -50,6 +50,8 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -148,6 +150,9 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("get-maxProducers", new GetMaxProducers()); jcommander.addCommand("set-maxProducers", new SetMaxProducers()); jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers()); + jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies()); + jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies()); + jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies()); } @Parameters(commandDescription = "Get the list of topics under a namespace.") @@ -1496,4 +1501,66 @@ void run() throws PulsarAdminException { admin.topics().removeMaxProducers(persistentTopic); } } + + @Parameters(commandDescription = "Get the inactive topic policies on a topic") + private class GetInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getInactiveTopicPolicies(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set the inactive topic policies on a topic") + private class SetInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive") + private boolean enableDeleteWhileInactive = false; + + @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive") + private boolean disableDeleteWhileInactive = false; + + @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" + + ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true) + private String deleteInactiveTopicsMaxInactiveDuration; + + @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" + + ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true) + private String inactiveTopicDeleteMode; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration)); + + if (enableDeleteWhileInactive == disableDeleteWhileInactive) { + throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive"); + } + InactiveTopicDeleteMode deleteMode = null; + try { + deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode); + } catch (IllegalArgumentException e) { + throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up"); + } + admin.topics().setInactiveTopicPolicies(persistentTopic, + new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive)); + } + } + + @Parameters(commandDescription = "Remove inactive topic policies from a topic") + private class RemoveInactiveTopicPolicies extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().removeInactiveTopicPolicies(persistentTopic); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index baa8aeac4fb0b9..343770d3cf0c60 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -50,13 +50,18 @@ public class TopicPolicies { private Long delayedDeliveryTickTimeMillis = null; private Boolean delayedDeliveryEnabled = null; private OffloadPolicies offloadPolicies; + private InactiveTopicPolicies inactiveTopicPolicies = null; + private DispatchRate dispatchRate = null; + private Long compactionThreshold = null; + private PublishRate publishRate = null; + + public boolean isInactiveTopicPoliciesSet() { + return inactiveTopicPolicies != null; + } public boolean isOffloadPoliciesSet() { return offloadPolicies != null; } - private DispatchRate dispatchRate = null; - private Long compactionThreshold = null; - private PublishRate publishRate = null; public boolean isMaxUnackedMessagesOnConsumerSet() { return maxUnackedMessagesOnConsumer != null;