Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jan 10, 2023
1 parent 7ecbe6e commit 88ec79d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.cluster.decommission.DecommissionService;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -77,9 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
if (request.id() == null) {
request.setID(UUIDs.base64UUID());
}
decommissionService.startDecommissionAction(request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.UUIDs;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -144,6 +145,9 @@ public void startDecommissionAction(
public ClusterState execute(ClusterState currentState) {
// validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action
validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes);
if (decommissionRequest.id() == null) {
decommissionRequest.setID(UUIDs.base64UUID());
}
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed and attribute is weighed away
ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,40 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissioningFailedWhenAnotherRequestForSameAttributeIsExecuted() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionStatus oldStatus = DecommissionStatus.INIT;
DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata(
new DecommissionAttribute("zone", "zone_1"),
oldStatus,
randomAlphaOfLength(10)
);
final ClusterState.Builder builder = builder(clusterService.state());
setState(
clusterService,
builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build())
);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
exceptionReference.set(e);
countDownLatch.countDown();
}
};
DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_1"));
decommissionService.startDecommissionAction(request, listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertTrue(exceptionReference.get() instanceof DecommissioningFailedException);
assertThat(exceptionReference.get().getMessage(), Matchers.endsWith("same request is already in status [INIT]"));
}

public void testScheduleNodesDecommissionOnTimeout() {
TransportService mockTransportService = Mockito.mock(TransportService.class);
ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class);
Expand Down

0 comments on commit 88ec79d

Please sign in to comment.