HDDS-11667. Validating DatanodeID on any request to the datanode (apa…
swamirishi authored Nov 20, 2024
1 parent fc6a2ea commit 9945de6
Showing 8 changed files with 269 additions and 28 deletions.
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

@@ -647,6 +648,20 @@ public boolean equals(Object obj) {
uuid.equals(((DatanodeDetails) obj).uuid);
}


/**
* Checks whether the hostname, ipAddress and ports of the two nodes are the same.
* @param datanodeDetails the DatanodeDetails object to compare with.
* @return true if the values match, otherwise false.
*/
public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
if (this == datanodeDetails || super.equals(datanodeDetails)) {
return true;
}
return Objects.equals(ipAddress, datanodeDetails.ipAddress)
&& Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports);
}

@Override
public int hashCode() {
return uuid.hashCode();
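Note on the new helper: compareNodeValues deliberately ignores the UUID that equals() and hashCode() are keyed on, and matches nodes purely on their network identity (hostname, IP address and ports). A minimal sketch of the intended semantics, assuming the usual DatanodeDetails builder methods (the builder calls below are illustrative, not taken from this diff):

    // Same host name, IP and ports but different UUIDs: equals() is false,
    // while compareNodeValues() still treats them as the same physical node.
    DatanodeDetails before = DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID())
        .setHostName("dn1.example.com")
        .setIpAddress("10.0.0.1")
        .build();
    DatanodeDetails after = DatanodeDetails.newBuilder()
        .setUuid(UUID.randomUUID())          // dnId re-initialized after a restart
        .setHostName("dn1.example.com")
        .setIpAddress("10.0.0.1")
        .build();
    assert !before.equals(after);            // identity (uuid) differs
    assert before.compareNodeValues(after);  // network identity matches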
@@ -330,7 +330,11 @@ public List<DatanodeDetails> getNodesInOrder() {
}

void reportDatanode(DatanodeDetails dn) throws IOException {
if (nodeStatus.get(dn) == null) {
// This is a workaround for the case where a datanode restarts and reinitializes its dnId but still reports the
// same set of pipelines it was part of. The pipeline report should be accepted for this anomalous condition.
// We rely on the StaleNodeHandler to eventually close this pipeline.
if (dn == null || (nodeStatus.get(dn) == null
&& nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
throw new IOException(
String.format("Datanode=%s not part of pipeline=%s", dn, id));
}
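With this change a pipeline report is accepted if the reporting node is either an exact member of the pipeline or matches an existing member by hostname, IP and ports, so a datanode that lost its datanode.id file can still report the pipelines it serves. A rough standalone equivalent of the new membership condition, assuming nodeStatus is the pipeline's member map as in Pipeline.java:

    private boolean isKnownMember(Map<DatanodeDetails, Long> nodeStatus, DatanodeDetails dn) {
      // Exact match first, then fall back to hostname/IP/port equality for re-initialized nodes.
      return dn != null
          && (nodeStatus.get(dn) != null
              || nodeStatus.keySet().stream().anyMatch(node -> node.compareNodeValues(dn)));
    }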
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel(
*
* @return datanode Id
*/
protected String getDatanodeId() {
@VisibleForTesting
public String getDatanodeId() {
return datanodeId;
}

@@ -65,6 +65,7 @@
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -78,6 +79,7 @@
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
@@ -202,6 +204,7 @@ long getStartTime() {
private final boolean waitOnBothFollowers;
private final HddsDatanodeService datanodeService;
private static Semaphore semaphore = new Semaphore(1);
private final AtomicBoolean peersValidated;

/**
* CSM metrics.
@@ -252,6 +255,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
this.peersValidated = new AtomicBoolean(false);

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(
@@ -265,6 +269,19 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI

}

private void validatePeers() throws IOException {
if (this.peersValidated.get()) {
return;
}
final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
final RaftPeerId selfId = ratisServer.getServer().getId();
if (group.getPeer(selfId) == null) {
throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
ContainerProtos.Result.INVALID_CONFIG);
}
peersValidated.set(true);
}

@Override
public StateMachineStorage getStateMachineStorage() {
return storage;
@@ -962,6 +979,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
try {
this.validatePeers();
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
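validatePeers runs lazily on the first applyTransaction call and is skipped afterwards via the peersValidated flag, so a datanode whose Raft identity is no longer part of the group fails fast with INVALID_CONFIG instead of applying the transaction. The check-once guard in isolation looks roughly like the following sketch (simplified, not the exact ContainerStateMachine code):

    private final AtomicBoolean peersValidated = new AtomicBoolean(false);

    private void validateOnce(BooleanSupplier selfIsGroupMember) throws IOException {
      if (peersValidated.get()) {
        return;                                  // already validated for this state machine instance
      }
      if (!selfIsGroupMember.getAsBoolean()) {   // i.e. group.getPeer(selfId) == null
        throw new IOException("current datanode is not a member of the Raft group");
      }
      peersValidated.set(true);                  // remember the successful validation
    }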
@@ -101,6 +101,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -242,6 +243,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
Type cmdType = request.getCmdType();
// Validate that the request was sent to the correct datanode, i.e. the datanode id in the request matches this node.
if (kvContainer != null) {
try {
handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}
}

switch (cmdType) {
case CreateContainer:
@@ -353,6 +363,13 @@ ContainerCommandResponseProto handleCreateContainer(
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
}

try {
this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, request);
}

long containerID = request.getContainerID();
State containerState = request.getCreateContainer().getState();

@@ -1532,4 +1549,22 @@ public static FaultInjector getInjector() {
public static void setInjector(FaultInjector instance) {
injector = instance;
}

/**
* Verify that the datanode UUID in the request matches this datanode. The validation is only performed for
* EC containers, i.e. when containerReplicaIdx is > 0.
*
* @param containerReplicaIdx replicaIndex for the container command.
* @param requestDatanodeUUID datanode UUID carried by the request.
* @throws StorageContainerException if the request targets a different datanode.
*/
private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
throws StorageContainerException {
if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
throw new StorageContainerException(
String.format("Request is trying to write to node with uuid : %s but the current nodeId is: %s .",
requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
}
return true;
}
}
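Because the new check is keyed off the replica index, Ratis containers (replicaIndex 0 or unset) are never rejected here; only EC replicas enforce that the datanode UUID in the request matches the local node. A rough illustration of the decision, with hypothetical values:

    String localId = "c2f0...-dn1";        // this datanode's persisted id (hypothetical)
    Integer replicaIndex = 3;              // EC replica index carried by the request
    String requestId = "9a4b...-dn1-old";  // uuid the client believes it is talking to (hypothetical)
    boolean rejected = replicaIndex != null && replicaIndex > 0 && !requestId.equals(localId);
    // rejected == true -> the dispatcher responds with Result.INVALID_ARGUMENT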
@@ -68,6 +68,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
@@ -131,7 +132,13 @@ public void testHandlerCommandHandling() throws Exception {
.build();

KeyValueContainer container = mock(KeyValueContainer.class);

KeyValueContainerData containerData = mock(KeyValueContainerData.class);
Mockito.when(container.getContainerData()).thenReturn(containerData);
Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
createContainerRequest, container, null);
assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
KeyValueHandler
.dispatchRequest(handler, createContainerRequest, container, null);
verify(handler, times(0)).handleListBlock(
@@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -32,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
@@ -40,6 +42,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -50,6 +53,7 @@
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -264,6 +268,57 @@ public void testContainerStateMachineCloseOnMissingPipeline()
key.close();
}


@Test
public void testContainerStateMachineRestartWithDNChangePipeline()
throws Exception {
try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
ReplicationFactor.THREE), new HashMap<>())) {
key.write("ratis".getBytes(UTF_8));
key.flush();

KeyOutputStream groupOutputStream = (KeyOutputStream) key.
getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
assertEquals(1, locationInfoList.size());

OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
Pipeline pipeline = omKeyLocationInfo.getPipeline();
List<HddsDatanodeService> datanodes =
new ArrayList<>(TestHelper.getDatanodeServices(cluster,
pipeline));

DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();

// Delete all data volumes.
cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
.stream().forEach(v -> {
try {
FileUtils.deleteDirectory(v.getStorageDir());
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// Delete the datanode.id file.
File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
boolean deleted = datanodeIdFile.delete();
assertTrue(deleted);
cluster.restartHddsDatanode(dn, false);
GenericTestUtils.waitFor(() -> {
try {
key.write("ratis".getBytes(UTF_8));
key.flush();
return groupOutputStream.getLocationInfoList().size() > 1;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, 1000, 30000);
}
}
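The test above simulates a datanode identity change by wiping the data volumes and the datanode.id file before the restart; the waitFor loop then retries writes until the client is moved onto a new pipeline. If one also wanted to assert that the restarted process really owns a new identity, a follow-up check could look like this sketch (the accessor names and the assumption that the restarted service keeps the same host and ports are mine, not part of this commit):

    // Find the restarted datanode by its network identity and confirm the UUID changed.
    DatanodeDetails restarted = cluster.getHddsDatanodes().stream()
        .map(HddsDatanodeService::getDatanodeDetails)
        .filter(d -> d.compareNodeValues(dn))
        .findFirst()
        .orElseThrow(() -> new AssertionError("restarted datanode not found"));
    assertNotEquals(dn.getUuid(), restarted.getUuid());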

@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =