From b410274554508684f4a15cc1f5a98a71d85fbe2e Mon Sep 17 00:00:00 2001
From: feynmanlin <feynmanlin@tencent.com>
Date: Thu, 20 Aug 2020 15:14:09 +0800
Subject: [PATCH] Support deduplication on topic level (#7821)

### Motivation
Support set `DeduplicationEnabled` on topic level

### Modifications
Support set/get/remove `DeduplicationEnabled` policy on topic level.

### Verifying this change
Added Unit test to verify set/get/remove `DeduplicationEnabled` policy at Topic level work as expected when Topic level policy is enabled/disabled

`org.apache.pulsar.broker.service.persistent.TopicDuplicationTest`
---
 .../admin/impl/PersistentTopicsBase.java      |  15 ++
 .../broker/admin/v2/PersistentTopics.java     |  59 +++++++
 .../persistent/MessageDeduplication.java      |   7 +-
 .../service/persistent/PersistentTopic.java   |   7 +-
 .../broker/admin/MaxUnackedMessagesTest.java  |   3 +-
 .../persistent/TopicDuplicationTest.java      | 160 ++++++++++++++++++
 .../apache/pulsar/client/admin/Topics.java    |  45 +++++
 .../client/admin/internal/TopicsImpl.java     |  78 +++++++++
 8 files changed, 371 insertions(+), 3 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f6010f139f2b9..51c895253bbb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2189,6 +2189,21 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota
                 });
     }
 
+    protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        topicPolicies.setDeduplicationEnabled(enabled);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
     protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
         //Validate message ttl value.
         if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 42dbcb82fb1b3..39477c052a91c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1268,6 +1268,65 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
         internalSetMessageTTL(asyncResponse, null);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Get deduplication configuration of a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+    public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                             @PathParam("tenant") String tenant,
