Skip to content

Commit

Permalink
Support deduplication on topic level (#7821)
Browse files Browse the repository at this point in the history
### Motivation
Support set `DeduplicationEnabled` on topic level

### Modifications
Support set/get/remove `DeduplicationEnabled` policy on topic level.

### Verifying this change
Added Unit test to verify set/get/remove `DeduplicationEnabled` policy at Topic level work as expected when Topic level policy is enabled/disabled

`org.apache.pulsar.broker.service.persistent.TopicDuplicationTest`
  • Loading branch information
315157973 authored Aug 20, 2020
1 parent 6cae4af commit b410274
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,21 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota
});
}

protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setDeduplicationEnabled(enabled);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,65 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
internalSetMessageTTL(asyncResponse, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Get deduplication configuration of a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDeduplicationSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Set deduplication enabled on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void setDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(ex);
}else if (ex != null) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Remove deduplication configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/retention")
@ApiOperation(value = "Get retention configuration for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -421,7 +422,11 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {

private CompletableFuture<Boolean> isDeduplicationEnabled() {
TopicName name = TopicName.get(topic.getName());

//Topic level setting has higher priority than namespace level
TopicPolicies topicPolicies = topic.getTopicPolicies(name);
if (topicPolicies != null && topicPolicies.isDeduplicationSet()) {
return CompletableFuture.completedFuture(topicPolicies.getDeduplicationEnabled());
}
return pulsar.getConfigurationCache().policiesCache()
.getAsync(AdminResource.path(POLICIES, name.getNamespace())).thenApply(policies -> {
// If namespace policies have the field set, it will override the broker-level setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2128,7 +2128,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
* @param topicName
* @return TopicPolicies is exist else return null.
*/
private TopicPolicies getTopicPolicies(TopicName topicName) {
public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
Expand Down Expand Up @@ -2342,4 +2342,9 @@ public int getMaxUnackedMessagesOnSubscription() {
}
return maxUnackedMessagesOnSubscription;
}

@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,13 @@ private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCoun

private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
//wait for server init
Thread.sleep(1000);
try {
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
break;
} catch (Exception e) {
//ignore
Thread.sleep(100);
}
if (i == 49) {
throw new RuntimeException("Waiting for cache initialization has timed out");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

public class TopicDuplicationTest extends ProducerConsumerBase {
private final String testTenant = "my-property";
private final String testNamespace = "my-ns";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";

@BeforeMethod
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setBrokerDeduplicationEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 10000)
public void testDuplicationApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
waitCacheInit(topicName);
admin.topics().createPartitionedTopic(topicName, 3);
Boolean enabled = admin.topics().getDeduplicationEnabled(topicName);
assertNull(enabled);

admin.topics().enableDeduplication(topicName, true);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
break;
}
Thread.sleep(100);
}
Assert.assertEquals(admin.topics().getDeduplicationEnabled(topicName), true);
admin.topics().disableDeduplication(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getDeduplicationEnabled(topicName) == null) {
break;
}
Thread.sleep(100);
}
assertNull(admin.topics().getDeduplicationEnabled(topicName));
}

@Test(timeOut = 20000)
public void testDuplicationMethod() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
final String producerName = "my-producer";
final int maxMsgNum = 100;
waitCacheInit(topicName);
admin.topics().createPartitionedTopic(testTopic, 3);
//1) Start up producer and send msg.We specified the max sequenceId
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.producerName(producerName).create();
long seq = System.currentTimeMillis();
for (int i = 0; i <= maxMsgNum; i++) {
producer.newMessage().value("msg-" + i).sequenceId(seq + i).send();
}
long maxSeq = seq + maxMsgNum;
//2) Max sequenceId should be recorded correctly
CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopics().get(topicName);
Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get();
PersistentTopic persistentTopic = (PersistentTopic) topic;
MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication();
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertNotNull(messageDeduplication.highestSequencedPersisted);
assertNotNull(messageDeduplication.highestSequencedPushed);
long seqId = messageDeduplication.getLastPublishedSequenceId(producerName);
assertEquals(seqId, maxSeq);
assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq);
assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq);
}).get();
//3) disable the deduplication check
admin.topics().enableDeduplication(topicName, false);
for (int i = 0; i < 50; i++) {
if (admin.topics().getDeduplicationEnabled(topicName) != null) {
break;
}
Thread.sleep(100);
}
for (int i = 0; i < 100; i++) {
producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send();
}
//4) Max sequenceId record should be clear
messageDeduplication.checkStatus().whenComplete((res, ex) -> {
if (ex != null) {
fail("should not fail");
}
assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1);
assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0);
assertEquals(messageDeduplication.highestSequencedPushed.size(), 0);
}).get();

}

private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
//wait for server init
Thread.sleep(3000);
try {
admin.topics().getDeduplicationEnabled(topicName);
break;
} catch (Exception e) {
//ignore
}
if (i == 49) {
throw new RuntimeException("Waiting for cache initialization has timed out");
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1800,4 +1800,49 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
* @param topic Topic name
*/
CompletableFuture<Void> removePersistenceAsync(String topic);

/**
* get deduplication enabled of a topic.
* @param topic
* @return
* @throws PulsarAdminException
*/
Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException;

/**
* get deduplication enabled of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic);

/**
* set deduplication enabled of a topic.
* @param topic
* @param enabled
* @throws PulsarAdminException
*/
void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException;

/**
* set deduplication enabled of a topic asynchronously.
* @param topic
* @param enabled
* @return
*/
CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled);

/**
* remove deduplication enabled of a topic.
* @param topic
* @throws PulsarAdminException
*/
void disableDeduplication(String topic) throws PulsarAdminException;

/**
* remove deduplication enabled of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Void> disableDeduplicationAsync(String topic);
}
Loading

0 comments on commit b410274

Please sign in to comment.