Skip to content

Commit

Permalink
Change runnable to action listener
Browse files Browse the repository at this point in the history
  • Loading branch information
imRishN committed Aug 31, 2022
1 parent 9233757 commit 8eb27f0
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ public Coordinator(
this.onJoinValidators,
rerouteService,
nodeHealthService,
this.peerFinder::onDecommission,
this.peerFinder::onRecommission
peerFinder.nodeCommissionedListener()
);
this.publicationHandler = new PublicationTransportHandler(
transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ public class JoinHelper {
private final NodeHealthService nodeHealthService;

public boolean isDecommissioned;
private Runnable onDecommission;
private Runnable onRecommission;
private final ActionListener<Void> nodeCommissionedListener;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

Expand All @@ -137,13 +136,13 @@ public class JoinHelper {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService,
Runnable onDecommission,
Runnable onRecommission
ActionListener<Void> nodeCommissionedListener
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissionedListener = nodeCommissionedListener;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

private final long term = currentTermSupplier.getAsLong();
Expand Down Expand Up @@ -353,7 +352,7 @@ public void handleResponse(Empty response) {
onCompletion.run();
if (isDecommissioned) {
isDecommissioned = false;
onRecommission.run();
nodeCommissionedListener.onResponse(null);
}
}

Expand All @@ -364,7 +363,7 @@ public void handleException(TransportException exp) {
logger.info("local node is decommissioned. Will not be able to join the cluster");
if (!isDecommissioned) {
isDecommissioned = true;
onDecommission.run();
nodeCommissionedListener.onFailure(exp);
}
}
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
Expand Down
26 changes: 16 additions & 10 deletions server/src/main/java/org/opensearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ public PeerFinder(
);
}

public ActionListener<Void> nodeCommissionedListener() {
return new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.info("setting findPeersInterval to [{}], due to recommissioning", findPeersInterval);
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
}

@Override
public void onFailure(Exception e) {
logger.info("setting findPeersInterval to [{}], due to decommissioning", findPeersInterval);
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
}
};
}

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

Expand Down Expand Up @@ -166,16 +182,6 @@ public void deactivate(DiscoveryNode leader) {
}
}

public void onDecommission() {
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
logger.info("setting findPeersInterval to [{}], due to decommissioning", findPeersInterval);
}

public void onRecommission() {
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
logger.info("setting findPeersInterval to [{}], due to recommissioning", findPeersInterval);
}

// exposed to subclasses for testing
protected final boolean holdsLock() {
return Thread.holdsLock(mutex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.logging.log4j.Level;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
Expand All @@ -55,6 +56,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.mock;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.opensearch.node.Node.NODE_NAME_SETTING;
Expand Down Expand Up @@ -91,8 +93,7 @@ public void testJoinDeduplication() {
Collections.emptyList(),
(s, p, r) -> {},
() -> new StatusInfo(HEALTHY, "info"),
()-> {},
() -> {}
mock(ActionListener.class)
);
transportService.start();

Expand Down Expand Up @@ -233,8 +234,7 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName,
Collections.emptyList(),
(s, p, r) -> {},
null,
() -> {},
() -> {}
mock(ActionListener.class)
); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();
Expand Down Expand Up @@ -289,8 +289,7 @@ public void testJoinFailureOnUnhealthyNodes() {
Collections.emptyList(),
(s, p, r) -> {},
() -> nodeHealthServiceStatus.get(),
() -> {},
() -> {}
mock(ActionListener.class)
);
transportService.start();

Expand Down

0 comments on commit 8eb27f0

Please sign in to comment.