diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index f3137749b9f..1c324ac8ff5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -647,6 +648,20 @@ public boolean equals(Object obj) { uuid.equals(((DatanodeDetails) obj).uuid); } + + /** + * Checks whether the hostname, ipAddress and ports of the two nodes are the same. + * @param datanodeDetails the DatanodeDetails object to compare with. + * @return true if the values match, otherwise false. + */ + public boolean compareNodeValues(DatanodeDetails datanodeDetails) { + if (this == datanodeDetails || super.equals(datanodeDetails)) { + return true; + } + return Objects.equals(ipAddress, datanodeDetails.ipAddress) + && Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports); + } + @Override public int hashCode() { return uuid.hashCode(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 6e72537367c..7390de95fe9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -330,7 +330,11 @@ public List<DatanodeDetails> getNodesInOrder() { } void reportDatanode(DatanodeDetails dn) throws IOException { - if (nodeStatus.get(dn) == null) { + // This is a workaround for the case where a datanode restarted and reinitialized its dnId but still reports the + // same set of pipelines it was part of. The pipeline report should be accepted in this anomalous condition. + // We rely on the StaleNodeHandler to close this pipeline eventually.
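+ // A restarted datanode re-registers with a new UUID, so the UUID-based lookup below fails for it; the report is + // therefore also accepted when an existing member matches by hostname, IP address and ports via compareNodeValues.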
+ if (dn == null || (nodeStatus.get(dn) == null + && nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) { throw new IOException( String.format("Datanode=%s not part of pipeline=%s", dn, id)); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index bfdff69be46..f1e637085e9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel( * * @return datanode Id */ - protected String getDatanodeId() { + @VisibleForTesting + public String getDatanodeId() { return datanodeId; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 1048ec5092c..0be2b6de6ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -65,6 +65,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.utils.BufferUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel; @@ -78,6 +79,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; @@ -202,6 +204,7 @@ long getStartTime() { private final boolean waitOnBothFollowers; private final HddsDatanodeService datanodeService; private static Semaphore semaphore = new Semaphore(1); + private final AtomicBoolean peersValidated; /** * CSM metrics. 
@@ -252,6 +255,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT); applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions); stateMachineHealthy = new AtomicBoolean(true); + this.peersValidated = new AtomicBoolean(false); ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat( @@ -265,6 +269,19 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI } + private void validatePeers() throws IOException { + if (this.peersValidated.get()) { + return; + } + final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup(); + final RaftPeerId selfId = ratisServer.getServer().getId(); + if (group.getPeer(selfId) == null) { + throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group, + ContainerProtos.Result.INVALID_CONFIG); + } + peersValidated.set(true); + } + @Override public StateMachineStorage getStateMachineStorage() { return storage; @@ -962,6 +979,11 @@ private CompletableFuture<Message> applyTransaction( final CheckedSupplier<ContainerCommandResponseProto, Exception> task = () -> { try { + try { + this.validatePeers(); + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, request); + } long timeNow = Time.monotonicNowNanos(); long queueingDelay = timeNow - context.getStartTime(); metrics.recordQueueingDelay(request.getCmdType(), queueingDelay); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 860615e0a4b..4cc4f63382e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -101,6 +101,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; @@ -242,6 +243,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, ContainerCommandRequestProto request, KeyValueContainer kvContainer, DispatcherContext dispatcherContext) { Type cmdType = request.getCmdType(); + // Validate that the request was sent to the correct datanode by matching the node id.
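+ // Only EC container replicas (replicaIndex > 0) are checked; Ratis and standalone containers are unaffected.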
+ if (kvContainer != null) { + try { + handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(), + request.getDatanodeUuid()); + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, request); + } + } switch (cmdType) { case CreateContainer: @@ -353,6 +363,13 @@ ContainerCommandResponseProto handleCreateContainer( " already exists", null, CONTAINER_ALREADY_EXISTS), request); } + try { + this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ? + request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid()); + } catch (StorageContainerException e) { + return ContainerUtils.logAndReturnError(LOG, e, request); + } + long containerID = request.getContainerID(); State containerState = request.getCreateContainer().getState(); @@ -1532,4 +1549,22 @@ public static FaultInjector getInjector() { public static void setInjector(FaultInjector instance) { injector = instance; } + + /** + * Verify that the request's datanode UUID matches this datanode's id. The check applies only to EC containers, + * i.e. containerReplicaIdx > 0. + * + * @param containerReplicaIdx replicaIndex of the container the command targets, may be null. + * @param requestDatanodeUUID datanode UUID sent with the request. + * @throws StorageContainerException if the datanode UUIDs do not match. + */ + private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID) + throws StorageContainerException { + if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) { + throw new StorageContainerException( + String.format("Request is trying to write to node with uuid: %s but the current nodeId is: %s.", + requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT); + } + return true; + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 2637f1922c6..655ecbb48b4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -68,6 +68,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; @@ -131,7 +132,13 @@ public void testHandlerCommandHandling() throws Exception { .build(); KeyValueContainer container = mock(KeyValueContainer.class); - + KeyValueContainerData containerData = mock(KeyValueContainerData.class); + Mockito.when(container.getContainerData()).thenReturn(containerData); + Mockito.when(containerData.getReplicaIndex()).thenReturn(1); + ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler, + createContainerRequest, container, null); + assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult()); + Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID); KeyValueHandler .dispatchRequest(handler, createContainerRequest, container, null); verify(handler, times(0)).handleListBlock( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index b6eaca8e80d..e3759521c82 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.Duration; @@ -32,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsUtils; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; @@ -50,6 +53,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -264,6 +268,57 @@ public void testContainerStateMachineCloseOnMissingPipeline() key.close(); } + + @Test + public void testContainerStateMachineRestartWithDNChangePipeline() + throws Exception { + try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, + ReplicationFactor.THREE), new HashMap<>())) { + key.write("ratis".getBytes(UTF_8)); + key.flush(); + + KeyOutputStream groupOutputStream = (KeyOutputStream) key. + getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + assertEquals(1, locationInfoList.size()); + + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + Pipeline pipeline = omKeyLocationInfo.getPipeline(); + List<HddsDatanodeService> datanodes = + new ArrayList<>(TestHelper.getDatanodeServices(cluster, + pipeline)); + + DatanodeDetails dn = datanodes.get(0).getDatanodeDetails(); + + // Delete all data volumes. + cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList() + .stream().forEach(v -> { + try { + FileUtils.deleteDirectory(v.getStorageDir()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // Delete the datanode.id file.
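+ // Without this file the datanode registers with a freshly generated UUID on restart, simulating the dnId + // re-initialization scenario handled in Pipeline#reportDatanode.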
+ File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf())); + boolean deleted = datanodeIdFile.delete(); + assertTrue(deleted); + cluster.restartHddsDatanode(dn, false); + GenericTestUtils.waitFor(() -> { + try { + key.write("ratis".getBytes(UTF_8)); + key.flush(); + return groupOutputStream.getLocationInfoList().size() > 1; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, 1000, 30000); + } + } + @Test public void testContainerStateMachineFailures() throws Exception { OzoneOutputStream key = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java index 5743866f2d2..20e65291faf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.utils.IOUtils; @@ -50,17 +52,26 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; @@ -69,8 +80,10 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; /** * Tests key output stream. @@ -91,52 +104,56 @@ public class TestECKeyOutputStream { private static int inputSize = dataBlocks * chunkSize; private static byte[][] inputChunks = new byte[dataBlocks][chunkSize]; - /** - * Create a MiniDFSCluster for testing. 
- */ - @BeforeAll - protected static void init() throws Exception { - chunkSize = 1024 * 1024; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - - OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + private static void initConf(OzoneConfiguration configuration) { + OzoneClientConfig clientConfig = configuration.getObject(OzoneClientConfig.class); clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); clientConfig.setStreamBufferFlushDelay(false); - conf.setFromObject(clientConfig); + configuration.setFromObject(clientConfig); // If SCM detects dead node too quickly, then container would be moved to // closed state and all in progress writes will get exception. To avoid // that, we are just keeping higher timeout and none of the tests depending // on deadnode detection timeout currently. - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS); - conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300, + configuration.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); + configuration.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS); + configuration.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300, TimeUnit.SECONDS); - conf.setTimeDuration( + configuration.set("ozone.replication.allowed-configs", "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-" + "(512|1024|2048|4096|1)k)$)"); + configuration.setTimeDuration( "hdds.ratis.raft.server.notification.no-leader.timeout", 300, TimeUnit.SECONDS); - conf.setQuietMode(false); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, + configuration.setQuietMode(false); + configuration.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); - conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500, + configuration.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500, TimeUnit.MILLISECONDS); - conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, + configuration.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS); - conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); + configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); // "Enable" hsync to verify that hsync would be blocked by ECKeyOutputStream - conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); - conf.setBoolean("ozone.client.hbase.enhancements.allowed", true); - conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); + configuration.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); + configuration.setBoolean("ozone.client.hbase.enhancements.allowed", true); + configuration.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); ClientConfigForTesting.newBuilder(StorageUnit.BYTES) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) - .applyTo(conf); + .applyTo(configuration); + } + /** + * Create a MiniOzoneCluster for testing.
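+ * Common configuration is factored into initConf so tests can also build independent clusters with the same settings.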
+ */ + @BeforeAll + protected static void init() throws Exception { + chunkSize = 1024 * 1024; + flushSize = 2 * chunkSize; + maxFlushSize = 2 * flushSize; + blockSize = 2 * maxFlushSize; + initConf(conf); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(10) .build(); @@ -172,6 +189,90 @@ public void testCreateKeyWithECReplicationConfig() throws Exception { } } + @Test + public void testECKeyCreatetWithDatanodeIdChange() + throws Exception { + AtomicReference failed = new AtomicReference<>(false); + AtomicReference miniOzoneCluster = new AtomicReference<>(); + OzoneClient client1 = null; + try (MockedStatic mockedHandler = Mockito.mockStatic(Handler.class, Mockito.CALLS_REAL_METHODS)) { + Map handlers = new HashMap<>(); + mockedHandler.when(() -> Handler.getHandlerForContainerType(any(), any(), any(), any(), any(), any(), any())) + .thenAnswer(i -> { + Handler handler = Mockito.spy((Handler) i.callRealMethod()); + handlers.put(handler.getDatanodeId(), handler); + return handler; + }); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + initConf(ozoneConfiguration); + miniOzoneCluster.set(MiniOzoneCluster.newBuilder(ozoneConfiguration).setNumDatanodes(10).build()); + miniOzoneCluster.get().waitForClusterToBeReady(); + client1 = miniOzoneCluster.get().newClient(); + ObjectStore store = client1.getObjectStore(); + store.createVolume(volumeName); + store.getVolume(volumeName).createBucket(bucketName); + OzoneOutputStream key = TestHelper.createKey(keyString, new ECReplicationConfig(3, 2, + ECReplicationConfig.EcCodec.RS, 1024), inputSize, store, volumeName, bucketName); + byte[] b = new byte[6 * 1024]; + ECKeyOutputStream groupOutputStream = (ECKeyOutputStream) key.getOutputStream(); + List locationInfoList = groupOutputStream.getLocationInfoList(); + while (locationInfoList.isEmpty()) { + locationInfoList = groupOutputStream.getLocationInfoList(); + Random random = new Random(); + random.nextBytes(b); + assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream()); + key.write(b); + key.flush(); + } + + assertEquals(1, locationInfoList.size()); + + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + long containerId = omKeyLocationInfo.getContainerID(); + Pipeline pipeline = omKeyLocationInfo.getPipeline(); + DatanodeDetails dnWithReplicaIndex1 = + pipeline.getReplicaIndexes().entrySet().stream().filter(e -> e.getValue() == 1).map(Map.Entry::getKey) + .findFirst().get(); + Mockito.when(handlers.get(dnWithReplicaIndex1.getUuidString()).getDatanodeId()) + .thenAnswer(i -> { + if (!failed.get()) { + // Change dnId for one write chunk request. 
+ failed.set(true); + return dnWithReplicaIndex1.getUuidString() + "_failed"; + } else { + return dnWithReplicaIndex1.getUuidString(); + } + }); + locationInfoList = groupOutputStream.getLocationInfoList(); + while (locationInfoList.size() == 1) { + locationInfoList = groupOutputStream.getLocationInfoList(); + Random random = new Random(); + random.nextBytes(b); + assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream()); + key.write(b); + key.flush(); + } + assertEquals(2, locationInfoList.size()); + assertNotEquals(locationInfoList.get(1).getPipeline().getId(), pipeline.getId()); + GenericTestUtils.waitFor(() -> { + try { + return miniOzoneCluster.get().getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueOf(containerId)).getState().equals( + HddsProtos.LifeCycleState.CLOSED); + } catch (ContainerNotFoundException e) { + throw new RuntimeException(e); + } + }, 1000, 30000); + key.close(); + Assertions.assertTrue(failed.get()); + } finally { + IOUtils.closeQuietly(client1); + if (miniOzoneCluster.get() != null) { + miniOzoneCluster.get().shutdown(); + } + } + } + @Test public void testCreateKeyWithOutBucketDefaults() throws Exception { OzoneVolume volume = objectStore.getVolume(volumeName);