Skip to content

Commit

Permalink
[fix][broker] Avoid heartbeat topic to offload. (apache#15008)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored and nicklixinyang committed Apr 20, 2022
1 parent ace77f5 commit f0e1821
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,10 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {

@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
return;
}
PositionImpl requestOffloadTo = (PositionImpl) pos;
if (!isValidPosition(requestOffloadTo)
// Also consider the case where the last ledger is currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testNullOffloader() throws Exception {
ledger.offloadPrefix(p);
fail("Should have thrown an exception");
} catch (ManagedLedgerException e) {
assertEquals(e.getCause().getClass(), CompletionException.class);
assertEquals(e.getMessage(), "NullLedgerOffloader");
}
assertEquals(ledger.getLedgersInfoAsList().size(), 5);
assertEquals(ledger.getLedgersInfoAsList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
Expand All @@ -102,6 +103,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
Expand Down Expand Up @@ -1604,18 +1606,22 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
throw new RuntimeException(e);
if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
} else {
if (topicLevelOffloadPolicies != null) {
try {
LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
} else {
//If the topic level policy is null, use the namespace level
managedLedgerConfig
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -30,8 +36,13 @@
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -135,4 +146,28 @@ public void testProduceAndConsumeUnderSystemNamespace() throws Exception {
Assert.assertNotNull(receive);
}

@Test
public void testHealthCheckTopicNotOffload() throws Exception {
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName.toString(), true).get().get();
ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
config.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
admin.brokers().healthcheck(TopicVersion.V2);
admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
});
LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);
Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
NullLedgerOffloader.INSTANCE);
});
}

}

0 comments on commit f0e1821

Please sign in to comment.