Skip to content

Commit

Permalink
HDDS-5865. Make read retry interval and attempts in BlockInputStream …
Browse files Browse the repository at this point in the history
…configurable (apache#6408)

(cherry picked from commit c4dc6a0)
  • Loading branch information
SaketaChalamchala authored and xichen01 committed Jul 18, 2024
1 parent 305c8d0 commit 9ec078f
Show file tree
Hide file tree
Showing 24 changed files with 304 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int retryInterval = 0;

@Config(key = "read.max.retries",
defaultValue = "3",
description = "Maximum number of retries by Ozone Client on "
+ "encountering connectivity exception when reading a key.",
tags = ConfigTag.CLIENT)
private int maxReadRetryCount = 3;

@Config(key = "read.retry.interval",
defaultValue = "1",
description =
"Indicates the time duration in seconds a client will wait "
+ "before retrying a read key request on encountering "
+ "a connectivity excepetion from Datanodes . "
+ "By default the interval is 1 second",
tags = ConfigTag.CLIENT)
private int readRetryInterval = 1;

@Config(key = "checksum.type",
defaultValue = "CRC32",
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
Expand Down Expand Up @@ -326,6 +343,22 @@ public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}

public int getMaxReadRetryCount() {
return maxReadRetryCount;
}

public void setMaxReadRetryCount(int maxReadRetryCount) {
this.maxReadRetryCount = maxReadRetryCount;
}

public int getReadRetryInterval() {
return readRetryInterval;
}

public void setReadRetryInterval(int readRetryInterval) {
this.readRetryInterval = readRetryInterval;
}

public ChecksumType getChecksumType() {
return ChecksumType.valueOf(checksumType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
Expand Down Expand Up @@ -76,8 +77,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
// TODO: do we need to change retrypolicy based on exception.
private final RetryPolicy retryPolicy =
HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
private final RetryPolicy retryPolicy;

private int retries;

// List of ChunkInputStreams, one for each chunk in the block
Expand Down Expand Up @@ -112,25 +113,29 @@ public class BlockInputStream extends BlockExtendedInputStream {
private final Function<BlockID, BlockLocationInfo> refreshFunction;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = verifyChecksum;
this.verifyChecksum = config.isChecksumVerify();
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.retryPolicy =
HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
this(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientFactory, null);
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
}
/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
Expand Down Expand Up @@ -48,8 +49,9 @@ public interface BlockInputStreamFactory {
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
Expand Down Expand Up @@ -76,16 +77,18 @@ public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool,
*/
public BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
ecBlockStreamFactory);
blockInfo, xceiverFactory, refreshFunction,
ecBlockStreamFactory, config);
} else {
return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
pipeline, token, xceiverFactory, refreshFunction,
config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
Expand Down Expand Up @@ -60,7 +61,6 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final int ecChunkSize;
private final long stripeSize;
private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
Expand All @@ -75,7 +75,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private long position = 0;
private boolean closed = false;
private boolean seeked = false;

private OzoneClientConfig config;
protected ECReplicationConfig getRepConfig() {
return repConfig;
}
Expand Down Expand Up @@ -119,13 +119,13 @@ protected int availableParityLocations() {
}

public ECBlockInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory) {
BlockInputStreamFactory streamFactory,
OzoneClientConfig config) {
this.repConfig = repConfig;
this.ecChunkSize = repConfig.getEcChunkSize();
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.streamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
Expand All @@ -134,6 +134,7 @@ public ECBlockInputStream(ECReplicationConfig repConfig,
this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
this.blockStreams =
new BlockExtendedInputStream[repConfig.getRequiredNodes()];
this.config = config;

this.stripeSize = (long)ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
Expand Down Expand Up @@ -202,8 +203,9 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE),
blkInfo, pipeline,
blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
blockInfo.getToken(), xceiverClientFactory,
ecPipelineRefreshFunction(locationIndex + 1, refreshFunction),
config);
blockStreams[locationIndex] = stream;
LOG.debug("{}: created stream [{}]: {}", this, locationIndex, stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -51,7 +52,8 @@ public interface ECBlockInputStreamFactory {
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -74,16 +75,17 @@ private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
*/
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
if (missingLocations) {
// We create the reconstruction reader
ECBlockReconstructedStripeInputStream sis =
new ECBlockReconstructedStripeInputStream(
(ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
(ECReplicationConfig)repConfig, blockInfo,
xceiverFactory, refreshFunction, inputStreamFactory,
byteBufferPool, ecReconstructExecutorSupplier.get());
byteBufferPool, ecReconstructExecutorSupplier.get(), config);
if (failedLocations != null) {
sis.addFailedDatanodes(failedLocations);
}
Expand All @@ -92,7 +94,8 @@ public BlockExtendedInputStream create(boolean missingLocations,
} else {
// Otherwise create the more efficient non-reconstruction reader
return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory);
xceiverFactory, refreshFunction, inputStreamFactory,
config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -49,7 +50,6 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
LoggerFactory.getLogger(ECBlockInputStreamProxy.class);

private final ECReplicationConfig repConfig;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
Expand All @@ -59,6 +59,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
private boolean reconstructionReader = false;
private List<DatanodeDetails> failedLocations = new ArrayList<>();
private boolean closed = false;
private OzoneClientConfig config;

/**
* Given the ECReplicationConfig and the block length, calculate how many
Expand Down Expand Up @@ -97,16 +98,17 @@ public static int availableDataLocations(Pipeline pipeline,
}

public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
BlockLocationInfo> refreshFunction,
ECBlockInputStreamFactory streamFactory) {
ECBlockInputStreamFactory streamFactory,
OzoneClientConfig config) {
this.repConfig = repConfig;
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.ecBlockInputStreamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.config = config;

setReaderType();
createBlockReader();
Expand All @@ -124,8 +126,8 @@ private void createBlockReader() {
.incECReconstructionTotal();
}
blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
failedLocations, repConfig, blockInfo, verifyChecksum,
xceiverClientFactory, refreshFunction);
failedLocations, repConfig, blockInfo,
xceiverClientFactory, refreshFunction, config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -152,14 +153,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {

@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
refreshFunction, streamFactory);
ExecutorService ecReconstructExecutor,
OzoneClientConfig config) {
super(repConfig, blockInfo, xceiverClientFactory,
refreshFunction, streamFactory, config);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
Expand All @@ -44,13 +45,13 @@ class DummyBlockInputStream extends BlockInputStream {
long blockLen,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, refreshFunction);
Map<String, byte[]> chunks,
OzoneClientConfig config) {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
this.chunks = chunkList;

Expand Down
Loading

0 comments on commit 9ec078f

Please sign in to comment.