diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7eccca1c17a9b6..37858afe3d8a71 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2847,6 +2847,10 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) { @Override public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) { + if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) { + callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx); + return; + } PositionImpl requestOffloadTo = (PositionImpl) pos; if (!isValidPosition(requestOffloadTo) // Also consider the case where the last ledger is currently diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index d0da8e65b514e8..a59f53d45756b5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -85,7 +85,7 @@ public void testNullOffloader() throws Exception { ledger.offloadPrefix(p); fail("Should have thrown an exception"); } catch (ManagedLedgerException e) { - assertEquals(e.getCause().getClass(), CompletionException.class); + assertEquals(e.getMessage(), "NullLedgerOffloader"); } assertEquals(ledger.getLedgersInfoAsList().size(), 5); assertEquals(ledger.getLedgersInfoAsList().stream() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 530bfd067eda47..8829841db6c49e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -87,6 +87,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; @@ -102,6 +103,7 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; @@ -1604,18 +1606,22 @@ public CompletableFuture getManagedLedgerConfig(TopicName t topicLevelOffloadPolicies, OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), getPulsar().getConfig().getProperties()); - if (topicLevelOffloadPolicies != null) { - try { - LedgerOffloader topicLevelLedgerOffLoader = - pulsar().createManagedLedgerOffloader(offloadPolicies); - managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); - } catch (PulsarServerException e) { - throw new RuntimeException(e); + if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { + managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); + } else { + if (topicLevelOffloadPolicies != null) { + try { + LedgerOffloader topicLevelLedgerOffLoader = + pulsar().createManagedLedgerOffloader(offloadPolicies); + managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + } else { + //If the topic level policy is null, use the namespace level + managedLedgerConfig + .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } - } else { - //If the topic level policy is null, use the namespace level - managedLedgerConfig - .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index ff45c140f56f5b..d506f7127678f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -20,8 +20,14 @@ import com.google.common.collect.Sets; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.commons.lang.RandomStringUtils; +import org.apache.pulsar.broker.admin.impl.BrokersBase; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -30,8 +36,13 @@ import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -135,4 +146,28 @@ public void testProduceAndConsumeUnderSystemNamespace() throws Exception { Assert.assertNotNull(receive); } + @Test + public void testHealthCheckTopicNotOffload() throws Exception { + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName.toString(), true).get().get(); + ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig(); + config.setLedgerOffloader(NullLedgerOffloader.INSTANCE); + admin.brokers().healthcheck(TopicVersion.V2); + admin.topics().triggerOffload(topicName.toString(), MessageId.earliest); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0); + }); + LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class); + config.setLedgerOffloader(ledgerOffloader); + Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader); + admin.topicPolicies().setMaxConsumers(topicName.toString(), 2); + Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(), + NullLedgerOffloader.INSTANCE); + }); + } + }