Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Avoid subscription fenced error with consumer.seek whenever possible #23163

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class PersistentSubscription extends AbstractSubscription {
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
Expand Down Expand Up @@ -220,6 +221,16 @@ public boolean setReplicated(boolean replicated) {

/**
 * Adds a consumer to this subscription.
 *
 * <p>If a cursor reset is currently tracked in {@code inProgressResetCursorFuture}, the consumer is
 * only added once that future has finished. The outcome of the reset (success or failure) is
 * deliberately discarded; it is used purely as an ordering barrier.
 *
 * @param consumer the consumer to attach
 * @return a future that completes when {@code addConsumerInternal} has completed
 */
@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
    // Read the volatile field once so the null check and the chaining use the same instance.
    final CompletableFuture<Void> pendingReset = this.inProgressResetCursorFuture;
    if (pendingReset == null) {
        return addConsumerInternal(consumer);
    }
    return pendingReset
            // Map both normal and exceptional completion to a plain value.
            .handle((result, error) -> null)
            .thenCompose(unused -> addConsumerInternal(consumer));
}

private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
synchronized (PersistentSubscription.this) {
cursor.updateLastActive();
Expand Down Expand Up @@ -775,7 +786,8 @@ public void findEntryComplete(Position position, Object ctx) {
} else {
finalPosition = position.getNext();
}
resetCursor(finalPosition, future);
CompletableFuture<Void> resetCursorFuture = resetCursor(finalPosition);
FutureUtil.completeAfter(future, resetCursorFuture);
}

@Override
Expand All @@ -794,18 +806,13 @@ public void findEntryFailed(ManagedLedgerException exception,
}

@Override
public CompletableFuture<Void> resetCursor(Position position) {
CompletableFuture<Void> future = new CompletableFuture<>();
resetCursor(position, future);
return future;
}

private void resetCursor(Position finalPosition, CompletableFuture<Void> future) {
public CompletableFuture<Void> resetCursor(Position finalPosition) {
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
return;
return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription"));
}

final CompletableFuture<Void> future = new CompletableFuture<>();
inProgressResetCursorFuture = future;
final CompletableFuture<Void> disconnectFuture;

// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
Expand All @@ -825,6 +832,7 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.completeExceptionally(
new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
Expand Down Expand Up @@ -864,6 +872,7 @@ public void resetComplete(Object ctx) {
dispatcher.afterAckMessages(null, finalPosition);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.complete(null);
}

Expand All @@ -872,6 +881,7 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName,
finalPosition, exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
Expand All @@ -886,10 +896,12 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) {
}).exceptionally((e) -> {
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.completeExceptionally(new BrokerServiceException(e));
return null;
});
});
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -50,8 +52,13 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -781,6 +788,64 @@ public void testSeekByFunctionAndMultiTopic() throws Exception {
assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2);
}

/**
 * Verifies that after {@code consumer.seek(...)} the consumer ends up connected again without the
 * client ever receiving a "Subscription is fenced" error, even though the metadata-store SET on
 * the subscription's path is artificially delayed.
 */
@Test
public void testSeekWillNotEncounteredFencedError() throws Exception {
    String topicName = "persistent://prop/ns-abc/my-topic2";
    admin.topics().createNonPartitionedTopic(topicName);
    admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0));
    // Create a pulsar client that counts every "Subscription is fenced" error it receives.
    ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
    AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
    PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
            new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
                @Override
                protected void handleError(CommandError error) {
                    if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) {
                        receivedFencedErrorCounter.incrementAndGet();
                    }
                    super.handleError(error);
                }
            });

    try {
        // Subscribe first, then publish some messages; the topic is unloaded before every
        // send after the first one.
        org.apache.pulsar.client.api.Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionName("s1")
                .subscribe();
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(topicName).create();
        MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
        for (int i = 1; i < 11; i++) {
            admin.topics().unload(topicName);
            producer.send(i + "");
        }

        // Inject a delay for reset-cursor: SET operations on the subscription's path are delayed.
        mockZooKeeper.delay(3000, (op, path) -> {
            if (path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) {
                return op.toString().equalsIgnoreCase("SET");
            }
            return false;
        });

        // Verify: consumer will not receive "subscription fenced" error after a seek.
        for (int i = 1; i < 11; i++) {
            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
            assertNotNull(msg);
            consumer.acknowledge(msg);
        }
        consumer.seek(msgId1);
        Awaitility.await().untilAsserted(() -> {
            assertTrue(consumer.isConnected());
        });
        assertEquals(receivedFencedErrorCounter.get(), 0);

        // cleanup of the happy path, in the same order as before.
        producer.close();
        consumer.close();
    } finally {
        // Always release the injected client, even when an assertion above failed, so a failing
        // run does not leak the client and its connections.
        client.close();
    }
    admin.topics().delete(topicName);
}

@Test
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
Expand Down
Loading