Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-5865. Make read retry interval and attempts in BlockInputStream configurable #6408

Merged
merged 25 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0b1ea27
HDDS-7035. Rebased.
Jul 26, 2023
be5016b
HDDS-7035. Removed custom variables. Moved String to Sign generation …
Aug 29, 2023
1a05023
HDDS-7035. Fixed checkstyle errors.
Aug 29, 2023
8a11cbc
HDDS-7035. Added licenses to new classes.
Aug 29, 2023
5cae64b
HDDS-7035. Fixed acceptance test errors.
Sep 6, 2023
96cf320
HDDS-7035. Added more Unit tests.
Sep 6, 2023
1fdc955
Merge branch 'apache:master' into master
SaketaChalamchala Oct 24, 2023
9a3c72a
Merge branch 'apache:master' into master
SaketaChalamchala Nov 1, 2023
a474f3d
Merge branch 'apache:master' into master
SaketaChalamchala Nov 14, 2023
5387064
Merge branch 'apache:master' into master
SaketaChalamchala Nov 16, 2023
4c8b5a3
Merge branch 'apache:master' into master
SaketaChalamchala Nov 27, 2023
01dc90d
Merge branch 'apache:master' into master
SaketaChalamchala Jan 20, 2024
75668c9
Merge branch 'apache:master' into master
SaketaChalamchala Feb 21, 2024
b830609
Merge branch 'apache:master' into master
SaketaChalamchala Feb 27, 2024
349e374
HDDS-5865. Made read retry max attempts and interval configurable.
Mar 1, 2024
d836409
HDDS-5865. Fixed checkstyle errors.
Mar 1, 2024
59b2fc8
HDDS-5865. Fixed integration test errors.
Mar 1, 2024
27752ae
HDDS-5865. Removed unnecessary comment.
Mar 4, 2024
037689f
HDDS-5865. Removed unnecessary comments.
Mar 5, 2024
50eb1a5
HDDS-5865. Made read retry max attempts and interval configurable and…
Mar 19, 2024
5e0cab3
HDDS-5865. Fixed checkstyle errors.
Mar 19, 2024
dcffbb2
Merge branch 'apache:master' into HDDS-5865-2
SaketaChalamchala Mar 20, 2024
f5c2b1c
Merge branch 'apache:master' into HDDS-5865-2
SaketaChalamchala Mar 21, 2024
c2fa09c
HDDS-5865. Getting verifyChecksum from client configuration in BlockI…
Mar 21, 2024
1932c79
Fix typo in description
adoroszlai Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private int retryInterval = 0;

// Maximum number of read retries before the client gives up on a key read.
// Must stay in sync with defaultValue in the annotation below.
@Config(key = "read.max.retries",
    defaultValue = "3",
    description = "Maximum number of retries by Ozone Client on "
        + "encountering connectivity exception when reading a key.",
    tags = ConfigTag.CLIENT)
private int maxReadRetryCount = 3;

// Wait time (seconds) between successive read retries.
// Must stay in sync with defaultValue in the annotation below.
@Config(key = "read.retry.interval",
    defaultValue = "1",
    description =
        "Indicates the time duration in seconds a client will wait "
            + "before retrying a read key request on encountering "
            + "a connectivity exception from Datanodes. "
            + "By default the interval is 1 second.",
    tags = ConfigTag.CLIENT)
private int readRetryInterval = 1;

@Config(key = "checksum.type",
defaultValue = "CRC32",
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
Expand Down Expand Up @@ -326,6 +343,22 @@ public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}

/**
 * @return maximum number of read retries ({@code read.max.retries}).
 */
public int getMaxReadRetryCount() {
return maxReadRetryCount;
}

/**
 * Sets the maximum number of read retries ({@code read.max.retries}).
 *
 * @param maxReadRetryCount new retry limit
 */
public void setMaxReadRetryCount(int maxReadRetryCount) {
this.maxReadRetryCount = maxReadRetryCount;
}

/**
 * @return wait time in seconds between read retries
 *     ({@code read.retry.interval}).
 */
public int getReadRetryInterval() {
return readRetryInterval;
}

/**
 * Sets the wait time in seconds between read retries
 * ({@code read.retry.interval}).
 *
 * @param readRetryInterval new retry interval in seconds
 */
public void setReadRetryInterval(int readRetryInterval) {
this.readRetryInterval = readRetryInterval;
}

