Skip to content

Commit

Permalink
HDDS-12080: delete irrelevant RATIS-pipeline's raft logs in case of D…
Browse files Browse the repository at this point in the history
…EAD datanode state
  • Loading branch information
Slava Tutrinov committed Jan 14, 2025
1 parent f125363 commit 1507d3f
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
*/
package org.apache.hadoop.ozone.container.common.states.endpoint;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
Expand Down Expand Up @@ -94,6 +99,20 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
layoutStorage.setClusterId(clusterId);
layoutStorage.persistCurrentState();

HddsProtos.NodeState nodePreviousState = rpcEndPoint.getEndPoint()
.getNodePreviousState(ozoneContainer.getDatanodeDetails().getUuid());

if (nodePreviousState != null && nodePreviousState.equals(HddsProtos.NodeState.DEAD)) {
ozoneContainer.getMetaVolumeSet().getVolumeMap().forEach((key, value) ->
Arrays.asList(Objects.requireNonNull(value.getStorageDir().listFiles())).stream().filter(File::isDirectory).forEach(f -> {
try {
FileUtils.deleteDirectory(f);
} catch (IOException e) {
LOG.warn("Failed to delete directory {}", f.getAbsolutePath(), e);
}
}));
}

// Start the container services after getting the version information
ozoneContainer.start(clusterId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,4 +656,7 @@ public void compactDb() {
}
}

/**
 * Returns the {@link DatanodeDetails} held by this instance.
 *
 * @return the datanode details; never recomputed, simply the stored field
 */
public DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ExtendedDatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
Expand Down Expand Up @@ -89,4 +91,9 @@ SCMRegisteredResponseProto register(
PipelineReportsProto pipelineReports,
LayoutVersionProto layoutInfo) throws IOException;

/**
 * Looks up the state the SCM last recorded for a datanode (its "previous"
 * state, e.g. DEAD before the node came back).
 *
 * @param request request carrying the UUID of the datanode to look up
 * @return response whose {@code previousState} field may be unset when the
 *         SCM has no record of the node
 * @throws IOException on RPC/transport failure
 */
NodePreviousStateResponseProto
getNodePreviousState(
StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto
request) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos
.ExtendedDatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
Expand Down Expand Up @@ -50,6 +54,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -178,4 +183,18 @@ public SCMRegisteredResponseProto register(
(builder) -> builder.setRegisterRequest(req))
.getRegisterResponse();
}

/**
 * Queries the SCM for the last known (previous) state of the given datanode.
 *
 * @param datanodeUuid UUID of the datanode whose previous state is queried
 * @return the previous {@link HddsProtos.NodeState}, or {@code null} when the
 *         SCM has no recorded state for this UUID
 * @throws IOException on RPC failure
 */
public HddsProtos.NodeState getNodePreviousState(UUID datanodeUuid) throws IOException {
  NodePreviousStateRequestProto request = NodePreviousStateRequestProto.newBuilder()
      .setDatanodeUUID(datanodeUuid.toString())
      .build();
  NodePreviousStateResponseProto response = getNodePreviousState(request);
  // previousState is an optional proto field: when the SCM does not know the
  // node it leaves the field unset, and the generated getter would silently
  // return the enum's default value. Map "unset" to null so callers can tell
  // "unknown node" apart from a real recorded state.
  return response.hasPreviousState() ? response.getPreviousState() : null;
}

/**
 * Sends a {@code NodePreviousState} request to the SCM and returns the raw
 * protobuf response.
 *
 * @param request request carrying the datanode UUID to look up
 * @return the SCM's {@link NodePreviousStateResponseProto}
 * @throws IOException if the RPC round-trip fails
 */
