diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f6010f139f2b97..51c895253bbb8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2189,6 +2189,21 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota }); } + protected CompletableFuture internalSetDeduplicationEnabled(Boolean enabled) { + TopicPolicies topicPolicies = null; + try { + topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { + log.error("Topic {} policies cache have not init.", topicName); + throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"); + } + if (topicPolicies == null) { + topicPolicies = new TopicPolicies(); + } + topicPolicies.setDeduplicationEnabled(enabled); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) { //Validate message ttl value. if (ttlInSecond != null && ttlInSecond.intValue() < 0) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 42dbcb82fb1b3b..39477c052a91c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1268,6 +1268,65 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse, internalSetMessageTTL(asyncResponse, null); } + @GET + @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled") + @ApiOperation(value = "Get deduplication configuration of a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) + public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); + if (topicPolicies.isDeduplicationSet()) { + asyncResponse.resume(topicPolicies.getDeduplicationEnabled()); + } else { + asyncResponse.resume(Response.noContent().build()); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled") + @ApiOperation(value = "Set deduplication enabled on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) + public void setDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Failed updated deduplication", ex); + asyncResponse.resume(ex); + }else if (ex != null) { + log.error("Failed updated deduplication", ex); + asyncResponse.resume(new RestException(ex)); + } else { + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled") + @ApiOperation(value = "Remove deduplication configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification") }) + public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null); + } + @GET @Path("/{tenant}/{namespace}/{topic}/retention") @ApiOperation(value = "Get retention configuration for specified topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 2fb4944d0f95d1..c9c215dece1be8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; @@ -421,7 +422,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { private CompletableFuture isDeduplicationEnabled() { TopicName name = TopicName.get(topic.getName()); - + //Topic level setting has higher priority than namespace level + TopicPolicies topicPolicies = topic.getTopicPolicies(name); + if (topicPolicies != null && topicPolicies.isDeduplicationSet()) { + return CompletableFuture.completedFuture(topicPolicies.getDeduplicationEnabled()); + } return pulsar.getConfigurationCache().policiesCache() .getAsync(AdminResource.path(POLICIES, name.getNamespace())).thenApply(policies -> { // If namespace policies have the field set, it will override the broker-level setting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index daf08e15aacc2b..74ef28ed21c49e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2128,7 +2128,7 @@ public synchronized OffloadProcessStatus offloadStatus() { * @param topicName * @return TopicPolicies is exist else return null. */ - private TopicPolicies getTopicPolicies(TopicName topicName) { + public TopicPolicies getTopicPolicies(TopicName topicName) { TopicName cloneTopicName = topicName; if (topicName.isPartitioned()) { cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); @@ -2342,4 +2342,9 @@ public int getMaxUnackedMessagesOnSubscription() { } return maxUnackedMessagesOnSubscription; } + + @VisibleForTesting + public MessageDeduplication getMessageDeduplication() { + return messageDeduplication; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java index 93fd08c10e8a47..f679f30f6b7e6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java @@ -305,12 +305,13 @@ private void startConsumer(Consumer consumer, AtomicInteger consumerCoun private void waitCacheInit(String topicName) throws Exception { for (int i = 0; i < 50; i++) { + //wait for server init + Thread.sleep(1000); try { admin.topics().getMaxUnackedMessagesOnSubscription(topicName); break; } catch (Exception e) { //ignore - Thread.sleep(100); } if (i == 49) { throw new RuntimeException("Waiting for cache initialization has timed out"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java new file mode 100644 index 00000000000000..faa010af791974 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.junit.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TopicDuplicationTest extends ProducerConsumerBase { + private final String testTenant = "my-property"; + private final String testNamespace = "my-ns"; + private final String myNamespace = testTenant + "/" + testNamespace; + private final String testTopic = "persistent://" + myNamespace + "/max-unacked-"; + + @BeforeMethod + @Override + protected void setup() throws Exception { + this.conf.setSystemTopicEnabled(true); + this.conf.setTopicLevelPoliciesEnabled(true); + this.conf.setBrokerDeduplicationEnabled(true); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 10000) + public void testDuplicationApi() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + waitCacheInit(topicName); + admin.topics().createPartitionedTopic(topicName, 3); + Boolean enabled = admin.topics().getDeduplicationEnabled(topicName); + assertNull(enabled); + + admin.topics().enableDeduplication(topicName, true); + for (int i = 0; i < 50; i++) { + if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) { + break; + } + Thread.sleep(100); + } + Assert.assertEquals(admin.topics().getDeduplicationEnabled(topicName), true); + admin.topics().disableDeduplication(topicName); + for (int i = 0; i < 50; i++) { + if (admin.topics().getDeduplicationEnabled(topicName) == null) { + break; + } + Thread.sleep(100); + } + assertNull(admin.topics().getDeduplicationEnabled(topicName)); + } + + @Test(timeOut = 20000) + public void testDuplicationMethod() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + final String producerName = "my-producer"; + final int maxMsgNum = 100; + waitCacheInit(topicName); + admin.topics().createPartitionedTopic(testTopic, 3); + //1) Start up producer and send msg.We specified the max sequenceId + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName) + .producerName(producerName).create(); + long seq = System.currentTimeMillis(); + for (int i = 0; i <= maxMsgNum; i++) { + producer.newMessage().value("msg-" + i).sequenceId(seq + i).send(); + } + long maxSeq = seq + maxMsgNum; + //2) Max sequenceId should be recorded correctly + CompletableFuture> completableFuture = pulsar.getBrokerService().getTopics().get(topicName); + Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get(); + PersistentTopic persistentTopic = (PersistentTopic) topic; + MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication(); + messageDeduplication.checkStatus().whenComplete((res, ex) -> { + if (ex != null) { + fail("should not fail"); + } + assertNotNull(messageDeduplication.highestSequencedPersisted); + assertNotNull(messageDeduplication.highestSequencedPushed); + long seqId = messageDeduplication.getLastPublishedSequenceId(producerName); + assertEquals(seqId, maxSeq); + assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq); + assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq); + }).get(); + //3) disable the deduplication check + admin.topics().enableDeduplication(topicName, false); + for (int i = 0; i < 50; i++) { + if (admin.topics().getDeduplicationEnabled(topicName) != null) { + break; + } + Thread.sleep(100); + } + for (int i = 0; i < 100; i++) { + producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send(); + } + //4) Max sequenceId record should be clear + messageDeduplication.checkStatus().whenComplete((res, ex) -> { + if (ex != null) { + fail("should not fail"); + } + assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1); + assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0); + assertEquals(messageDeduplication.highestSequencedPushed.size(), 0); + }).get(); + + } + + private void waitCacheInit(String topicName) throws Exception { + for (int i = 0; i < 50; i++) { + //wait for server init + Thread.sleep(3000); + try { + admin.topics().getDeduplicationEnabled(topicName); + break; + } catch (Exception e) { + //ignore + } + if (i == 49) { + throw new RuntimeException("Waiting for cache initialization has timed out"); + } + } + } + +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 485dcce7914b93..6476f27fe74da5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1800,4 +1800,49 @@ CompletableFuture setDelayedDeliveryPolicyAsync(String topic * @param topic Topic name */ CompletableFuture removePersistenceAsync(String topic); + + /** + * get deduplication enabled of a topic. + * @param topic + * @return + * @throws PulsarAdminException + */ + Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException; + + /** + * get deduplication enabled of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture getDeduplicationEnabledAsync(String topic); + + /** + * set deduplication enabled of a topic. + * @param topic + * @param enabled + * @throws PulsarAdminException + */ + void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException; + + /** + * set deduplication enabled of a topic asynchronously. + * @param topic + * @param enabled + * @return + */ + CompletableFuture enableDeduplicationAsync(String topic, boolean enabled); + + /** + * remove deduplication enabled of a topic. + * @param topic + * @throws PulsarAdminException + */ + void disableDeduplication(String topic) throws PulsarAdminException; + + /** + * remove deduplication enabled of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture disableDeduplicationAsync(String topic); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index b0dd98e63e5da5..62a39b96bda26f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1584,6 +1584,84 @@ public void setDelayedDeliveryPolicy(String topic } } + @Override + public Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException { + try { + return getDeduplicationEnabledAsync(topic). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getDeduplicationEnabledAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "deduplicationEnabled"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException { + try { + enableDeduplicationAsync(topic, enabled). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture enableDeduplicationAsync(String topic, boolean enabled) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "deduplicationEnabled"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + + @Override + public void disableDeduplication(String topic) throws PulsarAdminException { + try { + disableDeduplicationAsync(topic). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture disableDeduplicationAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "deduplicationEnabled"); + return asyncDeleteRequest(path); + } + @Override public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException { try {