Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11391. Frequent Ozone DN Crashes During OM + DN Decommission with Freon #7154

Merged
merged 10 commits into from
Sep 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto.State.RECOVERING;
import static org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;

import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -547,9 +548,13 @@ ContainerCommandResponseProto handlePutBlock(

boolean endOfBlock = false;
if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
// in EC, we will be doing empty put block.
// So, let's flush only when there are any chunks
if (!request.getPutBlock().getBlockData().getChunksList().isEmpty()) {
// There are two cases where client sends empty put block with eof.
// (1) An EC empty file. In this case, the block/chunk file does not exist,
// so no need to flush/close the file.
// (2) Ratis output stream in incremental chunk list mode may send empty put block
// to close the block, in which case we need to flush/close the file.
if (!request.getPutBlock().getBlockData().getChunksList().isEmpty() ||
blockData.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST)) {
chunkManager.finishWriteChunks(kvContainer, blockData);
}
endOfBlock = true;
Expand Down Expand Up @@ -903,6 +908,9 @@ ContainerCommandResponseProto handleWriteChunk(
// of order.
blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
boolean eob = writeChunk.getBlock().getEof();
if (eob) {
chunkManager.finishWriteChunks(kvContainer, blockData);
}
blockManager.putBlock(kvContainer, blockData, eob);
blockDataProto = blockData.getProtoBufMessage();
final long numBytes = blockDataProto.getSerializedSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.UUID;
Expand All @@ -53,6 +58,8 @@
* Helpers for ChunkManager implementation tests.
*/
public abstract class AbstractTestChunkManager {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractTestChunkManager.class);

private HddsVolume hddsVolume;
private KeyValueContainerData keyValueContainerData;
Expand Down Expand Up @@ -128,6 +135,55 @@ protected void checkChunkFileCount(int expected) {
assertEquals(expected, files.length);
}

/**
* Helper method to check if a file is in use.
*/
public static boolean isFileNotInUse(String filePath) {
try {
Process process = new ProcessBuilder("fuser", filePath).start();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8))) {
String output = reader.readLine(); // If fuser returns no output, the file is not in use
if (output == null) {
return true;
}
LOG.debug("File is in use: {}", filePath);
return false;
} finally {
process.destroy();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a finally block to cleanup, but we would need to define process outside the try block then:
finally { if (process != null) { process.destroy(); } }

} catch (IOException e) {
LOG.warn("Failed to check if file is in use: {}", filePath, e);
return false; // On failure, assume the file is in use
}
}

/**
 * Checks that all chunk files of this test's container are closed.
 *
 * @return true if no chunk file under the container's chunk path is in use
 */
protected boolean checkChunkFilesClosed() {
  // Delegate to the static check, using this test's container chunk directory.
  String chunksDir = keyValueContainerData.getChunksPath();
  return checkChunkFilesClosed(chunksDir);
}

/**
 * Verifies that every file under the given chunk directory has been closed,
 * i.e. no process still holds any of them open.
 *
 * @param path chunk directory of the container created in setup; must exist
 * @return true if all chunk files are closed, false otherwise
 */
public static boolean checkChunkFilesClosed(String path) {
  // The container is created during setup, so these paths should exist.
  assertNotNull(path);

  File chunkDir = new File(path);
  assertTrue(chunkDir.exists());

  File[] chunkFiles = chunkDir.listFiles();
  assertNotNull(chunkFiles);
  for (File chunkFile : chunkFiles) {
    assertTrue(chunkFile.exists());
    assertTrue(chunkFile.isFile());
    // A file that some process still holds open has not been closed yet.
    if (!isFileNotInUse(chunkFile.getAbsolutePath())) {
      return false;
    }
  }
  return true;
}

protected void checkWriteIOStats(long length, long opCount) {
VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
assertEquals(length, volumeIOStats.getWriteBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
Expand All @@ -39,7 +40,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;

/**
* Common test cases for ChunkManager implementation tests.
Expand Down Expand Up @@ -222,4 +225,26 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
checkReadIOStats(len * count, count);
}

@Test
public void testFinishWrite() throws Exception {
  // GIVEN: a fresh chunk manager with no chunk files written yet.
  ChunkManager chunkManager = createTestSubject();
  checkChunkFileCount(0);
  checkWriteIOStats(0, 0);

  // WHEN: one chunk is written and the block's writes are finished.
  chunkManager.writeChunk(getKeyValueContainer(), getBlockID(),
      getChunkInfo(), getData(),
      WRITE_STAGE);

  BlockData mockBlockData = Mockito.mock(BlockData.class);
  when(mockBlockData.getBlockID()).thenReturn(getBlockID());

  chunkManager.finishWriteChunks(getKeyValueContainer(), mockBlockData);
  assertTrue(checkChunkFilesClosed());

  // THEN: exactly one chunk file exists and the write stats reflect it.
  checkChunkFileCount(1);
  checkWriteIOStats(getChunkInfo().getLen(), 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.hadoop.fs.StreamCapabilities;

import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -83,7 +84,9 @@
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.impl.AbstractTestChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
Expand All @@ -93,6 +96,7 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;

import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -320,6 +324,8 @@ private void waitForEmptyDeletedTable()
}

@Test
// Making this the second test to be run to avoid lingering block files from previous tests
@Order(2)
public void testEmptyHsync() throws Exception {
// Check that deletedTable should not have keys with the same block as in
// keyTable's when a key is hsync()'ed then close()'d.
Expand Down Expand Up @@ -358,10 +364,16 @@ public void testKeyHSyncThenClose() throws Exception {
String data = "random data";
final Path file = new Path(dir, "file-hsync-then-close");
try (FileSystem fs = FileSystem.get(CONF)) {
String chunkPath;
try (FSDataOutputStream outputStream = fs.create(file, true)) {
outputStream.write(data.getBytes(UTF_8), 0, data.length());
outputStream.hsync();
// locate the container chunk path on the first DataNode.
chunkPath = getChunkPathOnDataNode(outputStream);
assertFalse(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath));
}
// After close, the chunk file should be closed.
assertTrue(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath));
}

OzoneManager ozoneManager = cluster.getOzoneManager();
Expand All @@ -387,6 +399,22 @@ public void testKeyHSyncThenClose() throws Exception {
}
}

/**
 * Locates, on the DataNode that holds the key's first block, the directory
 * storing that container's chunk files.
 *
 * @param outputStream the open key output stream whose block locations are
 *     inspected; must have at least one block location
 * @return chunk directory path of the container on that DataNode
 * @throws IOException if the DataNode service cannot be resolved
 */
private static String getChunkPathOnDataNode(FSDataOutputStream outputStream)
    throws IOException {
  KeyOutputStream groupOutputStream =
      ((OzoneFSOutputStream) outputStream.getWrappedStream()).getWrappedOutputStream().getKeyOutputStream();
  List<OmKeyLocationInfo> locationInfoList =
      groupOutputStream.getLocationInfoList();
  // Only the first block location is needed; its container holds the
  // chunks just written.
  OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
  HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo, cluster);
  return dn.getDatanodeStateMachine()
      .getContainer().getContainerSet()
      .getContainer(omKeyLocationInfo.getContainerID())
      .getContainerData().getChunksPath();
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testO3fsHSync(boolean incrementalChunkList) throws Exception {
Expand Down