Skip to content

Commit

Permalink
6/7 p2
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Jun 7, 2024
1 parent d0536f0 commit c32acbe
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1005,15 +1005,20 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
return numUnloadedTopics;
})
.whenComplete((__, ex) -> {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
if (disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
pulsar.getNamespaceService().onNamespaceBundleUnload(bundle);
double unloadBundleTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
if (ex != null) {

log.error("Failed to close topics under bundle:{} in {} ms",
bundle.toString(), unloadBundleTime, ex);
if (!disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
} else {
log.info("Unloading bundle:{} with {} topics completed in {} ms",
bundle, unloadedTopics, unloadBundleTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,22 @@ protected void cleanup() throws Exception {
public Object[][] unloadCases (){
// [msgCount, enabledBatch, maxMsgPerBatch, subType, ackMsgCount]
return new Object[][]{
//{100, false, 1, Exclusive, 0},
//{100, false, 1, Failover, 0},
//{100, false, 1, Shared, 0},
//{100, false, 1, Key_Shared, 0},
//{100, true, 5, Exclusive, 0},
//{100, true, 5, Failover, 0},
//{100, true, 5, Shared, 0},
//{100, true, 5, Key_Shared, 0},
//{100, false, 1, Exclusive, 50},
//{100, false, 1, Failover, 50},
//{100, false, 1, Shared, 50},
{100, false, 1, Exclusive, 0},
{100, false, 1, Failover, 0},
{100, false, 1, Shared, 0},
{100, false, 1, Key_Shared, 0},
{100, true, 5, Exclusive, 0},
{100, true, 5, Failover, 0},
{100, true, 5, Shared, 0},
{100, true, 5, Key_Shared, 0},
{100, false, 1, Exclusive, 50},
{100, false, 1, Failover, 50},
{100, false, 1, Shared, 50},
{100, false, 1, Key_Shared, 50},
//{100, true, 5, Exclusive, 50},
//{100, true, 5, Failover, 50},
//{100, true, 5, Shared, 50},
//{100, true, 5, Key_Shared, 50},
{100, true, 5, Exclusive, 50},
{100, true, 5, Failover, 50},
{100, true, 5, Shared, 50},
{100, true, 5, Key_Shared, 50},
};
}

Expand Down

0 comments on commit c32acbe

Please sign in to comment.