Skip to content

Commit

Permalink
Add request id to decommission request
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jan 10, 2023
1 parent 319b4b1 commit c46992c
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;
private boolean originalRequest = false;
private String id;
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// holder for no_delay param. To avoid draining time timeout.
Expand All @@ -47,8 +47,7 @@ public DecommissionRequest(StreamInput in) throws IOException {
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.originalRequest = in.readBoolean();

this.id = in.readOptionalString();
}

@Override
Expand All @@ -57,7 +56,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeBoolean(originalRequest);
out.writeOptionalString(id);
}

/**
Expand Down Expand Up @@ -100,21 +99,21 @@ public boolean isNoDelay() {
}

/**
* Sets originalRequest for decommission request
* Sets id for decommission request
*
* @param originalRequest boolean to identify if it is the first and original request
* @param id uuid for request
* @return this request
*/
public DecommissionRequest originalRequest(boolean originalRequest) {
this.originalRequest = originalRequest;
public DecommissionRequest setID(String id) {
this.id = id;
return this;
}

/**
* @return Returns whether decommission request is first and original request
* @return Returns id of decommission request
*/
public boolean originalRequest() {
return this.originalRequest;
public String id() {
return id;
}

@Override
Expand Down Expand Up @@ -146,8 +145,6 @@ public String toString() {
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", originalRequest="
+ originalRequest
+ ", delayTimeout="
+ delayTimeout
+ ", noDelay="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
}

/**
* Sets originalRequest for decommission request
* Sets id for decommission request
*
* @param originalRequest boolean to identify if it is the first and original request
* @param id for decommission request
* @return current object
*/
public DecommissionRequestBuilder originalRequest(boolean originalRequest) {
request.originalRequest(originalRequest);
public DecommissionRequestBuilder id(String id) {
request.setID(id);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.decommission.DecommissionService;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -76,6 +77,9 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
if (request.id() == null) {
request.setID(UUIDs.base64UUID());
}
decommissionService.startDecommissionAction(request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>

private final DecommissionAttribute decommissionAttribute;
private DecommissionStatus status;
private String requestID;
public static final String attributeType = "awareness";

/**
Expand All @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>
* @param decommissionAttribute attribute details
* @param status current status of the attribute decommission
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) {
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) {
this.decommissionAttribute = decommissionAttribute;
this.status = status;
this.requestID = requestId;
}

/**
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT}
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id
*
* @param decommissionAttribute attribute details
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, DecommissionStatus.INIT);
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) {
this(decommissionAttribute, DecommissionStatus.INIT, requestID);
}

/**
Expand All @@ -77,6 +79,15 @@ public DecommissionStatus status() {
return this.status;
}

/**
* Returns the request id of the decommission
*
* @return request id
*/
public String requestID() {
return this.requestID;
}

/**
* Returns instance of the metadata with updated status
* @param newStatus status to be updated with
Expand Down Expand Up @@ -128,12 +139,13 @@ public boolean equals(Object o) {
DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o;

if (!status.equals(that.status)) return false;
if (!requestID.equals(that.requestID)) return false;
return decommissionAttribute.equals(that.decommissionAttribute);
}

@Override
public int hashCode() {
return Objects.hash(attributeType, decommissionAttribute, status);
return Objects.hash(attributeType, decommissionAttribute, status, requestID);
}

/**
Expand All @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() {
public DecommissionAttributeMetadata(StreamInput in) throws IOException {
this.decommissionAttribute = new DecommissionAttribute(in);
this.status = DecommissionStatus.fromString(in.readString());
this.requestID = in.readString();
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -165,12 +178,14 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeString(status.status());
out.writeString(requestID);
}

public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
DecommissionAttribute decommissionAttribute = null;
DecommissionStatus status = null;
String requestID = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
Expand Down Expand Up @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
);
}
status = DecommissionStatus.fromString(parser.text());
} else if ("requestID".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new OpenSearchParseException(
"failed to parse status of decommissioning, expected string but found unknown type"
);
}
requestID = parser.text();
} else {
throw new OpenSearchParseException(
"unknown field found [{}], failed to parse the decommission attribute",
Expand All @@ -218,15 +240,15 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
}
}
}
return new DecommissionAttributeMetadata(decommissionAttribute, status);
return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID);
}

/**
* {@inheritDoc}
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(decommissionAttribute, status, attributeType, builder, params);
toXContent(decommissionAttribute, status, requestID, attributeType, builder, params);
return builder;
}

Expand All @@ -245,6 +267,7 @@ public EnumSet<Metadata.XContentContext> context() {
public static void toXContent(
DecommissionAttribute decommissionAttribute,
DecommissionStatus status,
String requestID,
String attributeType,
XContentBuilder builder,
ToXContent.Params params
Expand All @@ -253,6 +276,7 @@ public static void toXContent(
builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue());
builder.endObject();
builder.field("status", status.status());
builder.field("requestID", requestID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.validateNewStatus(decommissionStatus);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
decommissionStatus,
decommissionAttributeMetadata.requestID()
);
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ public class DecommissionHelper {

static ClusterState registerDecommissionAttributeInClusterState(
ClusterState currentState,
DecommissionAttribute decommissionAttribute
DecommissionAttribute decommissionAttribute,
String requestID
) {
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, requestID);
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,15 @@ public ClusterState execute(ClusterState currentState) {
// validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action
validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes);
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata == null || decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED)) {
decommissionRequest.originalRequest(true);
}
// check that request is eligible to proceed and attribute is weighed away
ensureEligibleRequest(decommissionAttributeMetadata, decommissionRequest);
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);

ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute);
ClusterState newState = registerDecommissionAttributeInClusterState(
currentState,
decommissionAttribute,
decommissionRequest.id()
);
// add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion
nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream()
.map(DiscoveryNode::getId)
Expand Down Expand Up @@ -191,6 +192,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata();
assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute());
assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT);
assert decommissionAttributeMetadata.requestID().equals(decommissionRequest.id());
assert newState.getVotingConfigExclusions()
.stream()
.map(CoordinationMetadata.VotingConfigExclusion::getNodeId)
Expand Down Expand Up @@ -461,17 +463,21 @@ private static void ensureEligibleRequest(
DecommissionAttributeMetadata decommissionAttributeMetadata,
DecommissionRequest decommissionRequest
) {
String msg = null;
String msg;
DecommissionAttribute requestedDecommissionAttribute = decommissionRequest.getDecommissionAttribute();
if (decommissionAttributeMetadata != null) {
// check if the same attribute is registered and handle it accordingly
if (decommissionAttributeMetadata.decommissionAttribute().equals(requestedDecommissionAttribute)) {
switch (decommissionAttributeMetadata.status()) {
// for INIT - check if it is eligible internal retry
case INIT:
msg = (decommissionRequest.originalRequest() == false) ? "same request is already in status [INIT]" : null;
throw new DecommissioningFailedException(requestedDecommissionAttribute, msg);
// for FAILED - we are good to process it again
if (decommissionRequest.id().equals(decommissionAttributeMetadata.requestID()) == false) {
throw new DecommissioningFailedException(
requestedDecommissionAttribute,
"same request is already in status [INIT]"
);
}
// for FAILED - we are good to process it again
case FAILED:
break;
case DRAINING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public void testPreventJoinClusterWithDecommission() {
);
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
decommissionStatus
decommissionStatus,
randomAlphaOfLength(10)
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();
DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1"));
Expand All @@ -257,7 +258,8 @@ public void testJoinClusterWithDifferentDecommission() {
);
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
decommissionStatus
decommissionStatus,
randomAlphaOfLength(10)
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();

Expand All @@ -277,7 +279,8 @@ public void testJoinFailedForDecommissionedNode() throws Exception {
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1");
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
DecommissionStatus.SUCCESSFUL
DecommissionStatus.SUCCESSFUL,
randomAlphaOfLength(10)
);
final ClusterState clusterManagerClusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
Expand Down Expand Up @@ -315,7 +318,8 @@ public void testJoinClusterWithDecommissionFailed() {
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1");
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
DecommissionStatus.FAILED
DecommissionStatus.FAILED,
randomAlphaOfLength(10)
);
Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,8 @@ private static ClusterState initialStateWithDecommissionedAttribute(
) {
DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttribute,
DecommissionStatus.SUCCESSFUL
DecommissionStatus.SUCCESSFUL,
randomAlphaOfLength(10)
);
return ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata(
new DecommissionAttribute("zone", "zone-1"),
currentStatus
currentStatus,
randomAlphaOfLength(10)
);
ClusterState state = clusterService.state();
Metadata metadata = state.metadata();
Expand Down
Loading

0 comments on commit c46992c

Please sign in to comment.