Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][test] Support decorating topic, subscription, dispatcher, ManagedLedger and ManagedCursors instances in tests #23892

Merged
merged 2 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,8 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()))
.thenAccept(bk -> {
final ManagedLedgerImpl newledger = config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker)
: new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name,
mlOwnershipChecker);
final ManagedLedgerImpl newledger =
createManagedLedger(bk, store, name, config, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
Expand Down Expand Up @@ -472,6 +469,14 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
}

protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name,
ManagedLedgerConfig config,
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
return config.getShadowSource() == null
? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) :
new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker);
}

@Override
public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public void operationComplete(List<String> consumers, Stat s) {
for (final String cursorName : consumers) {
log.info("[{}] Loading cursor {}", name, cursorName);
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName);
cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
lhotari marked this conversation as resolved.
Show resolved Hide resolved

cursor.recover(new VoidCallback() {
@Override
Expand Down Expand Up @@ -663,7 +663,7 @@ public void operationFailed(ManagedLedgerException exception) {
log.debug("[{}] Recovering cursor {} lazily", name, cursorName);
}
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName);
cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<ManagedCursor> cursorRecoveryFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorRecoveryFuture);

Expand Down Expand Up @@ -1007,7 +1007,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
if (log.isDebugEnabled()) {
log.debug("[{}] Creating new cursor: {}", name, cursorName);
}
final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName);
final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
Position position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
Expand Down Expand Up @@ -1039,6 +1039,10 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) {
return new ManagedCursorImpl(bookKeeper, this, cursorName);
}

@Override
public synchronized void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback,
final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata

try {
this.managedLedgerFactory =
new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
openTelemetry);
createManagedLedgerFactory(metadataStore, openTelemetry, bkFactory, managedLedgerFactoryConfig,
statsLogger);
} catch (Exception e) {
statsProvider.stop();
defaultBkClient.close();
Expand Down Expand Up @@ -147,6 +147,16 @@ public BookKeeper getBookKeeperClient() {
};
}

protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore,
OpenTelemetry openTelemetry,
BookkeeperFactoryForCustomEnsemblePlacementPolicy
bkFactory,
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
StatsLogger statsLogger) throws Exception {
return new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
openTelemetry);
}

@Override
public Collection<ManagedLedgerStorageClass> getStorageClasses() {
return List.of(getDefaultStorageClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.io.Closeable;
import java.io.IOException;
import org.apache.bookkeeper.mledger.ManagedLedger;

/**
Expand All @@ -28,4 +29,8 @@
public interface TopicFactory extends Closeable {

<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);

default void close() throws IOException {
// default implementation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,70 +250,10 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
}

if (dispatcher == null || !dispatcher.isConsumerConnected()) {
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
}
break;
default:
return FutureUtil.failedFuture(
new ServerMetadataException("Unsupported subscription type"));
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
if (consumer.subType() == null) {
return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type"));
}
dispatcher = reuseOrCreateDispatcher(dispatcher, consumer);
} else {
Optional<CompletableFuture<Void>> compatibilityError =
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
Expand All @@ -327,6 +267,79 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
});
}

/**
* Create a new dispatcher or reuse the existing one when it's compatible with the new consumer.
* This protected method can be overridded for testing purpose for injecting test dispatcher instances with
* special behaviors.
* @param dispatcher the existing dispatcher
* @param consumer the new consumer
* @return the dispatcher to use, either the existing one or a new one
*/
protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
if (config.isSubscriptionSharedUseClassicPersistentImplementation()) {
dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, use a negative index so
// dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
partitionIndex, topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((StickyKeyDispatcher) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
dispatcher =
new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
this, config, ksm);
} else {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
config, ksm);
}
}
break;
}

if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}

return dispatcher;
}

@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,17 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
});
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,

/**
* Create a new subscription instance for the topic.
* This protected method can be overridden in tests to return a special test implementation instance.
* @param subscriptionName the name of the subscription
* @param cursor the cursor to use for the subscription
* @param replicated the subscription replication flag
* @param subscriptionProperties the subscription properties
* @return the subscription instance
*/
protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
Expand Down
Loading
Loading