HDDS-10983. EC Key read corruption when the replica index of container in DN mismatches (apache#6779)

(cherry picked from commit 769d09e)
swamirishi authored and xichen01 committed Jul 18, 2024
1 parent 5fde0fb commit 91567d9
Showing 29 changed files with 469 additions and 127 deletions.
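
This commit threads the EC replica index through the whole client read path: BlockID now carries an optional replicaIndex, BlockInputStream passes the pipeline's replica indexes to getBlock and rejects pipelines whose nodes disagree on the index, ChunkInputStream attaches the index to every readChunk request, and ECBlockInputStream sets the 1-based replica index on both the initial and the refreshed single-node read pipelines. A minimal sketch of the resulting round trip, using only APIs visible in this diff (the class name and ID values are illustrative):

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public class ReplicaIndexRoundTrip {
  public static void main(String[] args) {
    // A DatanodeBlockID as a datanode would report it for an EC container replica.
    ContainerProtos.DatanodeBlockID proto = ContainerProtos.DatanodeBlockID.newBuilder()
        .setContainerID(1L)
        .setLocalID(100L)
        .setBlockCommitSequenceId(5L)
        .setReplicaIndex(2)
        .build();

    // BlockID preserves the replica index on deserialization (null, not 0,
    // when absent, to distinguish "unset" from a real index)...
    BlockID blockID = BlockID.getFromProtobuf(proto);
    assert blockID.getReplicaIndex() == 2;

    // ...and re-attaches it when building the next request, so the datanode
    // can detect a read aimed at the wrong replica.
    ContainerProtos.DatanodeBlockID request = blockID.getDatanodeBlockIDProtobuf();
    assert request.getReplicaIndex() == 2;
  }
}
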
2 changes: 1 addition & 1 deletion hadoop-hdds/client/pom.xml
@@ -55,7 +55,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
+      <artifactId>mockito-inline</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

BlockInputStream.java
@@ -116,7 +116,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token,
       XceiverClientFactory xceiverClientFactory,
       Function<BlockID, BlockLocationInfo> refreshFunction,
-      OzoneClientConfig config) {
+      OzoneClientConfig config) throws IOException {
     this.blockID = blockId;
     this.length = blockLen;
     setPipeline(pipeline);
@@ -133,7 +133,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token,
       XceiverClientFactory xceiverClientFactory,
       OzoneClientConfig config
-  ) {
+  ) throws IOException {
     this(blockId, blockLen, pipeline, token,
         xceiverClientFactory, null, config);
   }
@@ -244,33 +244,28 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException {
 
   @VisibleForTesting
   protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
-    final Pipeline pipeline = xceiverClient.getPipeline();
-
+    Pipeline pipeline = pipelineRef.get();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Initializing BlockInputStream for get key to access {}",
-          blockID.getContainerID());
-    }
-
-    DatanodeBlockID.Builder blkIDBuilder =
-        DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
-            .setLocalID(blockID.getLocalID())
-            .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
-
-    int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
-    if (replicaIndex > 0) {
-      blkIDBuilder.setReplicaIndex(replicaIndex);
+      LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.",
+          blockID.getContainerID(), pipeline);
     }
 
     GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
-        xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
+        xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes());
 
     return response.getBlockData().getChunksList();
   }
 
-  private void setPipeline(Pipeline pipeline) {
+  private void setPipeline(Pipeline pipeline) throws IOException {
     if (pipeline == null) {
       return;
     }
+    long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+    if (replicaIndexes > 1) {
+      throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
+          pipeline));
+    }
 
     // irrespective of the container state, we will always read via Standalone
     // protocol.
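
The new setPipeline guard distills to one invariant: every node in a read pipeline must agree on the replica index, since the whole stream is served over a single standalone connection and mixed indexes could silently return a different replica's data. A stand-alone sketch of that check, assuming a node-to-index map shaped like the one Pipeline.getReplicaIndexes() returns (the class and method names here are illustrative, not part of the commit):

import java.io.IOException;
import java.util.Map;

final class ReplicaIndexCheck {
  private ReplicaIndexCheck() { }

  // Mirrors the guard added to setPipeline(...): more than one distinct
  // replica index within a single pipeline is rejected up front.
  static void requireSingleReplicaIndex(Map<String, Integer> nodeToReplicaIndex) throws IOException {
    long distinct = nodeToReplicaIndex.values().stream().distinct().count();
    if (distinct > 1) {
      throw new IOException("Pipeline has nodes containing different replica indexes: " + nodeToReplicaIndex);
    }
  }
}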

ChunkInputStream.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream
   private final ChunkInfo chunkInfo;
   private final long length;
   private final BlockID blockID;
+  private ContainerProtos.DatanodeBlockID datanodeBlockID;
   private final XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private final Supplier<Pipeline> pipelineSupplier;
@@ -290,13 +293,27 @@ protected synchronized void releaseClient() {
     }
   }
 
+  /**
+   * Updates datanodeBlockID based on the blockId.
+   */
+  private void updateDatanodeBlockId(Pipeline pipeline) throws IOException {
+    DatanodeDetails closestNode = pipeline.getClosestNode();
+    int replicaIdx = pipeline.getReplicaIndex(closestNode);
+    ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
+    if (replicaIdx > 0) {
+      builder.setReplicaIndex(replicaIdx);
+    }
+    datanodeBlockID = builder.build();
+  }
+
   /**
    * Acquire new client if previous one was released.
    */
   protected synchronized void acquireClient() throws IOException {
     if (xceiverClientFactory != null && xceiverClient == null) {
-      xceiverClient = xceiverClientFactory.acquireClientForReadData(
-          pipelineSupplier.get());
+      Pipeline pipeline = pipelineSupplier.get();
+      xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+      updateDatanodeBlockId(pipeline);
     }
   }
 
@@ -422,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
       throws IOException {
 
     ReadChunkResponseProto readChunkResponse =
-        ContainerProtocolCalls.readChunk(xceiverClient,
-            readChunkInfo, blockID, validators, tokenSupplier.get());
+        ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators,
+            tokenSupplier.get());
 
     if (readChunkResponse.hasData()) {
       return readChunkResponse.getData().asReadOnlyByteBufferList()
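
Replica indexes are only positive on EC pipelines; the > 0 guard above treats 0 as unset, so Ratis and Standalone reads keep sending a DatanodeBlockID without the field and the datanode has nothing to verify. Restated as a stand-alone helper over the APIs used above (a sketch; the helper name is illustrative and import paths are assumed from the Ozone tree):

import java.io.IOException;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class ReadTargets {
  private ReadTargets() { }

  // How a ChunkInputStream-style reader derives the DatanodeBlockID it sends
  // with readChunk; mirrors updateDatanodeBlockId(...) above.
  static ContainerProtos.DatanodeBlockID toDatanodeBlockId(BlockID blockID, Pipeline pipeline)
      throws IOException {
    int replicaIdx = pipeline.getReplicaIndex(pipeline.getClosestNode());
    ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
    if (replicaIdx > 0) {
      // Only EC pipelines carry a positive index; omit the field otherwise.
      builder.setReplicaIndex(replicaIdx);
    }
    return builder.build();
  }
}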

BlockInputStreamFactory.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
+import java.io.IOException;
 import java.util.function.Function;
 
 /**
@@ -52,6 +53,6 @@ BlockExtendedInputStream create(ReplicationConfig repConfig,
       Token<OzoneBlockTokenIdentifier> token,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, BlockLocationInfo> refreshFunction,
-      OzoneClientConfig config);
+      OzoneClientConfig config) throws IOException;
 
 }

BlockInputStreamFactoryImpl.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.security.token.Token;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
@@ -80,7 +81,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
       Token<OzoneBlockTokenIdentifier> token,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, BlockLocationInfo> refreshFunction,
-      OzoneClientConfig config) {
+      OzoneClientConfig config) throws IOException {
     if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
       return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
           blockInfo, xceiverFactory, refreshFunction,

ECBlockInputStream.java
@@ -175,7 +175,7 @@ protected int currentStreamIndex() {
    * stream if it has not been opened already.
    * @return BlockInput stream to read from.
    */
-  protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
+  protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOException {
     BlockExtendedInputStream stream = blockStreams[locationIndex];
     if (stream == null) {
       // To read an EC block, we create a STANDALONE pipeline that contains the
@@ -187,8 +187,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
           .setReplicationConfig(StandaloneReplicationConfig.getInstance(
               HddsProtos.ReplicationFactor.ONE))
           .setNodes(Arrays.asList(dataLocation))
-          .setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes(
-              ImmutableMap.of(dataLocation, locationIndex + 1))
+          .setId(PipelineID.valueOf(dataLocation.getUuid()))
+          .setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
           .setState(Pipeline.PipelineState.CLOSED)
           .build();
 
@@ -239,6 +239,7 @@ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
               HddsProtos.ReplicationFactor.ONE))
           .setNodes(Collections.singletonList(curIndexNode))
           .setId(PipelineID.randomId())
+          .setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex))
           .setState(Pipeline.PipelineState.CLOSED)
           .build();
       blockLocationInfo.setPipeline(pipeline);
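
Both hunks bear on the corruption in the commit title: getOrOpenStream already attached the 1-based replica index (locationIndex + 1) to the single-node STANDALONE pipeline it builds, but the pipeline rebuilt by ecPipelineRefreshFunction carried no index, so a refreshed read lost the information that lets the datanode detect a mismatched replica. A sketch of the fixed construction as a self-contained helper (import paths assumed from the Ozone tree; the class and method names are illustrative):

import java.util.Collections;

import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

final class EcReadPipelines {
  private EcReadPipelines() { }

  // Single-node STANDALONE pipeline for reading one EC replica; the
  // setReplicaIndexes(...) call is the piece the refresh path previously omitted.
  static Pipeline forReplica(DatanodeDetails dataLocation, int replicaIndex) {
    return Pipeline.newBuilder()
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
            HddsProtos.ReplicationFactor.ONE))
        .setNodes(Collections.singletonList(dataLocation))
        .setId(PipelineID.randomId())
        .setReplicaIndexes(Collections.singletonMap(dataLocation, replicaIndex))
        .setState(Pipeline.PipelineState.CLOSED)
        .build();
  }
}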

DummyBlockInputStream.java
@@ -49,7 +49,7 @@ class DummyBlockInputStream extends BlockInputStream {
       Function<BlockID, BlockLocationInfo> refreshFunction,
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunks,
-      OzoneClientConfig config) {
+      OzoneClientConfig config) throws IOException {
     super(blockId, blockLen, pipeline, token,
         xceiverClientManager, refreshFunction, config);
     this.chunkDataMap = chunks;

DummyBlockInputStreamWithRetry.java
@@ -56,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunkMap,
       AtomicBoolean isRerfreshed, IOException ioException,
-      OzoneClientConfig config) {
+      OzoneClientConfig config) throws IOException {
     super(blockId, blockLen, pipeline, token,
         xceiverClientManager, blockID -> {
           isRerfreshed.set(true);

TestBlockInputStream.java
@@ -360,7 +360,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex,
   }
 
   private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
-      ChunkInputStream stream) {
+      ChunkInputStream stream) throws IOException {
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
     clientConfig.setChecksumVerify(false);
     return new DummyBlockInputStream(blockID, blockSize, pipeline, null,

TestBlockInputStreamFactoryImpl.java
@@ -33,11 +33,15 @@
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.mockito.ArgumentMatchers.any;
+
 /**
  * Tests for BlockInputStreamFactoryImpl.
  */
@@ -46,14 +50,16 @@ public class TestBlockInputStreamFactoryImpl {
   private OzoneConfiguration conf = new OzoneConfiguration();
 
   @Test
-  public void testNonECGivesBlockInputStream() {
+  public void testNonECGivesBlockInputStream() throws IOException {
     BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
     ReplicationConfig repConfig =
         RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 
     BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
         1024 * 1024 * 10);
-
+    Pipeline pipeline = Mockito.spy(blockInfo.getPipeline());
+    blockInfo.setPipeline(pipeline);
+    Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
     clientConfig.setChecksumVerify(true);
     BlockExtendedInputStream stream =
@@ -66,7 +72,7 @@ public void testNonECGivesBlockInputStream() {
   }
 
   @Test
-  public void testECGivesECBlockInputStream() {
+  public void testECGivesECBlockInputStream() throws IOException {
     BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
     ReplicationConfig repConfig =
         new ECReplicationConfig(3, 2);

BlockID.java
@@ -23,30 +23,41 @@
 import java.util.Objects;
 
 /**
- * BlockID of Ozone (containerID + localID + blockCommitSequenceId).
+ * BlockID of Ozone (containerID + localID + blockCommitSequenceId + replicaIndex).
 */
 
 public class BlockID {
 
   private final ContainerBlockID containerBlockID;
   private long blockCommitSequenceId;
+  // null when not set via the private constructor (this avoids confusing
+  // replica index 0 with an unset value). It is only set when deserializing
+  // from ContainerProtos.DatanodeBlockID or copying from another BlockID.
+  private final Integer replicaIndex;
 
   public BlockID(long containerID, long localID) {
-    this(containerID, localID, 0);
+    this(containerID, localID, 0, null);
   }
 
-  private BlockID(long containerID, long localID, long bcsID) {
+  private BlockID(long containerID, long localID, long bcsID, Integer repIndex) {
     containerBlockID = new ContainerBlockID(containerID, localID);
     blockCommitSequenceId = bcsID;
+    this.replicaIndex = repIndex;
   }
 
+  public BlockID(BlockID blockID) {
+    this(blockID.getContainerID(), blockID.getLocalID(), blockID.getBlockCommitSequenceId(),
+        blockID.getReplicaIndex());
+  }
+
   public BlockID(ContainerBlockID containerBlockID) {
-    this(containerBlockID, 0);
+    this(containerBlockID, 0, null);
   }
 
-  private BlockID(ContainerBlockID containerBlockID, long bcsId) {
+  private BlockID(ContainerBlockID containerBlockID, long bcsId, Integer repIndex) {
     this.containerBlockID = containerBlockID;
     blockCommitSequenceId = bcsId;
+    this.replicaIndex = repIndex;
   }
 
   public long getContainerID() {
@@ -65,6 +76,11 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) {
     this.blockCommitSequenceId = blockCommitSequenceId;
   }
 
+  // Can return a null value in case it is not set.
+  public Integer getReplicaIndex() {
+    return replicaIndex;
+  }
+
   public ContainerBlockID getContainerBlockID() {
     return containerBlockID;
   }
@@ -79,21 +95,32 @@ public String toString() {
   public void appendTo(StringBuilder sb) {
     containerBlockID.appendTo(sb);
     sb.append(" bcsId: ").append(blockCommitSequenceId);
+    sb.append(" replicaIndex: ").append(replicaIndex);
   }
 
   @JsonIgnore
   public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
+    ContainerProtos.DatanodeBlockID.Builder blockID = getDatanodeBlockIDProtobufBuilder();
+    if (replicaIndex != null) {
+      blockID.setReplicaIndex(replicaIndex);
+    }
+    return blockID.build();
+  }
+
+  @JsonIgnore
+  public ContainerProtos.DatanodeBlockID.Builder getDatanodeBlockIDProtobufBuilder() {
     return ContainerProtos.DatanodeBlockID.newBuilder().
         setContainerID(containerBlockID.getContainerID())
         .setLocalID(containerBlockID.getLocalID())
-        .setBlockCommitSequenceId(blockCommitSequenceId).build();
+        .setBlockCommitSequenceId(blockCommitSequenceId);
   }
 
   @JsonIgnore
-  public static BlockID getFromProtobuf(
-      ContainerProtos.DatanodeBlockID blockID) {
+  public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
     return new BlockID(blockID.getContainerID(),
-        blockID.getLocalID(), blockID.getBlockCommitSequenceId());
+        blockID.getLocalID(),
+        blockID.getBlockCommitSequenceId(),
+        blockID.hasReplicaIndex() ? blockID.getReplicaIndex() : null);
   }
 
   @JsonIgnore
@@ -107,7 +134,7 @@ public HddsProtos.BlockID getProtobuf() {
   public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
     return new BlockID(
         ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()),
-        blockID.getBlockCommitSequenceId());
+        blockID.getBlockCommitSequenceId(), null);
   }
 
   @Override
@@ -119,14 +146,14 @@ public boolean equals(Object o) {
       return false;
     }
     BlockID blockID = (BlockID) o;
-    return containerBlockID.equals(blockID.getContainerBlockID())
-        && blockCommitSequenceId == blockID.getBlockCommitSequenceId();
+    return this.getContainerBlockID().equals(blockID.getContainerBlockID())
+        && this.getBlockCommitSequenceId() == blockID.getBlockCommitSequenceId()
+        && Objects.equals(this.getReplicaIndex(), blockID.getReplicaIndex());
   }
 
   @Override
   public int hashCode() {
-    return Objects
-        .hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
-            blockCommitSequenceId);
+    return Objects.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
+        blockCommitSequenceId, replicaIndex);
   }
 }
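
Because replicaIndex now participates in equals() and hashCode() (null-safely, via Objects.equals), two otherwise identical block IDs that differ only in replica index no longer compare equal, which matters anywhere BlockID is used as a map or cache key. A small illustration with made-up IDs (run with assertions enabled, java -ea):

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public class BlockIdEqualityDemo {
  public static void main(String[] args) {
    ContainerProtos.DatanodeBlockID.Builder base = ContainerProtos.DatanodeBlockID.newBuilder()
        .setContainerID(1L)
        .setLocalID(100L)
        .setBlockCommitSequenceId(5L);

    BlockID withoutIndex = BlockID.getFromProtobuf(base.build());
    BlockID withIndex = BlockID.getFromProtobuf(base.setReplicaIndex(2).build());

    // Unset (null) vs 2: no longer considered the same block replica.
    assert withoutIndex.getReplicaIndex() == null;
    assert withIndex.getReplicaIndex() == 2;
    assert !withoutIndex.equals(withIndex);
  }
}
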
(The remaining changed files in this commit are not shown.)