@Override
public NodePreviousStateResponseProto getNodePreviousState(NodePreviousStateRequestProto request) throws IOException {
return submitRequest(Type.NodePreviousState, builder -> builder.setNodePreviousStateRequest(request))
.getNodePreviousStateResponse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
.setStatus(Status.OK)
.setRegisterResponse(register(request.getRegisterRequest()))
.build();
case NodePreviousState:
return SCMDatanodeResponse.newBuilder()
.setCmdType(cmdType)
.setStatus(Status.OK)
.setNodePreviousStateResponse(
impl.getNodePreviousState(request.getNodePreviousStateRequest()))
.build();
default:
throw new ServiceException("Unknown command type: " + cmdType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ private void sleepIfNeeded() {
.SCMRegisteredResponseProto.ErrorCode.success).build();
}

/**
 * Test stub for the NodePreviousState RPC.
 *
 * <p>Returns an empty (all-fields-unset) response rather than {@code null}:
 * the datanode-side client dereferences the response to read the previous
 * state, so a {@code null} here would surface as an NPE in any test that
 * exercises the version/registration flow against this mock.
 */
@Override
public StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto getNodePreviousState(
    StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto request) throws IOException {
  return StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto.getDefaultInstance();
}

/**
* Update nodeReport.
* @param datanodeDetailsProto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message SCMDatanodeRequest {
optional SCMVersionRequestProto getVersionRequest = 3;
optional SCMRegisterRequestProto registerRequest = 4;
optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5;
optional NodePreviousStateRequestProto nodePreviousStateRequest = 6;
}

message SCMDatanodeResponse {
Expand All @@ -60,13 +61,15 @@ message SCMDatanodeResponse {
optional SCMVersionResponseProto getVersionResponse = 6;
optional SCMRegisteredResponseProto registerResponse = 7;
optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8;
optional NodePreviousStateResponseProto nodePreviousStateResponse = 9;

}

enum Type {
GetVersion = 1;
Register = 2;
SendHeartbeat = 3;
NodePreviousState = 4;
}

enum Status {
Expand Down Expand Up @@ -124,6 +127,10 @@ message SCMRegisteredResponseProto {
optional string networkLocation = 8;
}

/**
 * Asks the SCM for the state it last recorded for a datanode.
 */
message NodePreviousStateRequestProto {
  // UUID (string form) of the datanode whose previous state is requested.
  // NOTE(review): proto2 `required` is discouraged for new messages because a
  // required field can never be removed compatibly; consider `optional` —
  // confirm against the conventions used elsewhere in this file.
  required string datanodeUUID = 1;
}

/**
* This message is send by data node to indicate that it is alive or it is
* registering with the node manager.
Expand Down Expand Up @@ -157,6 +164,11 @@ message SCMHeartbeatResponseProto {
optional int64 term = 3;
}

/**
 * Reply to NodePreviousStateRequestProto. The field is left unset when the
 * SCM has no recorded state for the requested datanode UUID, so receivers
 * must check presence before trusting the value.
 */
message NodePreviousStateResponseProto {
  optional NodeState previousState = 1;
}


message SCMNodeAddressList {
repeated string addressList = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconstructECContainersCommandProto;
Expand All @@ -50,6 +52,8 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
Expand Down Expand Up @@ -274,6 +278,24 @@ public SCMRegisteredResponseProto register(
}
}

/**
 * Returns the node state the SCM last recorded for the datanode named in
 * {@code request} (e.g. DEAD before the node re-registered).
 *
 * <p>The {@code previousState} field is left unset when the UUID is unknown
 * to the node manager or its status cannot be resolved, so callers must treat
 * an absent field as "no previous state available".
 *
 * @param request request carrying the datanode UUID to look up
 * @return response with {@code previousState} set only when the node was found
 * @throws IOException declared by the protocol interface
 */
@Override
public NodePreviousStateResponseProto getNodePreviousState(NodePreviousStateRequestProto request) throws IOException {
  NodePreviousStateResponseProto.Builder builder = NodePreviousStateResponseProto.newBuilder();
  try {
    DatanodeDetails node = scm.getScmNodeManager().getNodeByUuid(request.getDatanodeUUID());
    if (node != null) {
      NodeStatus nodeStatus = scm.getScmNodeManager().getNodeStatus(node);
      if (nodeStatus != null) {
        builder.setPreviousState(nodeStatus.getHealth());
      }
    }
  } catch (NodeNotFoundException e) {
    // Keep the cause in the log so a lookup race (node removed between
    // getNodeByUuid and getNodeStatus) stays diagnosable.
    LOG.warn("Node not found for UUID: {}", request.getDatanodeUUID(), e);
  }
  return builder.build();
}

@VisibleForTesting
public static SCMRegisteredResponseProto getRegisteredResponse(
RegisteredCommand cmd) {
Expand Down

0 comments on commit 1507d3f

Please sign in to comment.