Skip to content

Commit

Permalink
[fix][test] Fix resource leaks in LoadBalancerTest (#21443)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Oct 26, 2023
1 parent 3a9f99f commit 5e16ecf
Showing 1 changed file with 7 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -88,8 +85,6 @@
public class LoadBalancerTest {
LocalBookkeeperEnsemble bkEnsemble;

ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class);

private static final int MAX_RETRIES = 15;
Expand Down Expand Up @@ -150,13 +145,10 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdownNow();

for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
pulsarServices[i].close();
}

bkEnsemble.stop();
Expand Down Expand Up @@ -670,27 +662,21 @@ public void testNamespaceBundleAutoSplit() throws Exception {
*/
@Test
public void testLeaderElection() throws Exception {
// this.pulsarServices is the reference to all of the PulsarServices
// it is used in order to clean up the resources
PulsarService[] allServices = new PulsarService[pulsarServices.length];
System.arraycopy(pulsarServices, 0, allServices, 0, pulsarServices.length);
for (int i = 0; i < BROKER_COUNT - 1; i++) {
List<PulsarService> activePulsar = new ArrayList<>();
List<PulsarService> followerPulsar = new ArrayList<>();
LeaderBroker oldLeader = null;
PulsarService leaderPulsar = null;
for (int j = 0; j < BROKER_COUNT; j++) {
if (allServices[j].getState() != PulsarService.State.Closed) {
activePulsar.add(allServices[j]);
LeaderElectionService les = allServices[j].getLeaderElectionService();
PulsarService pulsarService = pulsarServices[j];
if (pulsarService.getState() != PulsarService.State.Closed) {
activePulsar.add(pulsarService);
LeaderElectionService les = pulsarService.getLeaderElectionService();
if (les.isLeader()) {
oldLeader = les.getCurrentLeader().get();
leaderPulsar = allServices[j];
// set the refence to null in the main array,
// in order to prevent closing this PulsarService twice
pulsarServices[i] = null;
leaderPulsar = pulsarService;
} else {
followerPulsar.add(allServices[j]);
followerPulsar.add(pulsarService);
}
}
}
Expand Down

0 comments on commit 5e16ecf

Please sign in to comment.