+                             @PathParam("namespace") String namespace,
+                             @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isDeduplicationSet()) {
+            asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
+        } else {
+            asyncResponse.resume(Response.noContent().build());
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Set deduplication enabled on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
+    public void setDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                             @PathParam("tenant") String tenant,
+                             @PathParam("namespace") String namespace,
+                             @PathParam("topic") @Encoded String encodedTopic,
+                             @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed updated deduplication", ex);
+                asyncResponse.resume(ex);
+            }else if (ex != null) {
+                log.error("Failed updated deduplication", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
+    @ApiOperation(value = "Remove deduplication configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
+                                           @PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace,
+                                           @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/retention")
     @ApiOperation(value = "Get retention configuration for specified topic.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 2fb4944d0f95d..c9c215dece1be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -34,6 +34,7 @@
 import org.apache.pulsar.broker.service.Topic.PublishContext;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -421,7 +422,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
 
     private CompletableFuture<Boolean> isDeduplicationEnabled() {
         TopicName name = TopicName.get(topic.getName());
-
+        //Topic level setting has higher priority than namespace level
+        TopicPolicies topicPolicies = topic.getTopicPolicies(name);
+        if (topicPolicies != null && topicPolicies.isDeduplicationSet()) {
+            return CompletableFuture.completedFuture(topicPolicies.getDeduplicationEnabled());
+        }
         return pulsar.getConfigurationCache().policiesCache()
                 .getAsync(AdminResource.path(POLICIES, name.getNamespace())).thenApply(policies -> {
                     // If namespace policies have the field set, it will override the broker-level setting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index daf08e15aacc2..74ef28ed21c49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2128,7 +2128,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
      * @param topicName
      * @return TopicPolicies is exist else return null.
      */
-    private TopicPolicies getTopicPolicies(TopicName topicName) {
+    public TopicPolicies getTopicPolicies(TopicName topicName) {
         TopicName cloneTopicName = topicName;
         if (topicName.isPartitioned()) {
             cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
@@ -2342,4 +2342,9 @@ public int getMaxUnackedMessagesOnSubscription() {
         }
         return maxUnackedMessagesOnSubscription;
     }
+
+    @VisibleForTesting
+    public MessageDeduplication getMessageDeduplication() {
+        return messageDeduplication;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
index 93fd08c10e8a4..f679f30f6b7e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -305,12 +305,13 @@ private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCoun
 
     private void waitCacheInit(String topicName) throws Exception {
         for (int i = 0; i < 50; i++) {
+            //wait for server init
+            Thread.sleep(1000);
             try {
                 admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
                 break;
             } catch (Exception e) {
                 //ignore
-                Thread.sleep(100);
             }
             if (i == 49) {
                 throw new RuntimeException("Waiting for cache initialization has timed out");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
new file mode 100644
index 0000000000000..faa010af79197
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TopicDuplicationTest extends ProducerConsumerBase {
+    private final String testTenant = "my-property";
+    private final String testNamespace = "my-ns";
+    private final String myNamespace = testTenant + "/" + testNamespace;
+    private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        this.conf.setBrokerDeduplicationEnabled(true);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 10000)
+    public void testDuplicationApi() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        waitCacheInit(topicName);
+        admin.topics().createPartitionedTopic(topicName, 3);
+        Boolean enabled = admin.topics().getDeduplicationEnabled(topicName);
+        assertNull(enabled);
+
+        admin.topics().enableDeduplication(topicName, true);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        Assert.assertEquals(admin.topics().getDeduplicationEnabled(topicName), true);
+        admin.topics().disableDeduplication(topicName);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getDeduplicationEnabled(topicName) == null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertNull(admin.topics().getDeduplicationEnabled(topicName));
+    }
+
+    @Test(timeOut = 20000)
+    public void testDuplicationMethod() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String producerName = "my-producer";
+        final int maxMsgNum = 100;
+        waitCacheInit(topicName);
+        admin.topics().createPartitionedTopic(testTopic, 3);
+        //1) Start up producer and send msg.We specified the max sequenceId
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .producerName(producerName).create();
+        long seq = System.currentTimeMillis();
+        for (int i = 0; i <= maxMsgNum; i++) {
+            producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
+        }
+        long maxSeq = seq + maxMsgNum;
+        //2) Max sequenceId should be recorded correctly
+        CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopics().get(topicName);
+        Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
+        PersistentTopic persistentTopic = (PersistentTopic) topic;
+        MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
+        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+            if (ex != null) {
+                fail("should not fail");
+            }
+            assertNotNull(messageDeduplication.highestSequencedPersisted);
+            assertNotNull(messageDeduplication.highestSequencedPushed);
+            long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
+            assertEquals(seqId, maxSeq);
+            assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
+            assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
+        }).get();
+        //3) disable the deduplication check
+        admin.topics().enableDeduplication(topicName, false);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getDeduplicationEnabled(topicName) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send();
+        }
+        //4) Max sequenceId record should be clear
+        messageDeduplication.checkStatus().whenComplete((res, ex) -> {
+            if (ex != null) {
+                fail("should not fail");
+            }
+            assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
+            assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
+            assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
+        }).get();
+
+    }
+
+    private void waitCacheInit(String topicName) throws Exception {
+        for (int i = 0; i < 50; i++) {
+            //wait for server init
+            Thread.sleep(3000);
+            try {
+                admin.topics().getDeduplicationEnabled(topicName);
+                break;
+            } catch (Exception e) {
+                //ignore
+            }
+            if (i == 49) {
+                throw new RuntimeException("Waiting for cache initialization has timed out");
+            }
+        }
+    }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 485dcce7914b9..6476f27fe74da 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1800,4 +1800,49 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
      * @param topic Topic name
      */
     CompletableFuture<Void> removePersistenceAsync(String topic);
+
+    /**
+     * get deduplication enabled of a topic.
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException;
+
+    /**
+     * get deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic);
+
+    /**
+     * set deduplication enabled of a topic.
+     * @param topic
+     * @param enabled
+     * @throws PulsarAdminException
+     */
+    void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException;
+
+    /**
+     * set deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @param enabled
+     * @return
+     */
+    CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled);
+
+    /**
+     * remove deduplication enabled of a topic.
+     * @param topic
+     * @throws PulsarAdminException
+     */
+    void disableDeduplication(String topic) throws PulsarAdminException;
+
+    /**
+     * remove deduplication enabled of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Void> disableDeduplicationAsync(String topic);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index b0dd98e63e5da..62a39b96bda26 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1584,6 +1584,84 @@ public void setDelayedDeliveryPolicy(String topic
         }
     }
 
+    @Override
+    public Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException {
+        try {
+            return getDeduplicationEnabledAsync(topic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "deduplicationEnabled");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path, new InvocationCallback<Boolean>() {
+            @Override
+            public void completed(Boolean enabled) {
+                future.complete(enabled);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException {
+        try {
+            enableDeduplicationAsync(topic, enabled).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "deduplicationEnabled");
+        return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void disableDeduplication(String topic) throws PulsarAdminException {
+        try {
+            disableDeduplicationAsync(topic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> disableDeduplicationAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "deduplicationEnabled");
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
         try {