Skip to content

Commit

Permalink
HDDS-11439. De-duplicate code for ReplicatedFileChecksumHelper and EC…
Browse files Browse the repository at this point in the history
…FileChecksumHelper (apache#7264)
  • Loading branch information
harryteng9527 authored Oct 11, 2024
1 parent 05a409e commit 523c860
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
Expand All @@ -38,6 +40,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
Expand Down Expand Up @@ -150,6 +153,90 @@ protected void setChecksumType(ContainerProtos.ChecksumType type) {
checksumType = type;
}

/**
 * Create the checksum computer appropriate for this key's replication
 * scheme (replicated vs. EC), fed with the chunk checksums of one block.
 *
 * @param chunkInfos chunk metadata (including per-chunk checksum data)
 *                   for the block, as returned by {@code getChunkInfos}
 * @param blockLength length of the block in bytes; implementations may
 *                    ignore it if not needed
 * @return a computer ready to have {@code compute} invoked on it
 */
protected abstract AbstractBlockChecksumComputer getBlockChecksumComputer(List<ContainerProtos.ChunkInfo> chunkInfos,
    long blockLength);

/**
 * Parse one block's raw checksum bytes and append them to the cumulative
 * per-file checksum buffer, according to the active combine mode.
 *
 * @param blockChecksumByteBuffer raw block checksum produced by the
 *                                block checksum computer
 * @return a debug-string form of the parsed checksum if debug logging is
 *         enabled, otherwise null
 * @throws IOException if the buffer cannot be parsed
 */
protected abstract String populateBlockChecksumBuf(ByteBuffer blockChecksumByteBuffer) throws IOException;

/**
 * Fetch the chunk list (with checksum data) for one block, typically via
 * an RPC to a datanode holding the block.
 *
 * @param keyLocationInfo location of the block to query
 * @return the block's chunks; empty when no chunk info could be obtained
 * @throws IOException on RPC failure
 */
protected abstract List<ContainerProtos.ChunkInfo> getChunkInfos(
    OmKeyLocationInfo keyLocationInfo) throws IOException;

/**
 * Drive the supplied computer over its chunk checksums and return the
 * resulting raw block checksum bytes.
 *
 * @param blockChecksumComputer computer already primed with chunk data
 * @return the computed block checksum as a byte buffer
 * @throws IOException if the computation fails
 */
protected ByteBuffer getBlockChecksumFromChunkChecksums(AbstractBlockChecksumComputer blockChecksumComputer)
    throws IOException {
  // The combine mode decides how chunk checksums are folded together.
  blockChecksumComputer.compute(getCombineMode());
  ByteBuffer blockChecksum = blockChecksumComputer.getOutByteBuffer();
  return blockChecksum;
}

/**
 * Walk the key's blocks in order, computing each block's checksum and
 * appending its raw bytes into getBlockChecksumBuf().
 *
 * <p>Iteration stops once the accumulated block length exceeds the
 * requested length, or when the remaining byte budget is exhausted.
 *
 * @throws IOException if any block's checksum cannot be obtained
 */
protected void checksumBlocks() throws IOException {
  long bytesCovered = 0;
  int blockIdx = 0;
  while (blockIdx < getKeyLocationInfoList().size() && getRemaining() >= 0) {
    OmKeyLocationInfo blockLocation =
        getKeyLocationInfoList().get(blockIdx);
    // Requested range fully covered by the blocks handled so far.
    if (bytesCovered > getLength()) {
      return;
    }

    if (!checksumBlock(blockLocation)) {
      throw new PathIOException(getSrc(),
          "Fail to get block checksum for " + blockLocation
          + ", checksum combine mode: " + getCombineMode());
    }

    bytesCovered += blockLocation.getLength();
    blockIdx++;
  }
}

/**
 * Compute the checksum for a single block and append it to the cumulative
 * file checksum buffer via populateBlockChecksumBuf().
 *
 * <p>Also records the checksum type and bytes-per-CRC from the block's
 * first chunk, and debits the block's length from the remaining budget.
 *
 * @param keyLocationInfo location of the block to checksum
 * @return true when it is fine to continue (or retry), false on a severe
 *         condition or total failure (no chunk info available)
 * @throws IOException if fetching chunk info or computing fails
 */
protected boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
    throws IOException {
  // for each block, send request
  List<ContainerProtos.ChunkInfo> chunkInfos =
      getChunkInfos(keyLocationInfo);
  if (chunkInfos.isEmpty()) {
    return false;
  }

  long blockNumBytes = keyLocationInfo.getLength();

  // Cap the debit at what is left of the requested length.
  if (getRemaining() < blockNumBytes) {
    blockNumBytes = getRemaining();
  }
  setRemaining(getRemaining() - blockNumBytes);

  // All chunks of a block share checksum type and bytes-per-checksum,
  // so read both off the first chunk.
  ContainerProtos.ChecksumData checksumData =
      chunkInfos.get(0).getChecksumData();
  setChecksumType(checksumData.getType());
  int bytesPerChecksum = checksumData.getBytesPerChecksum();
  setBytesPerCRC(bytesPerChecksum);

  AbstractBlockChecksumComputer blockChecksumComputer = getBlockChecksumComputer(chunkInfos,
      keyLocationInfo.getLength());
  ByteBuffer blockChecksumByteBuffer =
      getBlockChecksumFromChunkChecksums(blockChecksumComputer);
  String blockChecksumForDebug =
      populateBlockChecksumBuf(blockChecksumByteBuffer);

  // Use the getKeyInfo() accessor (as the subclasses do) rather than the
  // field, for consistency; the flag only selects the debug-log wording.
  boolean isEcKey =
      getKeyInfo().getReplicationConfig().getReplicationType()
          == HddsProtos.ReplicationType.EC;
  LOG.debug("Got reply from {} {} for block {}: blockChecksum={}, " +
      "blockChecksumType={}",
      isEcKey ? "EC pipeline" : "pipeline",
      keyLocationInfo.getPipeline(), keyLocationInfo.getBlockID(),
      blockChecksumForDebug, checksumData.getType());

  return true;
}

/**
* Request the blocks created in the most recent version from Ozone Manager.
*
Expand Down Expand Up @@ -219,14 +306,6 @@ public void compute() throws IOException {
}
}

/**
* Compute block checksums block by block and append the raw bytes of the
* block checksums into getBlockChecksumBuf().
*
* @throws IOException
*/
protected abstract void checksumBlocks() throws IOException;

/**
* Make final file checksum result given the per-block or per-block-group
* checksums collected into getBlockChecksumBuf().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.client.checksum;

import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
Expand Down Expand Up @@ -46,7 +45,6 @@
* The helper class to compute file checksum for EC files.
*/
public class ECFileChecksumHelper extends BaseFileChecksumHelper {
private int blockIdx;

public ECFileChecksumHelper(OzoneVolume volume, OzoneBucket bucket,
String keyName, long length, OzoneClientConfig.ChecksumCombineMode
Expand All @@ -57,63 +55,13 @@ public ECFileChecksumHelper(OzoneVolume volume, OzoneBucket bucket,
}

@Override
protected void checksumBlocks() throws IOException {
long currentLength = 0;
for (blockIdx = 0;
blockIdx < getKeyLocationInfoList().size() && getRemaining() >= 0;
blockIdx++) {
OmKeyLocationInfo keyLocationInfo =
getKeyLocationInfoList().get(blockIdx);

if (currentLength > getLength()) {
return;
}

if (!checksumBlock(keyLocationInfo)) {
throw new PathIOException(getSrc(),
"Fail to get block checksum for " + keyLocationInfo
+ ", checksum combine mode: " + getCombineMode());
}

currentLength += keyLocationInfo.getLength();
}
}

private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
throws IOException {
// for each block, send request
List<ContainerProtos.ChunkInfo> chunkInfos =
getChunkInfos(keyLocationInfo);
if (chunkInfos.size() == 0) {
return false;
}

long blockNumBytes = keyLocationInfo.getLength();

if (getRemaining() < blockNumBytes) {
blockNumBytes = getRemaining();
}
setRemaining(getRemaining() - blockNumBytes);

ContainerProtos.ChecksumData checksumData =
chunkInfos.get(0).getChecksumData();
setChecksumType(checksumData.getType());
int bytesPerChecksum = checksumData.getBytesPerChecksum();
setBytesPerCRC(bytesPerChecksum);

ByteBuffer blockChecksumByteBuffer =
getBlockChecksumFromChunkChecksums(chunkInfos, keyLocationInfo.getLength());
String blockChecksumForDebug =
populateBlockChecksumBuf(blockChecksumByteBuffer);

LOG.debug("Got reply from EC pipeline {} for block {}: blockChecksum={}, " +
"blockChecksumType={}",
keyLocationInfo.getPipeline(), keyLocationInfo.getBlockID(),
blockChecksumForDebug, checksumData.getType());
return true;
// EC variant: the computer needs the full key info so it can account for
// the EC layout (data/parity chunk arrangement) and the block length.
protected AbstractBlockChecksumComputer getBlockChecksumComputer(List<ContainerProtos.ChunkInfo> chunkInfos,
    long blockLength) {
  return new ECBlockChecksumComputer(chunkInfos, getKeyInfo(), blockLength);
}

private String populateBlockChecksumBuf(
@Override
protected String populateBlockChecksumBuf(
ByteBuffer blockChecksumByteBuffer) throws IOException {
String blockChecksumForDebug = null;
switch (getCombineMode()) {
Expand All @@ -139,19 +87,9 @@ private String populateBlockChecksumBuf(
return blockChecksumForDebug;
}

private ByteBuffer getBlockChecksumFromChunkChecksums(
List<ContainerProtos.ChunkInfo> chunkInfos,
long blockLength) throws IOException {

AbstractBlockChecksumComputer blockChecksumComputer =
new ECBlockChecksumComputer(chunkInfos, getKeyInfo(), blockLength);
blockChecksumComputer.compute(getCombineMode());

return blockChecksumComputer.getOutByteBuffer();
}

private List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
keyLocationInfo) throws IOException {
@Override
protected List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
keyLocationInfo) throws IOException {
// To read an EC block, we create a STANDALONE pipeline that contains the
// single location for the block index we want to read. The EC blocks are
// indexed from 1 to N, however the data locations are stored in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.client.checksum;

import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
Expand All @@ -44,7 +43,6 @@
* The helper class to compute file checksum for replicated files.
*/
public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
private int blockIdx;

public ReplicatedFileChecksumHelper(
OzoneVolume volume, OzoneBucket bucket, String keyName, long length,
Expand All @@ -61,72 +59,18 @@ public ReplicatedFileChecksumHelper(OzoneVolume volume, OzoneBucket bucket,
keyInfo);
}


@Override
protected void checksumBlocks() throws IOException {
long currentLength = 0;
for (blockIdx = 0;
blockIdx < getKeyLocationInfoList().size() && getRemaining() >= 0;
blockIdx++) {
OmKeyLocationInfo keyLocationInfo =
getKeyLocationInfoList().get(blockIdx);
if (currentLength > getLength()) {
return;
}

if (!checksumBlock(keyLocationInfo)) {
throw new PathIOException(getSrc(),
"Fail to get block checksum for " + keyLocationInfo
+ ", checksum combine mode: " + getCombineMode());
}

currentLength += keyLocationInfo.getLength();
}
}

/**
* Return true when sounds good to continue or retry, false when severe
* condition or totally failed.
*/
private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
throws IOException {
// for each block, send request
List<ContainerProtos.ChunkInfo> chunkInfos =
getChunkInfos(keyLocationInfo);
if (chunkInfos.size() == 0) {
return false;
}

long blockNumBytes = keyLocationInfo.getLength();

if (getRemaining() < blockNumBytes) {
blockNumBytes = getRemaining();
}
setRemaining(getRemaining() - blockNumBytes);

ContainerProtos.ChecksumData checksumData =
chunkInfos.get(0).getChecksumData();
setChecksumType(checksumData.getType());
int bytesPerChecksum = checksumData.getBytesPerChecksum();
setBytesPerCRC(bytesPerChecksum);

ByteBuffer blockChecksumByteBuffer = getBlockChecksumFromChunkChecksums(
keyLocationInfo, chunkInfos);
String blockChecksumForDebug =
populateBlockChecksumBuf(blockChecksumByteBuffer);

LOG.debug("got reply from pipeline {} for block {}: blockChecksum={}, " +
"blockChecksumType={}",
keyLocationInfo.getPipeline(), keyLocationInfo.getBlockID(),
blockChecksumForDebug, checksumData.getType());
return true;
// Replicated variant: every replica holds the full block, so only the
// chunk checksums are needed; blockLength is unused here but required by
// the base-class contract (the EC variant does use it).
protected AbstractBlockChecksumComputer getBlockChecksumComputer(List<ContainerProtos.ChunkInfo> chunkInfos,
    long blockLength) {
  return new ReplicatedBlockChecksumComputer(chunkInfos);
}

// copied from BlockInputStream
/**
* Send RPC call to get the block info from the container.
* @return List of chunks in this block.
*/
@Override
protected List<ContainerProtos.ChunkInfo> getChunkInfos(
OmKeyLocationInfo keyLocationInfo) throws IOException {
// irrespective of the container state, we will always read via Standalone
Expand Down Expand Up @@ -164,18 +108,6 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(
return chunks;
}

// TODO: copy BlockChecksumHelper here
ByteBuffer getBlockChecksumFromChunkChecksums(
OmKeyLocationInfo keyLocationInfo,
List<ContainerProtos.ChunkInfo> chunkInfoList)
throws IOException {
AbstractBlockChecksumComputer blockChecksumComputer =
new ReplicatedBlockChecksumComputer(chunkInfoList);
blockChecksumComputer.compute(getCombineMode());

return blockChecksumComputer.getOutByteBuffer();
}

/**
* Parses out the raw blockChecksum bytes from {@code checksumData} byte
* buffer according to the blockChecksumType and populates the cumulative
Expand All @@ -184,7 +116,8 @@ ByteBuffer getBlockChecksumFromChunkChecksums(
* @return a debug-string representation of the parsed checksum if
* debug is enabled, otherwise null.
*/
String populateBlockChecksumBuf(ByteBuffer checksumData)
@Override
protected String populateBlockChecksumBuf(ByteBuffer checksumData)
throws IOException {
String blockChecksumForDebug = null;
switch (getCombineMode()) {
Expand Down
Loading

0 comments on commit 523c860

Please sign in to comment.