Skip to content

Commit

Permalink
support topic level delayed delivery policy (apache#7784)
Browse files Browse the repository at this point in the history
Master Issue: apache#2688 

### Motivation
support topic level delayed delivery policy

### Modifications
Support set/get/remove delayed delivery policy on topic level.

### Verifying this change

Added Unit test to verify set/get/remove delayed delivery policy at Topic level work as expected when Topic level policy is enabled/disabled

- org.apache.pulsar.broker.admin.AdminApiDelayedDelivery#testEnableAndDisableTopicDelayedDelivery
- org.apache.pulsar.broker.admin.AdminApiDelayedDelivery#testEnableTopicDelayedDelivery
  • Loading branch information
315157973 authored and Livio committed Sep 5, 2020
1 parent 371f564 commit d7ec806
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
Expand Down Expand Up @@ -579,6 +580,31 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
});
}

protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
asyncResponse.resume(new RestException(e));
return;
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed set delayed delivery policy for topic", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -248,6 +250,61 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopic(authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Get delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
, topicPolicies.getDelayedDeliveryEnabled()));
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Set delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Delayed delivery policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
}



@DELETE
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Set delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}

/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void readMoreEntries() {
}

havePendingReplayRead = true;
Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ?
asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket

Expand Down Expand Up @@ -771,7 +771,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!topic.delayedDeliveryEnabled) {
if (!topic.isDelayedDeliveryEnabled()) {
// If broker has the feature disabled, always deliver messages immediately
return false;
}
Expand All @@ -783,7 +783,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
}
}
Expand All @@ -793,13 +793,14 @@ protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesT
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
} else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.delayedDeliveryTickTimeMillis);
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
} else {
return Collections.emptySet();
}
}

@Override
public synchronized long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2129,8 +2129,12 @@ public synchronized OffloadProcessStatus offloadStatus() {
* @return TopicPolicies is exist else return null.
*/
private TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName());
return null;
Expand Down Expand Up @@ -2303,4 +2307,22 @@ public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction) {
});
return completableFuture;
}

public long getDelayedDeliveryTickTimeMillis() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
return topicPolicies.getDelayedDeliveryTickTimeMillis();
}
return delayedDeliveryTickTimeMillis;
}

public boolean isDelayedDeliveryEnabled() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryEnabledSet()) {
return topicPolicies.getDelayedDeliveryEnabled();
}
return delayedDeliveryEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
package org.apache.pulsar.broker.service.persistent;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Cleanup;

import org.apache.pulsar.client.api.Consumer;
Expand All @@ -37,6 +43,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -46,6 +54,9 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
@Override
@BeforeClass
public void setup() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setDelayedDeliveryTickTimeMillis(1024);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -315,4 +326,128 @@ public void testOrderingDispatch() throws PulsarClientException {
}
}
}

@Test(timeOut = 20000)
public void testEnableAndDisableTopicDelayedDelivery() throws Exception {
String topicName = "persistent://public/default/topic-" + UUID.randomUUID().toString();

admin.topics().createPartitionedTopic(topicName, 3);
assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, false);
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
break;
}
}

assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());

admin.topics().removeDelayedDeliveryPolicy(topicName);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
break;
}
}
assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
}

@Test(timeOut = 20000)
public void testEnableTopicDelayedDelivery() throws Exception {
final String topicName = "persistent://public/default/test" + UUID.randomUUID().toString();

admin.topics().createPartitionedTopic(topicName, 3);
assertNull(admin.topics().getDelayedDeliveryPolicy(topicName));
//1 Set topic policy
DelayedDeliveryPolicies delayedDeliveryPolicies = new DelayedDeliveryPolicies(2000, true);
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topicName) != null) {
break;
}
}
//2 Setup consumer and producer
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub" + System.currentTimeMillis())
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName).create();
//3 Send delay message
for (int i = 0; i < 10; i++) {
producer.newMessage()
.value("delayed-msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}
producer.flush();

//4 There will be no message in the first 3 seconds
assertNull(consumer.receive(3, TimeUnit.SECONDS));

Set<String> delayedMessages = new HashSet<>();
for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive(4, TimeUnit.SECONDS);
delayedMessages.add(msg.getValue());
consumer.acknowledge(msg);
}
for (int i = 0; i < 10; i++) {
assertTrue(delayedMessages.contains("delayed-msg-" + i));
}
//5 Disable delayed delivery
delayedDeliveryPolicies.setActive(false);
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (!admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
break;
}
}
producer.newMessage().value("disabled-msg").deliverAfter(5, TimeUnit.SECONDS).send();
//6 Delay deliver is disabled, so we can receive message immediately
Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
//7 Set a very long tick time, so that trackDelayedDelivery will fail. we can receive msg immediately.
delayedDeliveryPolicies.setActive(true);
delayedDeliveryPolicies.setTickTime(Integer.MAX_VALUE);
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topicName).isActive()) {
break;
}
}
producer.newMessage().value("long-tick-msg").deliverAfter(5, TimeUnit.SECONDS).send();
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
consumer.acknowledge(msg);
//8 remove topic policy, it will use namespace level policy
admin.topics().removeDelayedDeliveryPolicy(topicName);
//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topicName) == null) {
break;
}
}
producer.newMessage().value("long-tick-msg").deliverAfter(2, TimeUnit.SECONDS).send();
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNull(msg);
msg = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(msg);
}
}
Loading

0 comments on commit d7ec806

Please sign in to comment.