HDDS-10983. EC Key read corruption when the replica index of container in DN mismatches (apache#6779)

(cherry picked from commit 769d09e)
swamirishi authored and xichen01 committed Jul 17, 2024
1 parent e9591bf commit ffc5677
Showing 28 changed files with 468 additions and 126 deletions.
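Background for the change: in an EC block group each replica index of a container holds a different fragment of the stripe, so if a datanode actually stores a different replica index than the one the client's pipeline expects, an unchecked read returns bytes from the wrong fragment and the reconstructed key is silently corrupted. The patch threads the expected replica index into the getBlock and readChunk calls and rejects read pipelines whose nodes report conflicting indexes. The snippet below is a minimal sketch of that consistency check, modelled on the BlockInputStream.setPipeline hunk further down; the class and method names are invented for illustration and it is not part of the commit.

import java.io.IOException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

// Illustrative only: mirrors the check the patch adds to BlockInputStream.setPipeline.
final class ReplicaIndexConsistencyCheckSketch {

  private ReplicaIndexConsistencyCheckSketch() {
  }

  // Rejects a read pipeline whose nodes disagree on the replica index, since a
  // mismatched node would serve a different EC fragment than the one requested.
  static void verifySingleReplicaIndex(Pipeline pipeline) throws IOException {
    if (pipeline == null) {
      return;
    }
    long distinctIndexes = pipeline.getNodes().stream()
        .mapToInt(pipeline::getReplicaIndex)
        .distinct()
        .count();
    if (distinctIndexes > 1) {
      throw new IOException(String.format(
          "Pipeline: %s has nodes containing different replica indexes.", pipeline));
    }
  }
}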
@@ -116,7 +116,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
@@ -133,7 +133,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
) throws IOException {
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
}
@@ -244,33 +244,28 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException {

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

Pipeline pipeline = pipelineRef.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
}

DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());

int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.",
blockID.getContainerID(), pipeline);
}

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes());

return response.getBlockData().getChunksList();
}

private void setPipeline(Pipeline pipeline) {
private void setPipeline(Pipeline pipeline) throws IOException {
if (pipeline == null) {
return;
}
long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();

if (replicaIndexes > 1) {
throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
pipeline));
}

// irrespective of the container state, we will always read via Standalone
// protocol.
==== next changed file ====
@@ -26,6 +26,8 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream
private final ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private ContainerProtos.DatanodeBlockID datanodeBlockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
@@ -290,13 +293,27 @@ protected synchronized void releaseClient() {
}
}

/**
* Updates datanodeBlockID from the blockID and the replica index of the pipeline's closest node.
*/
private void updateDatanodeBlockId(Pipeline pipeline) throws IOException {
DatanodeDetails closestNode = pipeline.getClosestNode();
int replicaIdx = pipeline.getReplicaIndex(closestNode);
ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
if (replicaIdx > 0) {
builder.setReplicaIndex(replicaIdx);
}
datanodeBlockID = builder.build();
}

/**
* Acquire new client if previous one was released.
*/
protected synchronized void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(
pipelineSupplier.get());
Pipeline pipeline = pipelineSupplier.get();
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
updateDatanodeBlockId(pipeline);
}
}

@@ -422,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {

ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, tokenSupplier.get());
ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators,
tokenSupplier.get());

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
==== next changed file ====
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.function.Function;

/**
@@ -52,6 +53,6 @@ BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);
OzoneClientConfig config) throws IOException;

}
==== next changed file ====
@@ -32,6 +32,7 @@
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
@@ -80,7 +81,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, xceiverFactory, refreshFunction,
==== next changed file ====
@@ -175,7 +175,7 @@ protected int currentStreamIndex() {
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOException {
BlockExtendedInputStream stream = blockStreams[locationIndex];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
@@ -187,8 +187,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocation))
.setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes(
ImmutableMap.of(dataLocation, locationIndex + 1))
.setId(PipelineID.valueOf(dataLocation.getUuid()))
.setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();

@@ -239,6 +239,7 @@ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex))
.setState(Pipeline.PipelineState.CLOSED)
.build();
blockLocationInfo.setPipeline(pipeline);
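For context, a minimal sketch (not part of the patch; the helper class and method names are invented) of the single-node STANDALONE pipeline that the getOrOpenStream hunk above builds per data location, assuming the hdds Pipeline builder API used in the diff. EC replica indexes are 1-based, so location index i maps to replica index i + 1, and carrying that index in the pipeline lets the read path ask the datanode for exactly that fragment.

import java.util.Arrays;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

final class EcReadPipelineSketch {

  private EcReadPipelineSketch() {
  }

  // Builds a one-node STANDALONE read pipeline for the EC fragment stored at
  // dataLocation; the replica index travels with the pipeline so the datanode side
  // can be checked against the replica index of its local container copy.
  static Pipeline ecFragmentReadPipeline(DatanodeDetails dataLocation, int locationIndex) {
    return Pipeline.newBuilder()
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
            HddsProtos.ReplicationFactor.ONE))
        .setNodes(Arrays.asList(dataLocation))
        .setId(PipelineID.valueOf(dataLocation.getUuid()))
        .setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
        .setState(Pipeline.PipelineState.CLOSED)
        .build();
  }
}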
==== next changed file ====
@@ -49,7 +49,7 @@ class DummyBlockInputStream extends BlockInputStream {
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
==== next changed file ====
@@ -56,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
List<ChunkInfo> chunkList,
Map<String, byte[]> chunkMap,
AtomicBoolean isRerfreshed, IOException ioException,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
==== next changed file ====
@@ -360,7 +360,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex,
}

private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
ChunkInputStream stream) {
ChunkInputStream stream) throws IOException {
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(false);
return new DummyBlockInputStream(blockID, blockSize, pipeline, null,
==== next changed file ====
@@ -33,11 +33,15 @@
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;

/**
* Tests for BlockInputStreamFactoryImpl.
*/
@@ -46,14 +50,16 @@ public class TestBlockInputStreamFactoryImpl {
private OzoneConfiguration conf = new OzoneConfiguration();

@Test
public void testNonECGivesBlockInputStream() {
public void testNonECGivesBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);

BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
1024 * 1024 * 10);

Pipeline pipeline = Mockito.spy(blockInfo.getPipeline());
blockInfo.setPipeline(pipeline);
Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);
BlockExtendedInputStream stream =
@@ -66,7 +72,7 @@ public void testNonECGivesBlockInputStream() {
}

@Test
public void testECGivesECBlockInputStream() {
public void testECGivesECBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
new ECReplicationConfig(3, 2);
==== next changed file ====
@@ -23,30 +23,41 @@
import java.util.Objects;

/**
* BlockID of Ozone (containerID + localID + blockCommitSequenceId).
* BlockID of Ozone (containerID + localID + blockCommitSequenceId + replicaIndex).
*/

public class BlockID {

private final ContainerBlockID containerBlockID;
private long blockCommitSequenceId;
// Null when not set via the private constructor (this avoids confusing replica index 0 with an unset value).
// The value is only set when deserializing from ContainerProtos.DatanodeBlockID or copying from another
// BlockID object.
private final Integer replicaIndex;

public BlockID(long containerID, long localID) {
this(containerID, localID, 0);
this(containerID, localID, 0, null);
}

private BlockID(long containerID, long localID, long bcsID) {
private BlockID(long containerID, long localID, long bcsID, Integer repIndex) {
containerBlockID = new ContainerBlockID(containerID, localID);
blockCommitSequenceId = bcsID;
this.replicaIndex = repIndex;
}

public BlockID(BlockID blockID) {
this(blockID.getContainerID(), blockID.getLocalID(), blockID.getBlockCommitSequenceId(),
blockID.getReplicaIndex());
}

public BlockID(ContainerBlockID containerBlockID) {
this(containerBlockID, 0);
this(containerBlockID, 0, null);
}

private BlockID(ContainerBlockID containerBlockID, long bcsId) {
private BlockID(ContainerBlockID containerBlockID, long bcsId, Integer repIndex) {
this.containerBlockID = containerBlockID;
blockCommitSequenceId = bcsId;
this.replicaIndex = repIndex;
}

public long getContainerID() {
@@ -65,6 +76,11 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) {
this.blockCommitSequenceId = blockCommitSequenceId;
}

// Can return a null value in case it is not set.
public Integer getReplicaIndex() {
return replicaIndex;
}

public ContainerBlockID getContainerBlockID() {
return containerBlockID;
}
@@ -79,21 +95,32 @@ public String toString() {
public void appendTo(StringBuilder sb) {
containerBlockID.appendTo(sb);
sb.append(" bcsId: ").append(blockCommitSequenceId);
sb.append(" replicaIndex: ").append(replicaIndex);
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
ContainerProtos.DatanodeBlockID.Builder blockID = getDatanodeBlockIDProtobufBuilder();
if (replicaIndex != null) {
blockID.setReplicaIndex(replicaIndex);
}
return blockID.build();
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID.Builder getDatanodeBlockIDProtobufBuilder() {
return ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerBlockID.getContainerID())
.setLocalID(containerBlockID.getLocalID())
.setBlockCommitSequenceId(blockCommitSequenceId).build();
.setBlockCommitSequenceId(blockCommitSequenceId);
}

@JsonIgnore
public static BlockID getFromProtobuf(
ContainerProtos.DatanodeBlockID blockID) {
public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID(), blockID.getBlockCommitSequenceId());
blockID.getLocalID(),
blockID.getBlockCommitSequenceId(),
blockID.hasReplicaIndex() ? blockID.getReplicaIndex() : null);
}

@JsonIgnore
Expand All @@ -107,7 +134,7 @@ public HddsProtos.BlockID getProtobuf() {
public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(
ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()),
blockID.getBlockCommitSequenceId());
blockID.getBlockCommitSequenceId(), null);
}

@Override
Expand All @@ -119,14 +146,14 @@ public boolean equals(Object o) {
return false;
}
BlockID blockID = (BlockID) o;
return containerBlockID.equals(blockID.getContainerBlockID())
&& blockCommitSequenceId == blockID.getBlockCommitSequenceId();
return this.getContainerBlockID().equals(blockID.getContainerBlockID())
&& this.getBlockCommitSequenceId() == blockID.getBlockCommitSequenceId()
&& Objects.equals(this.getReplicaIndex(), blockID.getReplicaIndex());
}

@Override
public int hashCode() {
return Objects
.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId);
return Objects.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId, replicaIndex);
}
}
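For illustration, a small usage sketch of the new replicaIndex field based on the BlockID changes above (the class name and the container/local IDs are invented; this is not part of the patch): the index is simply omitted from the datanode protobuf when it is unset, so an unset index never collides with an explicit replica index of 0.

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public final class BlockIdReplicaIndexRoundTripSketch {

  public static void main(String[] args) {
    // The public constructor leaves replicaIndex unset (null).
    BlockID original = new BlockID(1L, 100L);

    // When replicaIndex is null it is not written to the protobuf, so "unset"
    // stays distinguishable from an explicit replica index of 0 on the wire.
    ContainerProtos.DatanodeBlockID proto = original.getDatanodeBlockIDProtobuf();
    System.out.println("hasReplicaIndex = " + proto.hasReplicaIndex()); // false

    // getFromProtobuf restores null for the unset case, so equals() still holds.
    BlockID roundTripped = BlockID.getFromProtobuf(proto);
    System.out.println("round trip equals original: " + roundTripped.equals(original));
  }
}

Non-EC (Ratis) reads leave the replica index at 0 or unset, which is why the guards in the patch only copy the index into requests when it is greater than zero.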
[diffs for the remaining changed files are not shown]
