diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ae01c6da756..569442268af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -181,7 +181,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
     constructionLock.writeLock().lock();
     try {
       container = new OzoneContainer(hddsDatanodeService, this.datanodeDetails,
-          conf, context, certClient, secretKeyClient);
+          conf, context, certClient, secretKeyClient, connectionManager);
     } finally {
       constructionLock.writeLock().unlock();
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 86fa84c34e4..968c9b9a6e6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -16,16 +16,11 @@
  */
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
-import java.util.Arrays;
-import java.util.Objects;
 import java.util.concurrent.Callable;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
@@ -99,20 +94,6 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
           layoutStorage.setClusterId(clusterId);
           layoutStorage.persistCurrentState();
 
-          HddsProtos.NodeState nodePreviousState = rpcEndPoint.getEndPoint()
-              .getNodePreviousState(ozoneContainer.getDatanodeDetails().getUuid());
-
-          if (nodePreviousState != null && nodePreviousState.equals(HddsProtos.NodeState.DEAD)) {
-            ozoneContainer.getMetaVolumeSet().getVolumeMap().forEach((key, value) ->
-                Arrays.asList(Objects.requireNonNull(value.getStorageDir().listFiles())).stream().filter(File::isDirectory).forEach(f -> {
-                  try {
-                    FileUtils.deleteDirectory(f);
-                  } catch (IOException e) {
-                    LOG.warn("Failed to delete directory {}", f.getAbsolutePath(), e);
-                  }
-                }));
-          }
-
           // Start the container services after getting the version information
           ozoneContainer.start(clusterId);
         }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 369b4a90030..240e09ab42d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -21,10 +21,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
