diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 39a69b35d5d..38172e99a2a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -450,7 +450,7 @@ private XceiverClientReply sendCommandWithRetry( processForDebug(request), pipeline); } else { LOG.error(message + " on the pipeline {}.", - request.getCmdType(), pipeline); + request.getCmdType(), pipeline, new RuntimeException()); } throw ioException; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index f8ea7b81f82..8b5f7934e66 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -439,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { ReadChunkResponseProto readChunkResponse = - ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, datanodeBlockID, validators, tokenSupplier.get()); + ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators, + tokenSupplier.get()); if (readChunkResponse.hasData()) { return readChunkResponse.getData().asReadOnlyByteBufferList() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 2026e78e137..092c050740a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -266,11 +266,8 @@ private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * @return container protocol getLastCommittedBlockLength response * @throws IOException if there is an I/O error while performing the call */ - public static ContainerProtos.GetCommittedBlockLengthResponseProto - getCommittedBlockLength( - XceiverClientSpi xceiverClient, BlockID blockID, - Token token) - throws IOException { + public static ContainerProtos.GetCommittedBlockLengthResponseProto getCommittedBlockLength( + XceiverClientSpi xceiverClient, BlockID blockID, Token token) throws IOException { ContainerProtos.GetCommittedBlockLengthRequestProto.Builder getBlockLengthRequestBuilder = ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder(). @@ -345,10 +342,8 @@ public static ContainerCommandRequestProto getPutBlockRequest( * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static ContainerProtos.ReadChunkResponseProto readChunk( - XceiverClientSpi xceiverClient, ChunkInfo chunk, - DatanodeBlockID blockID, List validators, - Token token) throws IOException { + public static ContainerProtos.ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, + DatanodeBlockID blockID, List validators, Token token) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID) diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 0ab75818dab..ee084a15e94 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -46,6 +46,7 @@ import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -511,7 +512,13 @@ public static ContainerCommandRequestProto getDeleteContainer( } public static BlockID getTestBlockID(long containerID) { - return new BlockID(containerID, UniqueId.next()); + return getTestBlockID(containerID, null); + } + + public static BlockID getTestBlockID(long containerID, Integer replicaIndex) { + BlockID blockID = Mockito.spy(new BlockID(containerID, UniqueId.next())); + Mockito.when(blockID.getReplicaIndex()).thenReturn(replicaIndex); + return blockID; } public static long getTestContainerID() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index acfb3489a72..7773b54f794 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -247,8 +247,8 @@ public static void verifyBCSId(Container container, BlockID blockID) */ public static void verifyReplicaIdx(Container container, BlockID blockID) throws IOException { - int containerReplicaIndex = container.getContainerData().getReplicaIndex(); - if (containerReplicaIndex > 0 && containerReplicaIndex != blockID.getReplicaIndex()) { + Integer containerReplicaIndex = container.getContainerData().getReplicaIndex(); + if (containerReplicaIndex > 0 && !containerReplicaIndex.equals(blockID.getReplicaIndex())) { throw new StorageContainerException( "Unable to find the Container with replicaIdx " + blockID.getReplicaIndex() + ". Container " + container.getContainerData().getContainerID() + " replicaIdx is " diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java index 3da23688418..e178e8061bc 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java @@ -189,10 +189,13 @@ private List getChunkInfos(OmKeyLocationInfo xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline); - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); + ContainerProtos.DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); + if (replicaIndex > 0) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClientSpi, datanodeBlockID, token); + .getBlock(xceiverClientSpi, datanodeBlockID.build(), token); chunks = response.getBlockData().getChunksList(); } finally { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java index 09f9c7d037e..177ed669971 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java @@ -149,13 +149,15 @@ protected List getChunkInfos( LOG.debug("Initializing BlockInputStream for get key to access {}", blockID.getContainerID()); } - xceiverClientSpi = - getXceiverClientFactory().acquireClientForReadData(pipeline); + xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline); - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); + ContainerProtos.DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); + if (replicaIndex > 0) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClientSpi, datanodeBlockID, token); + .getBlock(xceiverClientSpi, datanodeBlockID.build(), token); chunks = response.getBlockData().getChunksList(); } finally { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index c8ffe5d011a..8842cf34aa1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -452,10 +452,10 @@ public void testCreateRecoveryContainer() throws Exception { .generateToken(ANY_USER, container.containerID()); scm.getContainerManager().getContainerStateManager() .addContainer(container.getProtobuf()); - + int replicaIndex = 4; XceiverClientSpi dnClient = xceiverClientManager.acquireClient( createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0), - 2)); + replicaIndex)); try { // To create the actual situation, container would have been in closed // state at SCM. @@ -470,7 +470,7 @@ public void testCreateRecoveryContainer() throws Exception { String encodedToken = cToken.encodeToUrlString(); ContainerProtocolCalls.createRecoveringContainer(dnClient, container.containerID().getProtobuf().getId(), - encodedToken, 4); + encodedToken, replicaIndex); BlockID blockID = ContainerTestHelper .getTestBlockID(container.containerID().getProtobuf().getId()); @@ -511,7 +511,8 @@ public void testCreateRecoveryContainer() throws Exception { readContainerResponseProto.getContainerData().getState()); ContainerProtos.ReadChunkResponseProto readChunkResponseProto = ContainerProtocolCalls.readChunk(dnClient, - writeChunkRequest.getWriteChunk().getChunkData(), blockID.getDatanodeBlockIDProtobuf(), null, + writeChunkRequest.getWriteChunk().getChunkData(), + blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, blockToken); ByteBuffer[] readOnlyByteBuffersArray = BufferUtils .getReadOnlyByteBuffersArray(