Skip to content

Commit

Permalink
HDDS-10681. EC Reconstruction does not issue put block to data index …
Browse files Browse the repository at this point in the history
…if it is unused (apache#6514)

(cherry picked from commit cba8c85)
  • Loading branch information
sodonnel authored and xichen01 committed Apr 17, 2024
1 parent 35d19a7 commit a2ad621
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -247,24 +248,15 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
int dataLocs = ECBlockInputStreamProxy
.expectedDataLocations(repConfig, safeBlockGroupLength);
List<Integer> toReconstructIndexes = new ArrayList<>();
List<Integer> notReconstructIndexes = new ArrayList<>();
for (Integer index : missingContainerIndexes) {
if (index <= dataLocs || index > repConfig.getData()) {
toReconstructIndexes.add(index);
} else {
// Don't need to be reconstructed, but we do need a stream to write
// the block data to.
notReconstructIndexes.add(index);
}
// else padded indexes.
}

// Looks like we don't need to reconstruct any missing blocks in this block
// group. This is likely because the block group had only padding blocks
// in the missing locations.
if (toReconstructIndexes.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping the reconstruction for the block: "
+ blockLocationInfo.getBlockID() + ". In the missing locations: "
+ missingContainerIndexes
+ ", this block group has only padded blocks.");
}
return;
}

try (ECBlockReconstructedStripeInputStream sis
Expand All @@ -276,71 +268,78 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
ECBlockOutputStream[] emptyBlockStreams =
new ECBlockOutputStream[notReconstructIndexes.size()];
ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
try {
// Create streams and buffers for all indexes that need reconstructed
for (int i = 0; i < toReconstructIndexes.size(); i++) {
int replicaIndex = toReconstructIndexes.get(i);
DatanodeDetails datanodeDetails =
targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
datanodeDetails, repConfig, replicaIndex
);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Make sure it's clean. Don't want to reuse the erroneously returned
// buffers from the pool.
bufs[i].clear();
}
// Then create a stream for all indexes that don't need reconstructed, but still need a stream to
// write the empty block data to.
for (int i = 0; i < notReconstructIndexes.size(); i++) {
int replicaIndex = notReconstructIndexes.get(i);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
}

sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception as there must have been spare indexes
// to try and recover from. Therefore we should log out the block
// group details in the same way as for the exception case below.
if (toReconstructIndexes.size() > 0) {
sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception as there must have been spare indexes
// to try and recover from. Therefore we should log out the block
// group details in the same way as for the exception case below.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
}
} catch (IOException e) {
// When we see exceptions here, it could be due to some transient
// issue that causes the block read to fail when reconstructing it,
// but we have seen issues where the containers don't have the
// blocks they appear they should have, or the block chunks are the
// wrong length etc. In order to debug these sort of cases, if we
// get an error, we will log out the details about the block group
// length on each source, along with their chunk list and chunk
// lengths etc.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
} catch (IOException e) {
// When we see exceptions here, it could be due to some transient
// issue that causes the block read to fail when reconstructing it,
// but we have seen issues where the containers don't have the
// blocks they appear they should have, or the block chunks are the
// wrong length etc. In order to debug these sort of cases, if we
// get an error, we will log out the details about the block group
// length on each source, along with their chunk list and chunk
// lengths etc.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
bufs[i].clear();
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
bufs[i].clear();
}
length -= readLen;
}
length -= readLen;
}

for (ECBlockOutputStream targetStream : targetBlockStreams) {
targetStream.executePutBlock(true, true,
blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream,
targetStream.getCurrentPutBlkResponseFuture());
List<ECBlockOutputStream> allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams));
allStreams.addAll(Arrays.asList(emptyBlockStreams));
for (ECBlockOutputStream targetStream : allStreams) {
targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
}
} finally {
for (ByteBuffer buf : bufs) {
byteBufferPool.putBuffer(buf);
}
IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* This class tests container commands on EC containers.
Expand Down Expand Up @@ -614,8 +615,15 @@ void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
testECReconstructionCoordinator(missingIndexes, 3);
}

/**
 * Verifies EC reconstruction over a partial stripe: for each recoverable
 * set of missing replica indexes, runs the coordinator with a single-chunk
 * (partial) stripe so that some target indexes may hold only padding.
 */
@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> indexes)
    throws Exception {
  // Chunk count of 1 produces a partial stripe in the block group.
  testECReconstructionCoordinator(indexes, 1);
}

/**
 * Verifies reconstruction of both parity replicas (indexes 4 and 5 for a
 * rs-3-2 layout) when the block group contains only a partial stripe
 * (a single chunk), so the parity must be rebuilt from padded data.
 */
@Test
void testECReconstructParityWithPartialStripe()
    throws Exception {
  testECReconstructionCoordinator(ImmutableList.of(4, 5), 1);
}
Expand Down Expand Up @@ -895,18 +903,19 @@ private void checkBlockData(
reconstructedBlockData) {

for (int i = 0; i < blockData.length; i++) {
assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
assertEquals(blockData[i].getSize(), reconstructedBlockData[i].getSize());
assertEquals(blockData[i].getMetadata(), reconstructedBlockData[i].getMetadata());
List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
blockData[i].getChunks();
List<ContainerProtos.ChunkInfo> newBlockDataChunks =
reconstructedBlockData[i].getChunks();
for (int j = 0; j < oldBlockDataChunks.size(); j++) {
ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
if (chunkInfo.getLen() == 0) {
// let's ignore the empty chunks
continue;
}
Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
assertEquals(chunkInfo, newBlockDataChunks.get(j));
}
// Ensure there are no extra chunks in the reconstructed block
assertEquals(oldBlockDataChunks.size(), newBlockDataChunks.size());
}
}

Expand Down

0 comments on commit a2ad621

Please sign in to comment.