diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 3042b4d847a0..549735438a02 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -144,6 +144,23 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private int retryInterval = 0; + @Config(key = "read.max.retries", + defaultValue = "3", + description = "Maximum number of retries by Ozone Client on " + + "encountering connectivity exception when reading a key.", + tags = ConfigTag.CLIENT) + private int maxReadRetryCount = 3; + + @Config(key = "read.retry.interval", + defaultValue = "1", + description = + "Indicates the time duration in seconds a client will wait " + + "before retrying a read key request on encountering " + + "a connectivity excepetion from Datanodes . " + + "By default the interval is 1 second", + tags = ConfigTag.CLIENT) + private int readRetryInterval = 1; + @Config(key = "checksum.type", defaultValue = "CRC32", description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] " @@ -326,6 +343,22 @@ public void setRetryInterval(int retryInterval) { this.retryInterval = retryInterval; } + public int getMaxReadRetryCount() { + return maxReadRetryCount; + } + + public void setMaxReadRetryCount(int maxReadRetryCount) { + this.maxReadRetryCount = maxReadRetryCount; + } + + public int getReadRetryInterval() { + return readRetryInterval; + } + + public void setReadRetryInterval(int readRetryInterval) { + this.readRetryInterval = readRetryInterval; + } + public ChecksumType getChecksumType() { return ChecksumType.valueOf(checksumType); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 3d789912a66b..65e8b5b1d7f8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; @@ -76,8 +77,8 @@ public class BlockInputStream extends BlockExtendedInputStream { private XceiverClientSpi xceiverClient; private boolean initialized = false; // TODO: do we need to change retrypolicy based on exception. - private final RetryPolicy retryPolicy = - HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1)); + private final RetryPolicy retryPolicy; + private int retries; // List of ChunkInputStreams, one for each chunk in the block @@ -112,25 +113,29 @@ public class BlockInputStream extends BlockExtendedInputStream { private final Function refreshFunction; public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, - Token token, boolean verifyChecksum, + Token token, XceiverClientFactory xceiverClientFactory, - Function refreshFunction) { + Function refreshFunction, + OzoneClientConfig config) { this.blockID = blockId; this.length = blockLen; setPipeline(pipeline); tokenRef.set(token); - this.verifyChecksum = verifyChecksum; + this.verifyChecksum = config.isChecksumVerify(); this.xceiverClientFactory = xceiverClientFactory; this.refreshFunction = refreshFunction; + this.retryPolicy = + HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(), + TimeUnit.SECONDS.toMillis(config.getReadRetryInterval())); } public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, - boolean verifyChecksum, - XceiverClientFactory xceiverClientFactory + XceiverClientFactory xceiverClientFactory, + OzoneClientConfig config ) { - this(blockId, blockLen, pipeline, token, verifyChecksum, - xceiverClientFactory, null); + this(blockId, blockLen, pipeline, token, + xceiverClientFactory, null, config); } /** * Initialize the BlockInputStream. Get the BlockData (list of chunks) from diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java index bd100214ae48..6f8a744f762d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; @@ -48,8 +49,9 @@ public interface BlockInputStreamFactory { */ BlockExtendedInputStream create(ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, - Token token, boolean verifyChecksum, + Token token, XceiverClientFactory xceiverFactory, - Function refreshFunction); + Function refreshFunction, + OzoneClientConfig config); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index 40063f9ce492..6bcdc3c48114 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; @@ -76,16 +77,18 @@ public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool, */ public BlockExtendedInputStream create(ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, - Token token, boolean verifyChecksum, + Token token, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction, + OzoneClientConfig config) { if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) { return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig, - blockInfo, verifyChecksum, xceiverFactory, refreshFunction, - ecBlockStreamFactory); + blockInfo, xceiverFactory, refreshFunction, + ecBlockStreamFactory, config); } else { return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), - pipeline, token, verifyChecksum, xceiverFactory, refreshFunction); + pipeline, token, xceiverFactory, refreshFunction, + config); } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java index ea4f3d743f92..f658ea583f7c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -60,7 +61,6 @@ public class ECBlockInputStream extends BlockExtendedInputStream { private final int ecChunkSize; private final long stripeSize; private final BlockInputStreamFactory streamFactory; - private final boolean verifyChecksum; private final XceiverClientFactory xceiverClientFactory; private final Function refreshFunction; private final BlockLocationInfo blockInfo; @@ -75,7 +75,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream { private long position = 0; private boolean closed = false; private boolean seeked = false; - + private OzoneClientConfig config; protected ECReplicationConfig getRepConfig() { return repConfig; } @@ -119,13 +119,13 @@ protected int availableParityLocations() { } public ECBlockInputStream(ECReplicationConfig repConfig, - BlockLocationInfo blockInfo, boolean verifyChecksum, + BlockLocationInfo blockInfo, XceiverClientFactory xceiverClientFactory, Function refreshFunction, - BlockInputStreamFactory streamFactory) { + BlockInputStreamFactory streamFactory, + OzoneClientConfig config) { this.repConfig = repConfig; this.ecChunkSize = repConfig.getEcChunkSize(); - this.verifyChecksum = verifyChecksum; this.blockInfo = blockInfo; this.streamFactory = streamFactory; this.xceiverClientFactory = xceiverClientFactory; @@ -134,6 +134,7 @@ public ECBlockInputStream(ECReplicationConfig repConfig, this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()]; this.blockStreams = new BlockExtendedInputStream[repConfig.getRequiredNodes()]; + this.config = config; this.stripeSize = (long)ecChunkSize * repConfig.getData(); setBlockLocations(this.blockInfo.getPipeline()); @@ -202,8 +203,9 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE), blkInfo, pipeline, - blockInfo.getToken(), verifyChecksum, xceiverClientFactory, - ecPipelineRefreshFunction(locationIndex + 1, refreshFunction)); + blockInfo.getToken(), xceiverClientFactory, + ecPipelineRefreshFunction(locationIndex + 1, refreshFunction), + config); blockStreams[locationIndex] = stream; LOG.debug("{}: created stream [{}]: {}", this, locationIndex, stream); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java index 0e2ef22c1e94..66e7a31337a6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -51,7 +52,8 @@ public interface ECBlockInputStreamFactory { */ BlockExtendedInputStream create(boolean missingLocations, List failedLocations, ReplicationConfig repConfig, - BlockLocationInfo blockInfo, boolean verifyChecksum, + BlockLocationInfo blockInfo, XceiverClientFactory xceiverFactory, - Function refreshFunction); + Function refreshFunction, + OzoneClientConfig config); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java index 36b6539ea817..01d0b0a7b7e8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -74,16 +75,17 @@ private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory, */ public BlockExtendedInputStream create(boolean missingLocations, List failedLocations, ReplicationConfig repConfig, - BlockLocationInfo blockInfo, boolean verifyChecksum, + BlockLocationInfo blockInfo, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction, + OzoneClientConfig config) { if (missingLocations) { // We create the reconstruction reader ECBlockReconstructedStripeInputStream sis = new ECBlockReconstructedStripeInputStream( - (ECReplicationConfig)repConfig, blockInfo, verifyChecksum, + (ECReplicationConfig)repConfig, blockInfo, xceiverFactory, refreshFunction, inputStreamFactory, - byteBufferPool, ecReconstructExecutorSupplier.get()); + byteBufferPool, ecReconstructExecutorSupplier.get(), config); if (failedLocations != null) { sis.addFailedDatanodes(failedLocations); } @@ -92,7 +94,8 @@ public BlockExtendedInputStream create(boolean missingLocations, } else { // Otherwise create the more efficient non-reconstruction reader return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo, - verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory); + xceiverFactory, refreshFunction, inputStreamFactory, + config); } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java index 973561616f7b..68a0337cef1d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -49,7 +50,6 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream { LoggerFactory.getLogger(ECBlockInputStreamProxy.class); private final ECReplicationConfig repConfig; - private final boolean verifyChecksum; private final XceiverClientFactory xceiverClientFactory; private final Function refreshFunction; private final BlockLocationInfo blockInfo; @@ -59,6 +59,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream { private boolean reconstructionReader = false; private List failedLocations = new ArrayList<>(); private boolean closed = false; + private OzoneClientConfig config; /** * Given the ECReplicationConfig and the block length, calculate how many @@ -97,16 +98,17 @@ public static int availableDataLocations(Pipeline pipeline, } public ECBlockInputStreamProxy(ECReplicationConfig repConfig, - BlockLocationInfo blockInfo, boolean verifyChecksum, + BlockLocationInfo blockInfo, XceiverClientFactory xceiverClientFactory, Function refreshFunction, - ECBlockInputStreamFactory streamFactory) { + ECBlockInputStreamFactory streamFactory, + OzoneClientConfig config) { this.repConfig = repConfig; - this.verifyChecksum = verifyChecksum; this.blockInfo = blockInfo; this.ecBlockInputStreamFactory = streamFactory; this.xceiverClientFactory = xceiverClientFactory; this.refreshFunction = refreshFunction; + this.config = config; setReaderType(); createBlockReader(); @@ -124,8 +126,8 @@ private void createBlockReader() { .incECReconstructionTotal(); } blockReader = ecBlockInputStreamFactory.create(reconstructionReader, - failedLocations, repConfig, blockInfo, verifyChecksum, - xceiverClientFactory, refreshFunction); + failedLocations, repConfig, blockInfo, + xceiverClientFactory, refreshFunction, config); } @Override diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java index 142825cb1206..31f94e0acad6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -152,14 +153,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream { @SuppressWarnings("checkstyle:ParameterNumber") public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig, - BlockLocationInfo blockInfo, boolean verifyChecksum, + BlockLocationInfo blockInfo, XceiverClientFactory xceiverClientFactory, Function refreshFunction, BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool, - ExecutorService ecReconstructExecutor) { - super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory, - refreshFunction, streamFactory); + ExecutorService ecReconstructExecutor, + OzoneClientConfig config) { + super(repConfig, blockInfo, xceiverClientFactory, + refreshFunction, streamFactory, config); this.byteBufferPool = byteBufferPool; this.executor = ecReconstructExecutor; diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java index 3e7779f0d10a..a89097533d2a 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; @@ -44,13 +45,13 @@ class DummyBlockInputStream extends BlockInputStream { long blockLen, Pipeline pipeline, Token token, - boolean verifyChecksum, XceiverClientFactory xceiverClientManager, Function refreshFunction, List chunkList, - Map chunks) { - super(blockId, blockLen, pipeline, token, verifyChecksum, - xceiverClientManager, refreshFunction); + Map chunks, + OzoneClientConfig config) { + super(blockId, blockLen, pipeline, token, + xceiverClientManager, refreshFunction, config); this.chunkDataMap = chunks; this.chunks = chunkList; diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java index 24a35745144d..6d12614228f9 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; @@ -51,12 +52,12 @@ final class DummyBlockInputStreamWithRetry long blockLen, Pipeline pipeline, Token token, - boolean verifyChecksum, XceiverClientFactory xceiverClientManager, List chunkList, Map chunkMap, - AtomicBoolean isRerfreshed, IOException ioException) { - super(blockId, blockLen, pipeline, token, verifyChecksum, + AtomicBoolean isRerfreshed, IOException ioException, + OzoneClientConfig config) { + super(blockId, blockLen, pipeline, token, xceiverClientManager, blockID -> { isRerfreshed.set(true); try { @@ -68,7 +69,7 @@ final class DummyBlockInputStreamWithRetry throw new RuntimeException(e); } - }, chunkList, chunkMap); + }, chunkList, chunkMap, config); this.ioException = ioException; } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 2e95de1ecad6..6bea76902876 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -22,8 +22,10 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -89,6 +91,8 @@ public class TestBlockInputStream { private Function refreshFunction; + private OzoneConfiguration conf = new OzoneConfiguration(); + @BeforeEach @SuppressWarnings("unchecked") public void setup() throws Exception { @@ -96,10 +100,12 @@ public void setup() throws Exception { BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE); createChunkList(5); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(false); Pipeline pipeline = MockPipeline.createSingleNodePipeline(); blockStream = new DummyBlockInputStream(blockID, blockSize, pipeline, null, - false, null, refreshFunction, chunks, chunkDataMap); + null, refreshFunction, chunks, chunkDataMap, clientConfig); } /** @@ -264,11 +270,14 @@ public void testRefreshPipelineFunction() throws Exception { BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); AtomicBoolean isRefreshed = new AtomicBoolean(); createChunkList(5); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(false); try (BlockInputStream blockInputStreamWithRetry = new DummyBlockInputStreamWithRetry(blockID, blockSize, MockPipeline.createSingleNodePipeline(), null, - false, null, chunks, chunkDataMap, isRefreshed, null)) { + null, chunks, chunkDataMap, isRefreshed, null, + clientConfig)) { assertFalse(isRefreshed.get()); seekAndVerify(50); byte[] b = new byte[200]; @@ -352,8 +361,10 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex, private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline, ChunkInputStream stream) { - return new DummyBlockInputStream(blockID, blockSize, pipeline, null, false, - null, refreshFunction, chunks, null) { + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(false); + return new DummyBlockInputStream(blockID, blockSize, pipeline, null, + null, refreshFunction, chunks, null, clientConfig) { @Override protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { return stream; @@ -411,8 +422,11 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex) .thenReturn(blockLocationInfo); when(blockLocationInfo.getPipeline()).thenReturn(newPipeline); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(false); BlockInputStream subject = new BlockInputStream(blockID, blockSize, - pipeline, null, false, clientFactory, refreshFunction) { + pipeline, null, clientFactory, refreshFunction, + clientConfig) { @Override protected List getChunkInfoListUsingClient() { return chunks; diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java index 8db662cee070..dac71ba3db19 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -258,9 +259,10 @@ public synchronized void setFailIndexes(Integer... fail) { public synchronized BlockExtendedInputStream create( ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, - Token token, boolean verifyChecksum, + Token token, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction, + OzoneClientConfig config) { int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0)); TestBlockInputStream stream = new TestBlockInputStream( diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java index abd69e5118c2..e7185a9351f4 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java @@ -21,9 +21,11 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; @@ -41,6 +43,8 @@ */ public class TestBlockInputStreamFactoryImpl { + private OzoneConfiguration conf = new OzoneConfiguration(); + @Test public void testNonECGivesBlockInputStream() { BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl(); @@ -50,9 +54,12 @@ public void testNonECGivesBlockInputStream() { BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3, 1024 * 1024 * 10); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); BlockExtendedInputStream stream = factory.create(repConfig, blockInfo, blockInfo.getPipeline(), - blockInfo.getToken(), true, null, null); + blockInfo.getToken(), null, null, + clientConfig); Assertions.assertTrue(stream instanceof BlockInputStream); Assertions.assertEquals(stream.getBlockID(), blockInfo.getBlockID()); Assertions.assertEquals(stream.getLength(), blockInfo.getLength()); @@ -67,9 +74,12 @@ public void testECGivesECBlockInputStream() { BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 5, 1024 * 1024 * 10); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); BlockExtendedInputStream stream = factory.create(repConfig, blockInfo, blockInfo.getPipeline(), - blockInfo.getToken(), true, null, null); + blockInfo.getToken(), null, null, + clientConfig); Assertions.assertTrue(stream instanceof ECBlockInputStreamProxy); Assertions.assertEquals(stream.getBlockID(), blockInfo.getBlockID()); Assertions.assertEquals(stream.getLength(), blockInfo.getLength()); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java index caa071b1b9ca..2ac575f6d45c 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java @@ -20,9 +20,11 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -56,6 +58,7 @@ public class TestECBlockInputStream { private ECReplicationConfig repConfig; private TestBlockInputStreamFactory streamFactory; + private OzoneConfiguration conf = new OzoneConfiguration(); @BeforeEach public void setup() { @@ -69,15 +72,19 @@ public void testSufficientLocations() { // EC-3-2, 5MB block, so all 3 data locations are needed BlockLocationInfo keyInfo = ECStreamTestUtil .createKeyInfo(repConfig, 5, 5 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, new TestBlockInputStreamFactory())) { + keyInfo, null, null, new TestBlockInputStreamFactory(), + clientConfig)) { Assertions.assertTrue(ecb.hasSufficientLocations()); } // EC-3-2, very large block, so all 3 data locations are needed keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5000 * ONEMB); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, new TestBlockInputStreamFactory())) { + keyInfo, null, null, new TestBlockInputStreamFactory(), + clientConfig)) { Assertions.assertTrue(ecb.hasSufficientLocations()); } @@ -87,7 +94,8 @@ keyInfo, true, null, null, new TestBlockInputStreamFactory())) { dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1); keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB - 1, dnMap); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, new TestBlockInputStreamFactory())) { + keyInfo, null, null, new TestBlockInputStreamFactory(), + clientConfig)) { Assertions.assertTrue(ecb.hasSufficientLocations()); } @@ -97,7 +105,8 @@ keyInfo, true, null, null, new TestBlockInputStreamFactory())) { dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1); keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, new TestBlockInputStreamFactory())) { + keyInfo, null, null, new TestBlockInputStreamFactory(), + clientConfig)) { Assertions.assertFalse(ecb.hasSufficientLocations()); } @@ -109,7 +118,8 @@ keyInfo, true, null, null, new TestBlockInputStreamFactory())) { dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5); keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, new TestBlockInputStreamFactory())) { + keyInfo, null, null, new TestBlockInputStreamFactory(), + clientConfig)) { Assertions.assertFalse(ecb.hasSufficientLocations()); } } @@ -121,8 +131,11 @@ public void testCorrectBlockSizePassedToBlockStreamLessThanCell() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); // We expect only 1 block stream and it should have a length passed of // ONEMB - 100. @@ -138,8 +151,11 @@ public void testCorrectBlockSizePassedToBlockStreamTwoCells() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); List streams = streamFactory.getBlockStreams(); Assertions.assertEquals(ONEMB, streams.get(0).getLength()); @@ -154,8 +170,11 @@ public void testCorrectBlockSizePassedToBlockStreamThreeCells() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); List streams = streamFactory.getBlockStreams(); Assertions.assertEquals(ONEMB, streams.get(0).getLength()); @@ -171,8 +190,11 @@ public void testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); List streams = streamFactory.getBlockStreams(); Assertions.assertEquals(4 * ONEMB, streams.get(0).getLength()); @@ -188,8 +210,11 @@ public void testCorrectBlockSizePassedToBlockStreamSingleFullCell() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); List streams = streamFactory.getBlockStreams(); Assertions.assertEquals(ONEMB, streams.get(0).getLength()); @@ -203,8 +228,11 @@ public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells() BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.read(buf); List streams = streamFactory.getBlockStreams(); Assertions.assertEquals(3 * ONEMB, streams.get(0).getLength()); @@ -217,8 +245,11 @@ public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells() public void testSimpleRead() throws IOException { BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ByteBuffer buf = ByteBuffer.allocate(100); @@ -240,8 +271,11 @@ public void testSimpleRead() throws IOException { public void testSimpleReadUnderOneChunk() throws IOException { BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1, ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ByteBuffer buf = ByteBuffer.allocate(100); @@ -259,8 +293,11 @@ public void testSimpleReadUnderOneChunk() throws IOException { public void testReadPastEOF() throws IOException { BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 50); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ByteBuffer buf = ByteBuffer.allocate(100); @@ -278,8 +315,11 @@ public void testReadCrossingMultipleECChunkBounds() throws IOException { 100); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { // EC Chunk size is 100 and 3-2. Create a byte buffer to read 3.5 chunks, // so 350 @@ -313,8 +353,11 @@ public void testSeekPastBlockLength() throws IOException { ONEMB); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { assertThrows(EOFException.class, () -> ecb.seek(1000)); } } @@ -325,8 +368,11 @@ public void testSeekToLength() throws IOException { ONEMB); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 100); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { // When seek more than the length, should throw EOFException. assertThrows(EOFException.class, () -> ecb.seek(101)); } @@ -338,8 +384,11 @@ public void testSeekToLengthZeroLengthBlock() throws IOException { ONEMB); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 0); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.seek(0); Assertions.assertEquals(0, ecb.getPos()); Assertions.assertEquals(0, ecb.getRemaining()); @@ -352,8 +401,11 @@ public void testSeekToValidPosition() throws IOException { ONEMB); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { ecb.seek(ONEMB - 1); Assertions.assertEquals(ONEMB - 1, ecb.getPos()); Assertions.assertEquals(ONEMB * 4 + 1, ecb.getRemaining()); @@ -381,8 +433,11 @@ public void testErrorReadingBlockReportsBadLocation() throws IOException { ONEMB); BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { // Read a full stripe to ensure all streams are created in the stream // factory ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB); @@ -412,8 +467,11 @@ public void testNoErrorIfSpareLocationToRead() throws IOException { BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 8 * ONEMB, datanodes); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { // Read a full stripe to ensure all streams are created in the stream // factory ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB); @@ -476,8 +534,11 @@ public void testEcPipelineRefreshFunction() { return blockLocation; }; + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, - keyInfo, true, null, null, streamFactory)) { + keyInfo, null, null, streamFactory, + clientConfig)) { Pipeline pipeline = ecb.ecPipelineRefreshFunction(3, refreshFunction) .apply(blockID) @@ -510,8 +571,9 @@ public synchronized List getBlockStreams() { public synchronized BlockExtendedInputStream create( ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, Token token, - boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + XceiverClientFactory xceiverFactory, + Function refreshFunction, + OzoneClientConfig config) { TestBlockInputStream stream = new TestBlockInputStream( blockInfo.getBlockID(), blockInfo.getLength(), (byte)blockStreams.size()); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java index 929fa13042e4..0afb05ff17a5 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java @@ -20,7 +20,9 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -56,6 +58,7 @@ public class TestECBlockInputStreamProxy { private long randomSeed; private ThreadLocalRandom random = ThreadLocalRandom.current(); private SplittableRandom dataGenerator; + private OzoneConfiguration conf = new OzoneConfiguration(); @BeforeEach public void setup() { @@ -346,8 +349,11 @@ private void resetAndAdvanceDataGenerator(long position) { private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig, BlockLocationInfo blockInfo) { + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); return new ECBlockInputStreamProxy( - rConfig, blockInfo, true, null, null, streamFactory); + rConfig, blockInfo, null, null, streamFactory, + clientConfig); } private static class TestECBlockInputStreamFactory @@ -376,8 +382,9 @@ public List getFailedLocations() { public BlockExtendedInputStream create(boolean missingLocations, List failedDatanodes, ReplicationConfig repConfig, BlockLocationInfo blockInfo, - boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + XceiverClientFactory xceiverFactory, + Function refreshFunction, + OzoneClientConfig config) { this.failedLocations = failedDatanodes; ByteBuffer wrappedBuffer = ByteBuffer.wrap(data.array(), 0, data.capacity()); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java index e39acaf9d234..78a3476d0964 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java @@ -19,7 +19,9 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; @@ -54,6 +56,7 @@ public class TestECBlockReconstructedInputStream { private ByteBufferPool bufferPool = new ElasticByteBufferPool(); private ExecutorService ecReconstructExecutor = Executors.newFixedThreadPool(3); + private OzoneConfiguration conf = new OzoneConfiguration(); @BeforeEach public void setup() throws IOException { @@ -74,8 +77,11 @@ private ECBlockReconstructedStripeInputStream createStripeInputStream( BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap); streamFactory.setCurrentPipeline(keyInfo.getPipeline()); - return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true, - null, null, streamFactory, bufferPool, ecReconstructExecutor); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); + return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, + null, null, streamFactory, bufferPool, ecReconstructExecutor, + clientConfig); } @Test diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java index 62d8c2d76023..83c864b6c660 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java @@ -19,8 +19,10 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; @@ -70,7 +72,8 @@ public class TestECBlockReconstructedStripeInputStream { private ByteBufferPool bufferPool = new ElasticByteBufferPool(); private ExecutorService ecReconstructExecutor = Executors.newFixedThreadPool(3); - + private OzoneConfiguration conf = new OzoneConfiguration(); + static List> recoveryCases() { // TODO better name List> params = new ArrayList<>(); params.add(emptySet()); // non-recovery @@ -821,8 +824,11 @@ public void testFailedLocationsAreNotRead() throws IOException { private ECBlockReconstructedStripeInputStream createInputStream( BlockLocationInfo keyInfo) { - return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true, - null, null, streamFactory, bufferPool, ecReconstructExecutor); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); + return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, + null, null, streamFactory, bufferPool, ecReconstructExecutor, + clientConfig); } private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 62f96d8adf00..8fadd19b67d3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -259,12 +259,15 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, } } + OzoneClientConfig clientConfig = this.ozoneClientConfig; + clientConfig.setChecksumVerify(true); try (ECBlockReconstructedStripeInputStream sis = new ECBlockReconstructedStripeInputStream( - repConfig, blockLocationInfo, true, + repConfig, blockLocationInfo, this.containerOperationClient.getXceiverClientManager(), null, this.blockInputStreamFactory, byteBufferPool, - this.ecReconstructReadExecutor)) { + this.ecReconstructReadExecutor, + clientConfig)) { ECBlockOutputStream[] targetBlockStreams = new ECBlockOutputStream[toReconstructIndexes.size()]; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 4843c1c45e6c..2d40841ee499 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -58,9 +59,9 @@ private static List createStreams( OmKeyInfo keyInfo, List blockInfos, XceiverClientFactory xceiverClientFactory, - boolean verifyChecksum, Function retryFunction, - BlockInputStreamFactory blockStreamFactory) { + BlockInputStreamFactory blockStreamFactory, + OzoneClientConfig config) { List partStreams = new ArrayList<>(); for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) { if (LOG.isDebugEnabled()) { @@ -91,9 +92,9 @@ private static List createStreams( omKeyLocationInfo, omKeyLocationInfo.getPipeline(), omKeyLocationInfo.getToken(), - verifyChecksum, xceiverClientFactory, - retry); + retry, + config); partStreams.add(stream); } return partStreams; @@ -117,13 +118,13 @@ private static BlockLocationInfo getBlockLocationInfo(OmKeyInfo newKeyInfo, private static LengthInputStream getFromOmKeyInfo( OmKeyInfo keyInfo, XceiverClientFactory xceiverClientFactory, - boolean verifyChecksum, Function retryFunction, BlockInputStreamFactory blockStreamFactory, - List locationInfos) { + List locationInfos, + OzoneClientConfig config) { List streams = createStreams(keyInfo, - locationInfos, xceiverClientFactory, verifyChecksum, retryFunction, - blockStreamFactory); + locationInfos, xceiverClientFactory, retryFunction, + blockStreamFactory, config); KeyInputStream keyInputStream = new KeyInputStream(keyInfo.getKeyName(), streams); return new LengthInputStream(keyInputStream, keyInputStream.getLength()); @@ -134,20 +135,22 @@ private static LengthInputStream getFromOmKeyInfo( */ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo, XceiverClientFactory xceiverClientFactory, - boolean verifyChecksum, Function retryFunction, - BlockInputStreamFactory blockStreamFactory) { + Function retryFunction, + BlockInputStreamFactory blockStreamFactory, + OzoneClientConfig config) { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); - return getFromOmKeyInfo(keyInfo, xceiverClientFactory, verifyChecksum, - retryFunction, blockStreamFactory, keyLocationInfos); + return getFromOmKeyInfo(keyInfo, xceiverClientFactory, + retryFunction, blockStreamFactory, keyLocationInfos, config); } public static List getStreamsFromKeyInfo(OmKeyInfo keyInfo, - XceiverClientFactory xceiverClientFactory, boolean verifyChecksum, + XceiverClientFactory xceiverClientFactory, Function retryFunction, - BlockInputStreamFactory blockStreamFactory) { + BlockInputStreamFactory blockStreamFactory, + OzoneClientConfig config) { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); @@ -162,7 +165,8 @@ public static List getStreamsFromKeyInfo(OmKeyInfo keyInfo, // Create a KeyInputStream for each part. for (List locationInfo : partsToBlocksMap.values()) { lengthInputStreams.add(getFromOmKeyInfo(keyInfo, xceiverClientFactory, - verifyChecksum, retryFunction, blockStreamFactory, locationInfo)); + retryFunction, blockStreamFactory, locationInfo, + config)); } return lengthInputStreams; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 8d61f8ef8609..dff656383991 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -2221,9 +2221,8 @@ private OzoneInputStream createInputStream( if (feInfo == null) { LengthInputStream lengthInputStream = KeyInputStream - .getFromOmKeyInfo(keyInfo, xceiverClientManager, - clientConfig.isChecksumVerify(), retryFunction, - blockInputStreamFactory); + .getFromOmKeyInfo(keyInfo, xceiverClientManager, retryFunction, + blockInputStreamFactory, clientConfig); try { final GDPRSymmetricKey gk = getGDPRSymmetricKey( keyInfo.getMetadata(), Cipher.DECRYPT_MODE); @@ -2238,9 +2237,8 @@ private OzoneInputStream createInputStream( } else if (!keyInfo.getLatestVersionLocations().isMultipartKey()) { // Regular Key with FileEncryptionInfo LengthInputStream lengthInputStream = KeyInputStream - .getFromOmKeyInfo(keyInfo, xceiverClientManager, - clientConfig.isChecksumVerify(), retryFunction, - blockInputStreamFactory); + .getFromOmKeyInfo(keyInfo, xceiverClientManager, retryFunction, + blockInputStreamFactory, clientConfig); final KeyProvider.KeyVersion decrypted = getDEK(feInfo); final CryptoInputStream cryptoIn = new CryptoInputStream(lengthInputStream.getWrappedStream(), @@ -2250,9 +2248,8 @@ private OzoneInputStream createInputStream( } else { // Multipart Key with FileEncryptionInfo List lengthInputStreams = KeyInputStream - .getStreamsFromKeyInfo(keyInfo, xceiverClientManager, - clientConfig.isChecksumVerify(), retryFunction, - blockInputStreamFactory); + .getStreamsFromKeyInfo(keyInfo, xceiverClientManager, retryFunction, + blockInputStreamFactory, clientConfig); final KeyProvider.KeyVersion decrypted = getDEK(feInfo); List cryptoInputStreams = new ArrayList<>(); diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java index 28e9b8ac3c61..df6ef9bc0506 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java @@ -20,8 +20,10 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; @@ -40,7 +42,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +50,8 @@ */ public class TestKeyInputStreamEC { + private OzoneConfiguration conf = new OzoneConfiguration(); + @Test public void testReadAgainstLargeBlockGroup() throws IOException { int dataBlocks = 10; @@ -68,10 +71,13 @@ public void testReadAgainstLargeBlockGroup() throws IOException { BlockInputStreamFactory mockStreamFactory = mock(BlockInputStreamFactory.class); when(mockStreamFactory.create(any(), any(), any(), any(), - anyBoolean(), any(), any())).thenReturn(blockInputStream); + any(), any(), any())).thenReturn(blockInputStream); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); try (LengthInputStream kis = KeyInputStream.getFromOmKeyInfo(keyInfo, - null, true, null, mockStreamFactory)) { + null, null, mockStreamFactory, + clientConfig)) { byte[] buf = new byte[100]; int readBytes = kis.read(buf, 0, 100); Assertions.assertEquals(100, readBytes); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java index f410c50c4662..752c011f4688 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.conf.DefaultConfigManager; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -305,9 +306,11 @@ private void readData(OmKeyInfo keyInfo, Function retryFunc) throws IOException { XceiverClientFactory xceiverClientManager = ((RpcClient) client.getProxy()).getXceiverClientManager(); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(false); try (InputStream is = KeyInputStream.getFromOmKeyInfo(keyInfo, - xceiverClientManager, - false, retryFunc, blockInputStreamFactory)) { + xceiverClientManager, retryFunc, blockInputStreamFactory, + clientConfig)) { byte[] buf = new byte[100]; int readBytes = is.read(buf, 0, 100); assertEquals(100, readBytes); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index dbcf9f6ea4e4..c3941e84cef8 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.om; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream; import org.jetbrains.annotations.NotNull; @@ -34,6 +36,8 @@ */ public class TestChunkStreams { + private OzoneConfiguration conf = new OzoneConfiguration(); + @Test public void testReadGroupInputStream() throws Exception { String dataString = RandomStringUtils.randomAscii(500); @@ -90,7 +94,10 @@ private List createInputStreams(String dataString) { } private BlockInputStream createStream(byte[] buf, int offset) { - return new BlockInputStream(null, 100, null, null, true, null) { + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(true); + return new BlockInputStream(null, 100, null, null, null, + clientConfig) { private long pos; private final ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);