diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 90756bbc8898..3d9b5b77ee35 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -58,6 +58,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -247,24 +248,15 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
     int dataLocs = ECBlockInputStreamProxy
         .expectedDataLocations(repConfig, safeBlockGroupLength);
     List<Integer> toReconstructIndexes = new ArrayList<>();
+    List<Integer> notReconstructIndexes = new ArrayList<>();
     for (Integer index : missingContainerIndexes) {
       if (index <= dataLocs || index > repConfig.getData()) {
         toReconstructIndexes.add(index);
+      } else {
+        // These don't need to be reconstructed, but we do need a stream to
+        // write the block data to.
+        notReconstructIndexes.add(index);
       }
-      // else padded indexes.
-    }
-
-    // Looks like we don't need to reconstruct any missing blocks in this block
-    // group. The reason for this should be block group had only padding blocks
-    // in the missing locations.
-    if (toReconstructIndexes.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping the reconstruction for the block: "
-            + blockLocationInfo.getBlockID() + ". In the missing locations: "
-            + missingContainerIndexes
-            + ", this block group has only padded blocks.");
-      }
-      return;
     }
 
     try (ECBlockReconstructedStripeInputStream sis
@@ -276,71 +268,78 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
 
       ECBlockOutputStream[] targetBlockStreams =
           new ECBlockOutputStream[toReconstructIndexes.size()];
+      ECBlockOutputStream[] emptyBlockStreams =
+          new ECBlockOutputStream[notReconstructIndexes.size()];
       ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
       try {
+        // Create streams and buffers for all indexes that need reconstruction.
        for (int i = 0; i < toReconstructIndexes.size(); i++) {
           int replicaIndex = toReconstructIndexes.get(i);
-          DatanodeDetails datanodeDetails =
-              targetMap.get(replicaIndex);
-          targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
-              datanodeDetails, repConfig, replicaIndex
-          );
+          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
+          targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
+              datanodeDetails, repConfig, replicaIndex);
           bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
-          // Make sure it's clean. Don't want to reuse the erroneously returned
-          // buffers from the pool.
           bufs[i].clear();
         }
+        // Then create a stream for all indexes that don't need reconstruction,
+        // but still need a stream to write the empty block data to.
+        for (int i = 0; i < notReconstructIndexes.size(); i++) {
+          int replicaIndex = notReconstructIndexes.get(i);
+          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
+          emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
+              datanodeDetails, repConfig, replicaIndex);
+        }
 
-        sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
-            .collect(Collectors.toSet()));
-        long length = safeBlockGroupLength;
-        while (length > 0) {
-          int readLen;
-          try {
-            readLen = sis.recoverChunks(bufs);
-            Set<Integer> failedIndexes = sis.getFailedIndexes();
-            if (!failedIndexes.isEmpty()) {
-              // There was a problem reading some of the block indexes, but we
-              // did not get an exception as there must have been spare indexes
-              // to try and recover from. Therefore we should log out the block
-              // group details in the same way as for the exception case below.
-              logBlockGroupDetails(blockLocationInfo, repConfig,
-                  blockDataGroup);
-            }
+        if (toReconstructIndexes.size() > 0) {
+          sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+              .collect(Collectors.toSet()));
+          long length = safeBlockGroupLength;
+          while (length > 0) {
+            int readLen;
+            try {
+              readLen = sis.recoverChunks(bufs);
+              Set<Integer> failedIndexes = sis.getFailedIndexes();
+              if (!failedIndexes.isEmpty()) {
+                // There was a problem reading some of the block indexes, but we
+                // did not get an exception as there must have been spare indexes
+                // to try and recover from. Therefore we should log out the block
+                // group details in the same way as for the exception case below.
+                logBlockGroupDetails(blockLocationInfo, repConfig,
+                    blockDataGroup);
+              }
+            } catch (IOException e) {
+              // When we see exceptions here, it could be due to some transient
+              // issue that causes the block read to fail when reconstructing it,
+              // but we have seen issues where the containers don't have the
+              // blocks they appear they should have, or the block chunks are the
+              // wrong length etc. In order to debug these sort of cases, if we
+              // get an error, we will log out the details about the block group
+              // length on each source, along with their chunk list and chunk
+              // lengths etc.
+              logBlockGroupDetails(blockLocationInfo, repConfig,
+                  blockDataGroup);
+              throw e;
+            }
-          } catch (IOException e) {
-            // When we see exceptions here, it could be due to some transient
-            // issue that causes the block read to fail when reconstructing it,
-            // but we have seen issues where the containers don't have the
-            // blocks they appear they should have, or the block chunks are the
-            // wrong length etc. In order to debug these sort of cases, if we
-            // get an error, we will log out the details about the block group
-            // length on each source, along with their chunk list and chunk
-            // lengths etc.
-            logBlockGroupDetails(blockLocationInfo, repConfig,
-                blockDataGroup);
-            throw e;
-          }
-          // TODO: can be submitted in parallel
-          for (int i = 0; i < bufs.length; i++) {
-            CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-                future = targetBlockStreams[i].write(bufs[i]);
-            checkFailures(targetBlockStreams[i], future);
-            bufs[i].clear();
-          }
-          length -= readLen;
-        }
-
-        for (ECBlockOutputStream targetStream : targetBlockStreams) {
-          targetStream.executePutBlock(true, true,
-              blockLocationInfo.getLength(), blockDataGroup);
-          checkFailures(targetStream,
-              targetStream.getCurrentPutBlkResponseFuture());
+            // TODO: can be submitted in parallel
+            for (int i = 0; i < bufs.length; i++) {
+              CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+                  future = targetBlockStreams[i].write(bufs[i]);
+              checkFailures(targetBlockStreams[i], future);
+              bufs[i].clear();
+            }
+            length -= readLen;
+          }
+        }
+        List<ECBlockOutputStream> allStreams =
+            new ArrayList<>(Arrays.asList(targetBlockStreams));
+        allStreams.addAll(Arrays.asList(emptyBlockStreams));
+        for (ECBlockOutputStream targetStream : allStreams) {
+          targetStream.executePutBlock(true, true,
+              blockLocationInfo.getLength(), blockDataGroup);
+          checkFailures(targetStream,
+              targetStream.getCurrentPutBlkResponseFuture());
         }
       } finally {
         for (ByteBuffer buf : bufs) {
           byteBufferPool.putBuffer(buf);
         }
         IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
+        IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
       }
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index f2fe3fa31a1c..be0f05844182 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -114,6 +114,7 @@
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * This class tests container commands on EC containers.
@@ -614,8 +615,15 @@ void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
     testECReconstructionCoordinator(missingIndexes, 3);
   }
 
+  @ParameterizedTest
+  @MethodSource("recoverableMissingIndexes")
+  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes)
+      throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 1);
+  }
+
   @Test
-  void testECReconstructionWithPartialStripe()
+  void testECReconstructParityWithPartialStripe()
       throws Exception {
     testECReconstructionCoordinator(ImmutableList.of(4, 5), 1);
   }
@@ -895,18 +903,19 @@ private void checkBlockData(
       reconstructedBlockData) {
 
     for (int i = 0; i < blockData.length; i++) {
+      assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
+      assertEquals(blockData[i].getSize(), reconstructedBlockData[i].getSize());
+      assertEquals(blockData[i].getMetadata(), reconstructedBlockData[i].getMetadata());
       List<ContainerProtos.ChunkInfo> oldBlockDataChunks = blockData[i].getChunks();
       List<ContainerProtos.ChunkInfo> newBlockDataChunks =
           reconstructedBlockData[i].getChunks();
       for (int j = 0; j < oldBlockDataChunks.size(); j++) {
         ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
-        if (chunkInfo.getLen() == 0) {
-          // let's ignore the empty chunks
-          continue;
-        }
-        Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
+        assertEquals(chunkInfo, newBlockDataChunks.get(j));
       }
+      // Ensure there are no extra chunks in the reconstructed block.
+      assertEquals(oldBlockDataChunks.size(), newBlockDataChunks.size());
     }
   }
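Reviewer note: the core behavioral change above is the split of missingContainerIndexes into indexes that need real reconstruction and padded indexes that only need an empty block written. Below is a minimal standalone sketch of that classification; IndexSplitSketch, Split, and classify are hypothetical names used only for illustration, not Ozone APIs. In the patch itself, dataLocs comes from ECBlockInputStreamProxy.expectedDataLocations(repConfig, safeBlockGroupLength) and the data count from repConfig.getData().

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexSplitSketch {

  /** Result of classifying the missing replica indexes (hypothetical type). */
  static final class Split {
    final List<Integer> toReconstruct = new ArrayList<>(); // real data/parity
    final List<Integer> emptyOnly = new ArrayList<>();     // padded indexes
  }

  /**
   * @param missing   missing replica indexes (1-based)
   * @param dataLocs  data locations that actually hold bytes for this
   *                  (possibly partial) stripe
   * @param dataCount number of data blocks in the EC scheme, e.g. 3 in EC(3,2)
   */
  static Split classify(List<Integer> missing, int dataLocs, int dataCount) {
    Split split = new Split();
    for (int index : missing) {
      if (index <= dataLocs || index > dataCount) {
        // A data index holding real bytes, or a parity index: reconstruct it.
        split.toReconstruct.add(index);
      } else {
        // A padded data index of a partial stripe: nothing to rebuild, but
        // the patch still opens a stream so putBlock can write the (empty)
        // block metadata to the target datanode.
        split.emptyOnly.add(index);
      }
    }
    return split;
  }

  public static void main(String[] args) {
    // Partial stripe where only the first data location has bytes.
    Split s = classify(Arrays.asList(2, 3, 4), 1, 3);
    System.out.println("reconstruct = " + s.toReconstruct); // [4] (parity)
    System.out.println("emptyOnly   = " + s.emptyOnly);     // [2, 3] (padding)
  }
}

Previously the coordinator returned early when only padded indexes were missing, so those targets never received the block at all. With this change each padded index still gets an ECBlockOutputStream and a putBlock call, which is why checkBlockData can now compare chunk lists exactly instead of skipping empty chunks.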