diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f546a487f84be..12c3ea12df581 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -410,11 +410,8 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties())) .thenAccept(bk -> { - final ManagedLedgerImpl newledger = config.getShadowSource() == null - ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, - mlOwnershipChecker) - : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, - mlOwnershipChecker); + final ManagedLedgerImpl newledger = + createManagedLedger(bk, store, name, config, mlOwnershipChecker); PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @@ -472,6 +469,14 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { }); } + protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name, + ManagedLedgerConfig config, + Supplier> mlOwnershipChecker) { + return config.getShadowSource() == null + ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) : + new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker); + } + @Override public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 966aa068f2ff5..4f45fc67b6377 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -632,7 +632,7 @@ public void operationComplete(List consumers, Stat s) { for (final String cursorName : consumers) { log.info("[{}] Loading cursor {}", name, cursorName); final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); + cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName); cursor.recover(new VoidCallback() { @Override @@ -663,7 +663,7 @@ public void operationFailed(ManagedLedgerException exception) { log.debug("[{}] Recovering cursor {} lazily", name, cursorName); } final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); + cursor = createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName); CompletableFuture cursorRecoveryFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorRecoveryFuture); @@ -1007,7 +1007,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP if (log.isDebugEnabled()) { log.debug("[{}] Creating new cursor: {}", name, cursorName); } - final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName); + final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName); CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); Position position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); @@ -1039,6 +1039,10 @@ public void operationFailed(ManagedLedgerException exception) { }); } + protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) { + return new ManagedCursorImpl(bookKeeper, this, cursorName); + } + @Override public synchronized void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback, final Object ctx) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 737bc69bf24df..3d945afe4c115 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -116,8 +116,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata try { this.managedLedgerFactory = - new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, - openTelemetry); + createManagedLedgerFactory(metadataStore, openTelemetry, bkFactory, managedLedgerFactoryConfig, + statsLogger); } catch (Exception e) { statsProvider.stop(); defaultBkClient.close(); @@ -147,6 +147,16 @@ public BookKeeper getBookKeeperClient() { }; } + protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry, + BookkeeperFactoryForCustomEnsemblePlacementPolicy + bkFactory, + ManagedLedgerFactoryConfig managedLedgerFactoryConfig, + StatsLogger statsLogger) throws Exception { + return new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry); + } + @Override public Collection getStorageClasses() { return List.of(getDefaultStorageClass()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java index f8bac9d8134b0..523f995cc5dc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import java.io.Closeable; +import java.io.IOException; import org.apache.bookkeeper.mledger.ManagedLedger; /** @@ -28,4 +29,8 @@ public interface TopicFactory extends Closeable { T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class topicClazz); + + default void close() throws IOException { + // default implementation + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a96a7e75506eb..275d1ae5818b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -250,70 +250,10 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { } if (dispatcher == null || !dispatcher.isConsumerConnected()) { - Dispatcher previousDispatcher = null; - switch (consumer.subType()) { - case Exclusive: - if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer( - cursor, SubType.Exclusive, 0, topic, this); - } - break; - case Shared: - if (dispatcher == null || dispatcher.getType() != SubType.Shared) { - previousDispatcher = dispatcher; - if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { - dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); - } - } - break; - case Failover: - int partitionIndex = TopicName.getPartitionIndex(topicName); - if (partitionIndex < 0) { - // For non partition topics, use a negative index so - // dispatcher won't sort consumers before picking - // an active consumer for the topic. - partitionIndex = -1; - } - - if (dispatcher == null || dispatcher.getType() != SubType.Failover) { - previousDispatcher = dispatcher; - dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, - partitionIndex, topic, this); - } - break; - case Key_Shared: - KeySharedMeta ksm = consumer.getKeySharedMeta(); - if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared - || !((StickyKeyDispatcher) dispatcher) - .hasSameKeySharedPolicy(ksm)) { - previousDispatcher = dispatcher; - if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { - dispatcher = - new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, - this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } else { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, - topic.getBrokerService().getPulsar().getConfiguration(), ksm); - } - } - break; - default: - return FutureUtil.failedFuture( - new ServerMetadataException("Unsupported subscription type")); - } - - if (previousDispatcher != null) { - previousDispatcher.close().thenRun(() -> { - log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); - }).exceptionally(ex -> { - log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); - return null; - }); + if (consumer.subType() == null) { + return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type")); } + dispatcher = reuseOrCreateDispatcher(dispatcher, consumer); } else { Optional> compatibilityError = checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer); @@ -327,6 +267,79 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { }); } + /** + * Create a new dispatcher or reuse the existing one when it's compatible with the new consumer. + * This protected method can be overridded for testing purpose for injecting test dispatcher instances with + * special behaviors. + * @param dispatcher the existing dispatcher + * @param consumer the new consumer + * @return the dispatcher to use, either the existing one or a new one + */ + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, Consumer consumer) { + Dispatcher previousDispatcher = null; + switch (consumer.subType()) { + case Exclusive: + if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer( + cursor, SubType.Exclusive, 0, topic, this); + } + break; + case Shared: + if (dispatcher == null || dispatcher.getType() != SubType.Shared) { + previousDispatcher = dispatcher; + if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { + dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this); + } + } + break; + case Failover: + int partitionIndex = TopicName.getPartitionIndex(topicName); + if (partitionIndex < 0) { + // For non partition topics, use a negative index so + // dispatcher won't sort consumers before picking + // an active consumer for the topic. + partitionIndex = -1; + } + + if (dispatcher == null || dispatcher.getType() != SubType.Failover) { + previousDispatcher = dispatcher; + dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, + partitionIndex, topic, this); + } + break; + case Key_Shared: + KeySharedMeta ksm = consumer.getKeySharedMeta(); + if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared + || !((StickyKeyDispatcher) dispatcher) + .hasSameKeySharedPolicy(ksm)) { + previousDispatcher = dispatcher; + if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { + dispatcher = + new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, + this, config, ksm); + } else { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, + config, ksm); + } + } + break; + } + + if (previousDispatcher != null) { + previousDispatcher.close().thenRun(() -> { + log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName); + }).exceptionally(ex -> { + log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex); + return null; + }); + } + + return dispatcher; + } + @Override public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException { cursor.updateLastActive(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2325c8286a1be..e920c483bb3ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -586,7 +586,17 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { }); } - private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + + /** + * Create a new subscription instance for the topic. + * This protected method can be overridden in tests to return a special test implementation instance. + * @param subscriptionName the name of the subscription + * @param cursor the cursor to use for the subscription + * @param replicated the subscription replication flag + * @param subscriptionProperties the subscription properties + * @return the subscription instance + */ + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, Boolean replicated, Map subscriptionProperties) { requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java new file mode 100644 index 0000000000000..a1549b2cb86b3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.testinterceptor; + +import io.opentelemetry.api.OpenTelemetry; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; +import lombok.Getter; +import lombok.Setter; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.MetaStore; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ManagedLedgerClientFactory; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicFactory; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +/** + * A test interceptor for broker tests that allows to decorate persistent topics, subscriptions, dispatchers + * managed ledger factory, managed ledger and managed cursor instances. + */ +public class BrokerTestInterceptor { + public static final BrokerTestInterceptor INSTANCE = new BrokerTestInterceptor(); + + // Suppress default constructor for noninstantiability + private BrokerTestInterceptor() { + + } + + public static class TestTopicFactory implements TopicFactory { + @Override + public T create(String topic, ManagedLedger ledger, BrokerService brokerService, + Class topicClazz) { + if (!topicClazz.isAssignableFrom(PersistentTopic.class)) { + throw new UnsupportedOperationException("Unsupported topic class"); + } + return topicClazz.cast( + INSTANCE.getPersistentTopicDecorator().apply(new TestTopic(topic, ledger, brokerService))); + } + } + + static class TestTopic extends PersistentTopic { + + public TestTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + super(topic, ledger, brokerService); + } + + @Override + protected PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map subscriptionProperties) { + return INSTANCE.getPersistentSubscriptionDecorator() + .apply(new TestSubscription(this, subscriptionName, cursor, replicated, subscriptionProperties)); + } + } + + static class TestSubscription extends PersistentSubscription { + public TestSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + Boolean replicated, + Map subscriptionProperties) { + super(topic, subscriptionName, cursor, replicated, subscriptionProperties); + } + + @Override + protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, + Consumer consumer) { + Dispatcher previousInstance = dispatcher; + dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer); + if (dispatcher != previousInstance) { + dispatcher = INSTANCE.getDispatcherDecorator().apply(dispatcher); + } + return dispatcher; + } + } + + public static class TestManagedLedgerStorage extends ManagedLedgerClientFactory { + @Override + protected ManagedLedgerFactoryImpl createManagedLedgerFactory(MetadataStoreExtended metadataStore, + OpenTelemetry openTelemetry, + ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory, + ManagedLedgerFactoryConfig managedLedgerFactoryConfig, + StatsLogger statsLogger) throws Exception { + return INSTANCE.managedLedgerFactoryDecorator.apply( + new TestManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger, + openTelemetry)); + } + } + + static class TestManagedLedgerFactoryImpl extends ManagedLedgerFactoryImpl { + public TestManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, + BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, + ManagedLedgerFactoryConfig config, StatsLogger statsLogger, + OpenTelemetry openTelemetry) throws Exception { + super(metadataStore, bookKeeperGroupFactory, config, statsLogger, openTelemetry); + } + + @Override + protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore store, String name, + ManagedLedgerConfig config, + Supplier> mlOwnershipChecker) { + return INSTANCE.managedLedgerDecorator.apply( + new TestManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker)); + } + } + + static class TestManagedLedgerImpl extends ManagedLedgerImpl { + public TestManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, + ManagedLedgerConfig config, + OrderedScheduler scheduledExecutor, String name, + Supplier> mlOwnershipChecker) { + super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); + } + + @Override + protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String cursorName) { + return INSTANCE.managedCursorDecorator.apply(super.createCursor(bookKeeper, cursorName)); + } + } + + @Getter + @Setter + private Function persistentTopicDecorator = Function.identity(); + + @Getter + @Setter + private Function persistentSubscriptionDecorator = Function.identity(); + + @Getter + @Setter + private Function dispatcherDecorator = Function.identity(); + + @Getter + @Setter + private Function managedLedgerFactoryDecorator = Function.identity(); + + @Getter + @Setter + private Function managedLedgerDecorator = Function.identity(); + + @Getter + @Setter + private Function managedCursorDecorator = Function.identity(); + + public void reset() { + persistentTopicDecorator = Function.identity(); + persistentSubscriptionDecorator = Function.identity(); + dispatcherDecorator = Function.identity(); + managedLedgerFactoryDecorator = Function.identity(); + managedLedgerDecorator = Function.identity(); + managedCursorDecorator = Function.identity(); + } + + public void configure(ServiceConfiguration conf) { + conf.setTopicFactoryClassName(TestTopicFactory.class.getName()); + conf.setManagedLedgerStorageClassName(TestManagedLedgerStorage.class.getName()); + } + + public void applyDispatcherSpyDecorator(Class dispatcherClass, + java.util.function.Consumer spyCustomizer) { + setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass, spyCustomizer)); + } + + public static Function createDispatcherSpyDecorator( + Class dispatcherClass, java.util.function.Consumer spyCustomizer) { + return dispatcher -> { + Dispatcher spy = BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher); + spyCustomizer.accept(dispatcherClass.cast(spy)); + return spy; + }; + } + + public void applyCursorSpyDecorator(java.util.function.Consumer spyCustomizer) { + setManagedCursorDecorator(cursor -> { + ManagedCursorImpl spy = BrokerTestUtil.spyWithoutRecordingInvocations(cursor); + spyCustomizer.accept(spy); + return spy; + }); + } +}