public ChecksumType getChecksumType() {
return ChecksumType.valueOf(checksumType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
Expand Down Expand Up @@ -76,8 +77,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
// TODO: do we need to change retrypolicy based on exception.
private final RetryPolicy retryPolicy =
HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
private final RetryPolicy retryPolicy;

private int retries;

// List of ChunkInputStreams, one for each chunk in the block
Expand Down Expand Up @@ -112,25 +113,29 @@ public class BlockInputStream extends BlockExtendedInputStream {
private final Function<BlockID, BlockLocationInfo> refreshFunction;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = verifyChecksum;
this.verifyChecksum = config.isChecksumVerify();
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.retryPolicy =
HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
this(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientFactory, null);
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
}
/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
Expand Down Expand Up @@ -48,8 +49,9 @@ public interface BlockInputStreamFactory {
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
Expand Down Expand Up @@ -76,16 +77,18 @@ public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool,
*/
public BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
ecBlockStreamFactory);
blockInfo, xceiverFactory, refreshFunction,
ecBlockStreamFactory, config);
} else {
return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
pipeline, token, xceiverFactory, refreshFunction,
config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
Expand Down Expand Up @@ -60,7 +61,6 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final int ecChunkSize;
private final long stripeSize;
private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
Expand All @@ -75,7 +75,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private long position = 0;
private boolean closed = false;
private boolean seeked = false;

private OzoneClientConfig config;
protected ECReplicationConfig getRepConfig() {
return repConfig;
}
Expand Down Expand Up @@ -108,13 +108,13 @@ protected int availableDataLocations(int expectedLocations) {
}

public ECBlockInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory) {
BlockInputStreamFactory streamFactory,
OzoneClientConfig config) {
this.repConfig = repConfig;
this.ecChunkSize = repConfig.getEcChunkSize();
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.streamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
Expand All @@ -123,6 +123,7 @@ public ECBlockInputStream(ECReplicationConfig repConfig,
this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
this.blockStreams =
new BlockExtendedInputStream[repConfig.getRequiredNodes()];
this.config = config;

this.stripeSize = (long)ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
Expand Down Expand Up @@ -191,8 +192,9 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE),
blkInfo, pipeline,
blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
blockInfo.getToken(), xceiverClientFactory,
ecPipelineRefreshFunction(locationIndex + 1, refreshFunction),
config);
blockStreams[locationIndex] = stream;
LOG.debug("{}: created stream [{}]: {}", this, locationIndex, stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -51,7 +52,8 @@ public interface ECBlockInputStreamFactory {
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -74,16 +75,17 @@ private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
*/
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
if (missingLocations) {
// We create the reconstruction reader
ECBlockReconstructedStripeInputStream sis =
new ECBlockReconstructedStripeInputStream(
(ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
(ECReplicationConfig)repConfig, blockInfo,
xceiverFactory, refreshFunction, inputStreamFactory,
byteBufferPool, ecReconstructExecutorSupplier.get());
byteBufferPool, ecReconstructExecutorSupplier.get(), config);
if (failedLocations != null) {
sis.addFailedDatanodes(failedLocations);
}
Expand All @@ -92,7 +94,8 @@ public BlockExtendedInputStream create(boolean missingLocations,
} else {
// Otherwise create the more efficient non-reconstruction reader
return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory);
xceiverFactory, refreshFunction, inputStreamFactory,
config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -49,7 +50,6 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
LoggerFactory.getLogger(ECBlockInputStreamProxy.class);

private final ECReplicationConfig repConfig;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
Expand All @@ -59,6 +59,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
private boolean reconstructionReader = false;
private List<DatanodeDetails> failedLocations = new ArrayList<>();
private boolean closed = false;
private OzoneClientConfig config;

/**
* Given the ECReplicationConfig and the block length, calculate how many
Expand Down Expand Up @@ -97,16 +98,17 @@ public static int availableDataLocations(Pipeline pipeline,
}

public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
BlockLocationInfo> refreshFunction,
ECBlockInputStreamFactory streamFactory) {
ECBlockInputStreamFactory streamFactory,
OzoneClientConfig config) {
this.repConfig = repConfig;
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.ecBlockInputStreamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.config = config;

setReaderType();
createBlockReader();
Expand All @@ -124,8 +126,8 @@ private void createBlockReader() {
.incECReconstructionTotal();
}
blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
failedLocations, repConfig, blockInfo, verifyChecksum,
xceiverClientFactory, refreshFunction);
failedLocations, repConfig, blockInfo,
xceiverClientFactory, refreshFunction, config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
Expand Down Expand Up @@ -152,14 +153,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {

@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
refreshFunction, streamFactory);
ExecutorService ecReconstructExecutor,
OzoneClientConfig config) {
super(repConfig, blockInfo, xceiverClientFactory,
refreshFunction, streamFactory, config);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
Expand All @@ -44,13 +45,13 @@ class DummyBlockInputStream extends BlockInputStream {
long blockLen,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, refreshFunction);
Map<String, byte[]> chunks,
OzoneClientConfig config) {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
this.chunks = chunkList;

Expand Down
Loading