
HDDS-10983. EC Key read corruption when the replica index of container in DN mismatches #6779

Merged: 23 commits, merged Jun 20, 2024
Changes from all commits

Commits
a1d5c4a
HDDS-10983. EC Key read corruption when the replica index of containe…
swamirishi Jun 6, 2024
8815940
HDDS-10983. Add Testcase
swamirishi Jun 6, 2024
4287270
HDDS-10983. Make the read api compatible with older clients
swamirishi Jun 7, 2024
2bb1b9d
HDDS-10983. Remove stacktrace
swamirishi Jun 7, 2024
5886cf4
HDDS-10983. Fix checkstyle
swamirishi Jun 7, 2024
cfc7415
HDDS-10983. Address review comments
swamirishi Jun 7, 2024
8edc907
HDDS-10983. Fix testcase
swamirishi Jun 7, 2024
1476d59
HDDS-10983. Add client version test
swamirishi Jun 8, 2024
437594d
HDDS-10983. Fix test cases
swamirishi Jun 9, 2024
65994f3
HDDS-10983. Fix issues
swamirishi Jun 10, 2024
731107c
HDDS-10983. Fix testcases
swamirishi Jun 10, 2024
5857567
HDDS-10983. Fix checkstyle
swamirishi Jun 10, 2024
dd11beb
HDDS-10983. Fix Acceptance test
swamirishi Jun 11, 2024
4c4e2c7
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 11, 2024
0d769da
HDDS-10983. Fix checkstyle
swamirishi Jun 11, 2024
45279a3
HDDS-10983. change pipeline supplier usage
swamirishi Jun 13, 2024
58b089d
HDDS-10983. Address review comments and simplify testcase
swamirishi Jun 14, 2024
e4c0d67
HDDS-10983. Convert to mapToInt
swamirishi Jun 14, 2024
da98011
HDDS-11013. Merge upstream master
swamirishi Jun 17, 2024
5db065f
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 17, 2024
b0764e4
HDDS-10983. Merge master
swamirishi Jun 17, 2024
3ae6768
HDDS-10983. Address review comments
swamirishi Jun 19, 2024
040d6d8
HDDS-10983. Move replica index validation outside block manager
swamirishi Jun 19, 2024
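The changes below are all on the read path: the client now rejects pipelines whose nodes report different replica indexes, carries the replica index inside BlockID/DatanodeBlockID, and passes the pipeline's replica-index map down to the block and chunk read calls so that a datanode holding a mismatched EC container replica can be detected. A minimal, self-contained sketch of the pipeline-validation idea follows; it is not the PR code itself, and the node and index types are simplified stand-ins:

import java.io.IOException;
import java.util.List;
import java.util.Map;

public final class ReplicaIndexConsistencySketch {
  /** Throws if the nodes of a read pipeline disagree on their replica index. */
  static void validateSingleReplicaIndex(List<String> nodes, Map<String, Integer> replicaIndexOf) throws IOException {
    long distinctIndexes = nodes.stream()
        .mapToInt(node -> replicaIndexOf.getOrDefault(node, 0))
        .distinct()
        .count();
    if (distinctIndexes > 1) {
      throw new IOException("Pipeline has nodes containing different replica indexes: " + replicaIndexOf);
    }
  }

  public static void main(String[] args) throws IOException {
    validateSingleReplicaIndex(List.of("dn1"), Map.of("dn1", 1));                  // fine: one index
    validateSingleReplicaIndex(List.of("dn1", "dn2"), Map.of("dn1", 1, "dn2", 2)); // throws IOException
  }
}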
@@ -116,7 +116,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
@@ -133,7 +133,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
) throws IOException {
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
}
@@ -244,33 +244,28 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException {

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

Pipeline pipeline = pipelineRef.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
}

DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());

int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.",
blockID.getContainerID(), pipeline);
}

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes());

return response.getBlockData().getChunksList();
}

private void setPipeline(Pipeline pipeline) {
private void setPipeline(Pipeline pipeline) throws IOException {
if (pipeline == null) {
return;
}
long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();

if (replicaIndexes > 1) {
throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
pipeline));
}

// irrespective of the container state, we will always read via Standalone
// protocol.
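For context, the replaced code above resolved a replica index only from pipeline.getClosestNode(); the new ContainerProtocolCalls.getBlock(...) call instead receives the pipeline's whole replica-index map. A rough sketch of how such a map can be turned into the per-datanode block ID is below; the helper name and signature are illustrative, and only getDatanodeBlockIDProtobufBuilder() comes from this PR:

import java.util.Map;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

final class DatanodeBlockIdSketch {
  /** Builds the DatanodeBlockID for the datanode actually being contacted, not just the closest one. */
  static ContainerProtos.DatanodeBlockID forTarget(BlockID blockID, DatanodeDetails target,
      Map<DatanodeDetails, Integer> replicaIndexes) {
    ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
    int replicaIndex = replicaIndexes.getOrDefault(target, 0);
    if (replicaIndex > 0) {
      // EC containers: record which replica index the client expects from this datanode.
      builder.setReplicaIndex(replicaIndex);
    }
    return builder.build();
  }
}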
@@ -26,6 +26,8 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream
private final ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private ContainerProtos.DatanodeBlockID datanodeBlockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
@@ -290,13 +293,27 @@ protected synchronized void releaseClient() {
}
}

/**
* Updates the DatanodeBlockID based on the block ID and the replica index of the pipeline's closest node.
*/
private void updateDatanodeBlockId(Pipeline pipeline) throws IOException {
DatanodeDetails closestNode = pipeline.getClosestNode();
int replicaIdx = pipeline.getReplicaIndex(closestNode);
ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
if (replicaIdx > 0) {
builder.setReplicaIndex(replicaIdx);
}
datanodeBlockID = builder.build();
}

/**
* Acquire new client if previous one was released.
*/
protected synchronized void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(
pipelineSupplier.get());
Pipeline pipeline = pipelineSupplier.get();
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
updateDatanodeBlockId(pipeline);
}
}

@@ -422,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {

ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, tokenSupplier.get());
ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators,
tokenSupplier.get());

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
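The hunk above is purely client-side: acquireClient() now captures the pipeline, updateDatanodeBlockId() bakes the closest node's replica index into the DatanodeBlockID, and readChunk() sends that ID. What this enables on the serving side is sketched below for illustration only; the datanode-side check is not part of this file and the method is hypothetical:

import java.io.IOException;

final class ReplicaIndexMismatchSketch {
  /**
   * Illustrative only: once a read request carries the expected replica index, the serving side
   * can refuse to return data from a container replica with a different index instead of silently
   * serving bytes from the wrong EC replica.
   */
  static void verifyReplicaIndex(int requestedIndex, int localReplicaIndex) throws IOException {
    if (requestedIndex > 0 && requestedIndex != localReplicaIndex) {
      throw new IOException("Replica index mismatch: request expects " + requestedIndex
          + " but this container replica has index " + localReplicaIndex);
    }
  }
}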
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.function.Function;

/**
@@ -52,6 +53,6 @@ BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);
OzoneClientConfig config) throws IOException;

}
@@ -32,6 +32,7 @@
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
@@ -80,7 +81,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, xceiverFactory, refreshFunction,
@@ -164,7 +164,7 @@ protected int currentStreamIndex() {
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOException {
BlockExtendedInputStream stream = blockStreams[locationIndex];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
@@ -176,8 +176,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocation))
.setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes(
ImmutableMap.of(dataLocation, locationIndex + 1))
.setId(PipelineID.valueOf(dataLocation.getUuid()))
.setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();

@@ -228,6 +228,7 @@ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex))
.setState(Pipeline.PipelineState.CLOSED)
.build();
blockLocationInfo.setPipeline(pipeline);
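Both hunks above build a one-node STANDALONE pipeline for reading a single EC replica; the fix is that the pipeline now also records that node's 1-based replica index. A condensed sketch of the construction is below; it is not the PR code verbatim, and the import paths are assumed to match the surrounding module:

import java.util.Collections;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

final class EcReadPipelineSketch {
  /** Single-datanode STANDALONE read pipeline that records the EC replica index (1-based) of its node. */
  static Pipeline readPipeline(DatanodeDetails dataLocation, int locationIndex) {
    return Pipeline.newBuilder()
        .setReplicationConfig(StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE))
        .setNodes(Collections.singletonList(dataLocation))
        .setId(PipelineID.valueOf(dataLocation.getUuid()))
        .setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1)) // EC indexes are 1-based
        .setState(Pipeline.PipelineState.CLOSED)
        .build();
  }
}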
@@ -49,7 +49,7 @@ class DummyBlockInputStream extends BlockInputStream {
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
@@ -56,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
List<ChunkInfo> chunkList,
Map<String, byte[]> chunkMap,
AtomicBoolean isRerfreshed, IOException ioException,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
@@ -355,7 +355,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex,
}

private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
ChunkInputStream stream) {
ChunkInputStream stream) throws IOException {
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(false);
return new DummyBlockInputStream(blockID, blockSize, pipeline, null,
@@ -32,13 +32,16 @@
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.ArgumentMatchers.any;

/**
* Tests for BlockInputStreamFactoryImpl.
@@ -48,14 +51,16 @@ public class TestBlockInputStreamFactoryImpl {
private OzoneConfiguration conf = new OzoneConfiguration();

@Test
public void testNonECGivesBlockInputStream() {
public void testNonECGivesBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);

BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
1024 * 1024 * 10);

Pipeline pipeline = Mockito.spy(blockInfo.getPipeline());
blockInfo.setPipeline(pipeline);
Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);
BlockExtendedInputStream stream =
@@ -68,7 +73,7 @@ public void testNonECGivesBlockInputStream() {
}

@Test
public void testECGivesECBlockInputStream() {
public void testECGivesECBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
new ECReplicationConfig(3, 2);
@@ -23,30 +23,41 @@
import java.util.Objects;

/**
* BlockID of Ozone (containerID + localID + blockCommitSequenceId).
* BlockID of Ozone (containerID + localID + blockCommitSequenceId + replicaIndex).
*/

public class BlockID {

private final ContainerBlockID containerBlockID;
private long blockCommitSequenceId;
// Null when not set via the private constructor (this avoids confusing replica index 0 with an unset value).
// The value is only set when deserializing from ContainerProtos.DatanodeBlockID or copying from another
// BlockID object.
private final Integer replicaIndex;

public BlockID(long containerID, long localID) {
this(containerID, localID, 0);
this(containerID, localID, 0, null);
}

private BlockID(long containerID, long localID, long bcsID) {
private BlockID(long containerID, long localID, long bcsID, Integer repIndex) {
containerBlockID = new ContainerBlockID(containerID, localID);
blockCommitSequenceId = bcsID;
this.replicaIndex = repIndex;
}

public BlockID(BlockID blockID) {
this(blockID.getContainerID(), blockID.getLocalID(), blockID.getBlockCommitSequenceId(),
blockID.getReplicaIndex());
}

public BlockID(ContainerBlockID containerBlockID) {
this(containerBlockID, 0);
this(containerBlockID, 0, null);
}

private BlockID(ContainerBlockID containerBlockID, long bcsId) {
private BlockID(ContainerBlockID containerBlockID, long bcsId, Integer repIndex) {
this.containerBlockID = containerBlockID;
blockCommitSequenceId = bcsId;
this.replicaIndex = repIndex;
}

public long getContainerID() {
@@ -65,6 +76,11 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) {
this.blockCommitSequenceId = blockCommitSequenceId;
}

// Can return a null value in case it is not set.
public Integer getReplicaIndex() {
return replicaIndex;
}

public ContainerBlockID getContainerBlockID() {
return containerBlockID;
}
@@ -79,21 +95,32 @@ public String toString() {
public void appendTo(StringBuilder sb) {
containerBlockID.appendTo(sb);
sb.append(" bcsId: ").append(blockCommitSequenceId);
sb.append(" replicaIndex: ").append(replicaIndex);
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
ContainerProtos.DatanodeBlockID.Builder blockID = getDatanodeBlockIDProtobufBuilder();
if (replicaIndex != null) {
blockID.setReplicaIndex(replicaIndex);
}
return blockID.build();
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID.Builder getDatanodeBlockIDProtobufBuilder() {
return ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerBlockID.getContainerID())
.setLocalID(containerBlockID.getLocalID())
.setBlockCommitSequenceId(blockCommitSequenceId).build();
.setBlockCommitSequenceId(blockCommitSequenceId);
}

@JsonIgnore
public static BlockID getFromProtobuf(
ContainerProtos.DatanodeBlockID blockID) {
public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID(), blockID.getBlockCommitSequenceId());
blockID.getLocalID(),
blockID.getBlockCommitSequenceId(),
blockID.hasReplicaIndex() ? blockID.getReplicaIndex() : null);
}

@JsonIgnore
Expand All @@ -107,7 +134,7 @@ public HddsProtos.BlockID getProtobuf() {
public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(
ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()),
blockID.getBlockCommitSequenceId());
blockID.getBlockCommitSequenceId(), null);
}

@Override
Expand All @@ -119,14 +146,14 @@ public boolean equals(Object o) {
return false;
}
BlockID blockID = (BlockID) o;
return containerBlockID.equals(blockID.getContainerBlockID())
&& blockCommitSequenceId == blockID.getBlockCommitSequenceId();
return this.getContainerBlockID().equals(blockID.getContainerBlockID())
&& this.getBlockCommitSequenceId() == blockID.getBlockCommitSequenceId()
&& Objects.equals(this.getReplicaIndex(), blockID.getReplicaIndex());
}

@Override
public int hashCode() {
return Objects
.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId);
return Objects.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId, replicaIndex);
}
}
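To illustrate the new BlockID semantics, here is a small sketch (not a test from the PR; run with -ea to enable the asserts): the replica index round-trips through the DatanodeBlockID protobuf and takes part in equals() and hashCode(), while a BlockID created without one keeps it null.

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

final class BlockIdReplicaIndexSketch {
  public static void main(String[] args) {
    BlockID withIndex = BlockID.getFromProtobuf(
        ContainerProtos.DatanodeBlockID.newBuilder()
            .setContainerID(1L)
            .setLocalID(100L)
            .setBlockCommitSequenceId(5L)
            .setReplicaIndex(3)
            .build());

    // The replica index survives a protobuf round trip and is part of equality.
    BlockID copy = BlockID.getFromProtobuf(withIndex.getDatanodeBlockIDProtobuf());
    assert Integer.valueOf(3).equals(copy.getReplicaIndex());
    assert withIndex.equals(copy) && withIndex.hashCode() == copy.hashCode();

    // A BlockID built without a replica index keeps it null, so it is not equal
    // even when container ID, local ID and bcsId all match.
    BlockID withoutIndex = new BlockID(1L, 100L);
    withoutIndex.setBlockCommitSequenceId(5L);
    assert withoutIndex.getReplicaIndex() == null;
    assert !withoutIndex.equals(withIndex);
  }
}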