@@ -49,6 +52,7 @@
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -73,14 +77,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -135,6 +144,7 @@ public class OzoneContainer {
   private ScheduledExecutorService dbCompactionExecutorService;
 
   private final ContainerMetrics metrics;
+  private final SCMConnectionManager scmConnectionManager;
   private WitnessedContainerMetadataStore witnessedContainerMetadataStore;
 
   enum InitializingStatus {
@@ -151,12 +161,14 @@ enum InitializingStatus {
    * @throws IOException
    */
   public OzoneContainer(HddsDatanodeService hddsDatanodeService,
-      DatanodeDetails datanodeDetails, ConfigurationSource conf,
-      StateContext context, CertificateClient certClient,
-      SecretKeyVerifierClient secretKeyClient) throws IOException {
+                        DatanodeDetails datanodeDetails, ConfigurationSource conf,
+                        StateContext context, CertificateClient certClient,
+                        SecretKeyVerifierClient secretKeyClient,
+                        SCMConnectionManager scmConnectionManager) throws IOException {
     config = conf;
     this.datanodeDetails = datanodeDetails;
     this.context = context;
+    this.scmConnectionManager = scmConnectionManager;
     this.volumeChecker = new StorageVolumeChecker(conf, new Timer(),
         datanodeDetails.threadNamePrefix());
 
@@ -229,6 +241,8 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
      */
     controller = new ContainerController(containerSet, handlers);
 
+    cleanUpRatisMetadataDirectory();
+
     writeChannel = XceiverServerRatis.newXceiverServerRatis(hddsDatanodeService,
         datanodeDetails, config, hddsDispatcher, controller, certClient,
         context);
@@ -300,7 +314,7 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
   public OzoneContainer(
       DatanodeDetails datanodeDetails, ConfigurationSource conf,
       StateContext context) throws IOException {
-    this(null, datanodeDetails, conf, context, null, null);
+    this(null, datanodeDetails, conf, context, null, null, null);
   }
 
   public GrpcTlsConfig getTlsClientConfig() {
@@ -659,4 +673,49 @@ public void compactDb() {
   public DatanodeDetails getDatanodeDetails() {
     return datanodeDetails;
   }
+
+  private void cleanUpRatisMetadataDirectory()
+      throws IOException {
+    if (scmConnectionManager != null) {
+      Collection<InetSocketAddress> scmAddressesForDatanodes = HddsUtils.getSCMAddressForDatanodes(config);
+      for (InetSocketAddress scmAddress : scmAddressesForDatanodes) {
+        scmConnectionManager.addSCMServer(scmAddress, context.getThreadNamePrefix());
+        context.addEndpoint(scmAddress);
+      }
+
+      scmConnectionManager.getValues().stream()
+          .filter(endPoint -> !endPoint.isPassive())
+          .findFirst()
+          .ifPresent(rpcEndPoint -> {
+            try {
+              // Check the previous state of the datanode stored on the SCM side
+              HddsProtos.NodeState nodePreviousState = rpcEndPoint.getEndPoint()
+                  .getNodeState(datanodeDetails.getUuid());
+
+              if (HddsProtos.NodeState.DEAD.equals(nodePreviousState)) {
+                LOG.info("The node previous state is DEAD, let's clean up the RATIS/THREE pipelines");
+                // OK, the node was previously marked as DEAD, let's clean up the
+                // RATIS/THREE pipelines (aka raft-groups)
+                this.getMetaVolumeSet().getVolumeMap().forEach((key, value) ->
+                    Arrays.stream(Objects.requireNonNull(value.getStorageDir()
+                            // don't touch the directory with volume check info
+                            .listFiles((dir, name) -> !name.equals("tmp"))))
+                        .filter(File::isDirectory) // only directories
+                        .forEach(directory -> {
+                          try {
+                            FileUtils.deleteDirectory(directory);
+                            LOG.info("Deleted directory: {}", directory.getAbsolutePath());
+                          } catch (IOException e) {
+                            LOG.warn("Failed to delete directory: {}", directory.getAbsolutePath(), e);
+                          }
+                        }));
+              }
+
+            } catch (IOException e) {
+              LOG.error("Failed to get datanode previous state from SCM", e);
+            }
+          });
+    }
+  }
+
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 9825dc004d7..e8587000b9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -18,8 +18,8 @@
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ExtendedDatanodeDetailsProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -91,9 +91,6 @@ SCMRegisteredResponseProto register(
       PipelineReportsProto pipelineReports,
       LayoutVersionProto layoutInfo) throws IOException;
 
-  NodePreviousStateResponseProto
-      getNodePreviousState(
-      StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto
-          request) throws IOException;
+  NodeStateResponseProto getNodeState(NodeStateRequestProto request) throws IOException;
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index b81b47429dd..c6416917f8c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -21,10 +21,9 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos
     .ExtendedDatanodeDetailsProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@ -184,17 +183,17 @@ public SCMRegisteredResponseProto register(
         .getRegisterResponse();
   }
 
-  public HddsProtos.NodeState getNodePreviousState(UUID datanodeUuid) throws IOException {
-    NodePreviousStateRequestProto request = NodePreviousStateRequestProto.newBuilder()
+  public HddsProtos.NodeState getNodeState(UUID datanodeUuid) throws IOException {
+    NodeStateRequestProto request = NodeStateRequestProto.newBuilder()
         .setDatanodeUUID(datanodeUuid.toString())
         .build();
-    return getNodePreviousState(request).getPreviousState();
+    return getNodeState(request).getNodeState();
 
   }
 
   @Override
-  public NodePreviousStateResponseProto getNodePreviousState(NodePreviousStateRequestProto request) throws IOException {
-    return submitRequest(Type.NodePreviousState, builder -> builder.setNodePreviousStateRequest(request))
-        .getNodePreviousStateResponse();
+  public NodeStateResponseProto getNodeState(NodeStateRequestProto request) throws IOException {
+    return submitRequest(Type.NodePreviousState, builder -> builder.setNodeStateRequest(request))
+        .getNodeStateResponse();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 7d724261e0b..1d352515317 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -122,8 +122,8 @@ public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
         return SCMDatanodeResponse.newBuilder()
             .setCmdType(cmdType)
             .setStatus(Status.OK)
-            .setNodePreviousStateResponse(
-                impl.getNodePreviousState(request.getNodePreviousStateRequest()))
+            .setNodeStateResponse(
+                impl.getNodeState(request.getNodeStateRequest()))
             .build();
       default:
         throw new ServiceException("Unknown command type: " + cmdType);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 8e148d9e537..bdc534f1cb7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -26,6 +26,8 @@
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -256,8 +258,8 @@ private void sleepIfNeeded() {
   }
 
   @Override
-  public StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto getNodePreviousState(
-      StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto request) throws IOException {
+  public NodeStateResponseProto getNodeState(
+      NodeStateRequestProto request) throws IOException {
     return null;
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 2f2cbc81e90..c25816178e7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -37,6 +38,10 @@
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -46,13 +51,22 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -63,7 +77,17 @@
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DISK_OUT_OF_SPACE;
 import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 /**
  * This class is used to test OzoneContainer.
@@ -251,6 +275,67 @@ public void testContainerCreateDiskFull(ContainerTestVersionInfo versionInfo)
     assertEquals(DISK_OUT_OF_SPACE, e.getResult());
   }
 
+  @Test
+  public void testCleanUpMetadataDirInCaseOfDeadNodeState() throws Exception {
+    // given
+    doReturn("7292dd16-71bf-4e43-bd96-c68c3cef9bb7").when(datanodeDetails).getUuidString();
+    initTest(ContainerTestVersionInfo.getLayoutList().get(0));
+
+    // metadata directory below contains the following 4 pipelines
+    List<String> pipelinesShouldBeDeleted = new ArrayList<String>(4) {{
+        add("1d076d52-0072-4adc-938c-8de43cb2f606");
+        add("1ebaa8d2-4a76-4662-8fbf-08d851b2e261");
+        add("2b762e18-1ca5-4da8-bb78-d45121efbebe");
+        add("2d79a291-3b23-4af4-a632-8e265eba5735");
+      }};
+
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.resolve("MetadataDir").toString());
+    conf.set(ScmConfigKeys.OZONE_SCM_NAMES, "localhost");
+
+    // copy the metadata directory mentioned above to the tmp directory
+    FileUtils.copyDirectory(new File(Objects.requireNonNull(getClass().getClassLoader().getResource("metadata"))
+            .toURI()), new File(folder.toFile().getAbsolutePath() + "/MetadataDir"));
+
+    DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
+    when(datanodeStateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
+    StateContext context = mock(StateContext.class);
+    when(context.getParent()).thenReturn(datanodeStateMachine);
+    SCMConnectionManager scmConnectionManager = spy(new SCMConnectionManager(conf));
+
+    doAnswer(invocation -> {
+      Collection<EndpointStateMachine> addresses = (Collection<EndpointStateMachine>) invocation.callRealMethod();
+      EndpointStateMachine rpcEndpoint = spy((EndpointStateMachine) addresses.toArray()[0]);
+      StorageContainerDatanodeProtocolClientSideTranslatorPB scmDatanodeProtocol =
+          mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+      doReturn(scmDatanodeProtocol).when(rpcEndpoint).getEndPoint();
+      doReturn(HddsProtos.NodeState.DEAD).when(scmDatanodeProtocol)
+          .getNodeState(any(UUID.class));
+      return Collections.singletonList(rpcEndpoint);
+    }).when(scmConnectionManager).getValues();
+
+    try (MockedStatic<FileUtils> fileUtilsMock = Mockito.mockStatic(FileUtils.class, CALLS_REAL_METHODS)) {
+      // when
+      new OzoneContainer(null, datanodeDetails, conf, context, null, null, scmConnectionManager);
+      ArgumentCaptor<File> fileCaptor = ArgumentCaptor.forClass(File.class);
+
+      // then
+      List<File> files = Arrays.asList(Objects.requireNonNull(new File(folder.toFile().getAbsolutePath()
+          + "/MetadataDir/ratis").listFiles()));
+      assertEquals(2, files.size());
+      files.forEach(file -> assertTrue(file.getName().equals("tmp") || file.getName().equals("scmUsed")));
+
+
+      // and then
+      fileUtilsMock.verify(() -> FileUtils.deleteDirectory(fileCaptor.capture()), times(4));
+      fileCaptor.getAllValues().forEach(pipelineDirectory -> {
+        String remove = pipelinesShouldBeDeleted.remove(pipelinesShouldBeDeleted.indexOf(pipelineDirectory.getName()));
+        assertNotNull(remove);
+      });
+      assertEquals(0, pipelinesShouldBeDeleted.size());
+    }
+
+  }
+
   //verify committed space on each volume
   private void verifyCommittedSpace(OzoneContainer oc) {
     List<HddsVolume> volumes = StorageVolumeUtil.getHddsVolumesList(
@@ -328,6 +413,6 @@ private DatanodeDetails createDatanodeDetails() {
         .addPort(containerPort)
         .addPort(ratisPort)
         .addPort(restPort);
-    return builder.build();
+    return spy(builder.build());
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/datanode.id b/hadoop-hdds/container-service/src/test/resources/metadata/datanode.id
new file mode 100644
index 00000000000..8e969f3e271
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/datanode.id
@@ -0,0 +1,14 @@
+!!org.apache.hadoop.ozone.container.common.helpers.DatanodeIdYaml$DatanodeDetailsYaml {
+  certSerialId: null,
+  currentVersion: 2,
+  hostName: 748343a3b913,
+  initialVersion: 2,
+  ipAddress: 172.25.0.110,
+  persistedOpState: IN_SERVICE,
+  persistedOpStateExpiryEpochSec: 0,
+  portDetails: {
+    HTTP: 9882,
+    CLIENT_RPC: 19864
+  },
+  uuid: 7292dd16-71bf-4e43-bd96-c68c3cef9bb7
+}
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/log_inprogress_0 b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/log_inprogress_0
new file mode 100644
index 00000000000..b1fb346708f
Binary files /dev/null and b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/log_inprogress_0 differ
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta
new file mode 100644
index 00000000000..6412bfea46f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta
@@ -0,0 +1,4 @@
+#
+#Fri Jan 17 11:20:04 UTC 2025
+term=1
+votedFor=7292dd16-71bf-4e43-bd96-c68c3cef9bb7
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta.conf b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta.conf
new file mode 100644
index 00000000000..dfe7a24615b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/current/raft-meta.conf
@@ -0,0 +1,7 @@
+"ä
+v
+$7292dd16-71bf-4e43-bd96-c68c3cef9bb7172.25.0.110:9856"172.25.0.110:9855*172.25.0.110:98582172.25.0.110:98578
+t
+$bbba9aa0-f0d2-48fb-ba06-00a7e80ff3ab172.25.0.111:9856"172.25.0.111:9855*172.25.0.111:98582172.25.0.111:98578
+t
+$d1053a31-ba99-48a9-aeca-319a62209c34172.25.0.112:9856"172.25.0.112:9855*172.25.0.112:98582172.25.0.112:98578
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/in_use.lock b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/in_use.lock
new file mode 100644
index 00000000000..f4c68218790
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1d076d52-0072-4adc-938c-8de43cb2f606/in_use.lock
@@ -0,0 +1 @@
+14723@748343a3b913
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/log_inprogress_0 b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/log_inprogress_0
new file mode 100644
index 00000000000..b1fb346708f
Binary files /dev/null and b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/log_inprogress_0 differ
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta
new file mode 100644
index 00000000000..7937ef0d7fd
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta
@@ -0,0 +1,4 @@
+#
+#Fri Jan 17 11:20:03 UTC 2025
+term=1
+votedFor=7292dd16-71bf-4e43-bd96-c68c3cef9bb7
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta.conf b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta.conf
new file mode 100644
index 00000000000..dfe7a24615b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/current/raft-meta.conf
@@ -0,0 +1,7 @@
+"ä
+v
+$7292dd16-71bf-4e43-bd96-c68c3cef9bb7172.25.0.110:9856"172.25.0.110:9855*172.25.0.110:98582172.25.0.110:98578
+t
+$bbba9aa0-f0d2-48fb-ba06-00a7e80ff3ab172.25.0.111:9856"172.25.0.111:9855*172.25.0.111:98582172.25.0.111:98578
+t
+$d1053a31-ba99-48a9-aeca-319a62209c34172.25.0.112:9856"172.25.0.112:9855*172.25.0.112:98582172.25.0.112:98578
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/in_use.lock b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/in_use.lock
new file mode 100644
index 00000000000..f4c68218790
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/1ebaa8d2-4a76-4662-8fbf-08d851b2e261/in_use.lock
@@ -0,0 +1 @@
+14723@748343a3b913
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/log_inprogress_0 b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/log_inprogress_0
new file mode 100644
index 00000000000..e1ccf8c96c7
Binary files /dev/null and b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/log_inprogress_0 differ
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta
new file mode 100644
index 00000000000..b05ae94dc5f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta
@@ -0,0 +1,4 @@
+#
+#Fri Jan 17 11:20:06 UTC 2025
+term=1
+votedFor=d1053a31-ba99-48a9-aeca-319a62209c34
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta.conf b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta.conf
new file mode 100644
index 00000000000..313dd89d868
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/current/raft-meta.conf
@@ -0,0 +1,7 @@
+"ä
+t
+$7292dd16-71bf-4e43-bd96-c68c3cef9bb7172.25.0.110:9856"172.25.0.110:9855*172.25.0.110:98582172.25.0.110:98578
+t
+$bbba9aa0-f0d2-48fb-ba06-00a7e80ff3ab172.25.0.111:9856"172.25.0.111:9855*172.25.0.111:98582172.25.0.111:98578
+v
+$d1053a31-ba99-48a9-aeca-319a62209c34172.25.0.112:9856"172.25.0.112:9855*172.25.0.112:98582172.25.0.112:98578
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/in_use.lock b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/in_use.lock
new file mode 100644
index 00000000000..f4c68218790
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2b762e18-1ca5-4da8-bb78-d45121efbebe/in_use.lock
@@ -0,0 +1 @@
+14723@748343a3b913
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/log_inprogress_0 b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/log_inprogress_0
new file mode 100644
index 00000000000..b1fb346708f
Binary files /dev/null and b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/log_inprogress_0 differ
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta
new file mode 100644
index 00000000000..7eff992cc54
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta
@@ -0,0 +1,4 @@
+#
+#Fri Jan 17 11:20:05 UTC 2025
+term=1
+votedFor=7292dd16-71bf-4e43-bd96-c68c3cef9bb7
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta.conf b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta.conf
new file mode 100644
index 00000000000..dfe7a24615b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/current/raft-meta.conf
@@ -0,0 +1,7 @@
+"ä
+v
+$7292dd16-71bf-4e43-bd96-c68c3cef9bb7172.25.0.110:9856"172.25.0.110:9855*172.25.0.110:98582172.25.0.110:98578
+t
+$bbba9aa0-f0d2-48fb-ba06-00a7e80ff3ab172.25.0.111:9856"172.25.0.111:9855*172.25.0.111:98582172.25.0.111:98578
+t
+$d1053a31-ba99-48a9-aeca-319a62209c34172.25.0.112:9856"172.25.0.112:9855*172.25.0.112:98582172.25.0.112:98578
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/in_use.lock b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/in_use.lock
new file mode 100644
index 00000000000..f4c68218790
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/2d79a291-3b23-4af4-a632-8e265eba5735/in_use.lock
@@ -0,0 +1 @@
+14723@748343a3b913
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/scmUsed b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/scmUsed
new file mode 100644
index 00000000000..1dcb13253b5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/scmUsed
@@ -0,0 +1 @@
+1228800 1737112073880
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/resources/metadata/ratis/tmp/disk-check/disk-check-9cdafe5f-536e-4668-969a-c06ed9bf4919 b/hadoop-hdds/container-service/src/test/resources/metadata/ratis/tmp/disk-check/disk-check-9cdafe5f-536e-4668-969a-c06ed9bf4919
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 5d3fd9e5eec..78a3a1a5fb6 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -44,7 +44,7 @@ message SCMDatanodeRequest {
   optional SCMVersionRequestProto getVersionRequest = 3;
   optional SCMRegisterRequestProto registerRequest = 4;
   optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5;
-  optional NodePreviousStateRequestProto nodePreviousStateRequest = 6;
+  optional NodeStateRequestProto nodeStateRequest = 6;
 }
 
 message SCMDatanodeResponse {
@@ -61,7 +61,7 @@ message SCMDatanodeResponse {
   optional SCMVersionResponseProto getVersionResponse = 6;
   optional SCMRegisteredResponseProto registerResponse = 7;
   optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8;
-  optional NodePreviousStateResponseProto nodePreviousStateResponse = 9;
+  optional NodeStateResponseProto nodeStateResponse = 9;
 
 }
 
@@ -127,7 +127,7 @@ message SCMRegisteredResponseProto {
   optional string networkLocation = 8;
 }
 
-message NodePreviousStateRequestProto {
+message NodeStateRequestProto {
   required string datanodeUUID = 1;
 }
 
@@ -164,8 +164,8 @@ message SCMHeartbeatResponseProto {
   optional int64 term = 3;
 }
 
-message NodePreviousStateResponseProto {
-  optional NodeState previousState = 1;
+message NodeStateResponseProto {
+  optional NodeState nodeState = 1;
 }
 
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index e2e59344423..3c6acbc5d78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -37,9 +37,9 @@
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodePreviousStateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeStateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconstructECContainersCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
@@ -279,7 +279,7 @@ public SCMRegisteredResponseProto register(
   }
 
   @Override
-  public NodePreviousStateResponseProto getNodePreviousState(NodePreviousStateRequestProto request) throws IOException {
+  public NodeStateResponseProto getNodeState(NodeStateRequestProto request) throws IOException {
     NodeStatus nodeStatus = null;
     try {
       DatanodeDetails nodeByUuid = scm.getScmNodeManager().getNodeByUuid(request.getDatanodeUUID());
@@ -289,9 +289,9 @@ public NodePreviousStateResponseProto getNodePreviousState(NodePreviousStateRequ
     } catch (NodeNotFoundException e) {
       LOG.warn("Node not found for UUID: {}", request.getDatanodeUUID());
     }
-    NodePreviousStateResponseProto.Builder builder = NodePreviousStateResponseProto.newBuilder();
+    NodeStateResponseProto.Builder builder = NodeStateResponseProto.newBuilder();
     if (nodeStatus != null) {
-      builder.setPreviousState(nodeStatus.getHealth());
+      builder.setNodeState(nodeStatus.getHealth());
     }
     return builder.build();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index 50cb8f6b9be..963f958e2b8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -311,7 +311,7 @@ private OzoneContainer createAndStartOzoneContainerInstance() {
     try {
       StateContext stateContext = ContainerTestUtils.getMockContext(dn, conf);
       container = new OzoneContainer(
-          null, dn, conf, stateContext, caClient, keyClient);
+          null, dn, conf, stateContext, caClient, keyClient, null);
       MutableVolumeSet volumeSet = container.getVolumeSet();
       StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
           .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
index 262d3026e78..8a4c3a15bcf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
@@ -136,7 +136,7 @@ void testCreateOzoneContainer(boolean requireToken, boolean hasToken,
 
       DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
       container = new OzoneContainer(null, dn, conf, ContainerTestUtils
-          .getMockContext(dn, conf), caClient, secretKeyClient);
+          .getMockContext(dn, conf), caClient, secretKeyClient, null);
       MutableVolumeSet volumeSet = container.getVolumeSet();
       StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
           .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));