[HUDI-3900] Fixing tempDir usage in TestHoodieLogFormat (apache#6981)
nsivabalan authored and satishkotha committed Dec 11, 2022
1 parent f662d81 commit bc3ce82
Showing 1 changed file with 40 additions and 46 deletions.
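
In short: the spillable map base path moves off the hardcoded "/tmp/" (the old BASE_OUTPUT_PATH) to a directory under the per-test tempDir, basePath is derived from tempDir itself instead of tempDir.getParent(), the bufferSize field becomes a BUFFER_SIZE constant, and the shared FileSystem becomes static so it is assigned once in @BeforeAll and cleared in @AfterAll. The tempDir and basePath fields are inherited from HoodieCommonTestHarness; a minimal sketch of how a JUnit 5 harness typically declares them (the field names match this diff, the exact modifiers are an assumption):

import java.nio.file.Path;
import org.junit.jupiter.api.io.TempDir;

public abstract class HoodieCommonTestHarness {

  protected String basePath = null;

  // JUnit 5 creates a fresh directory here before each test and deletes
  // it afterwards, which is why the explicit @AfterEach cleanup removed
  // in this diff is no longer needed.
  @TempDir
  public Path tempDir;
}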
@@ -54,6 +54,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -65,7 +66,6 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -106,39 +106,33 @@
 public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlockType.AVRO_DATA_BLOCK;
+  private static final int BUFFER_SIZE = 4096;
 
-  private static String BASE_OUTPUT_PATH = "/tmp/";
-  private FileSystem fs;
+  private static FileSystem fs;
   private Path partitionPath;
-  private int bufferSize = 4096;
+  private String spillableBasePath;
 
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
     MiniClusterUtil.setUp();
+    fs = MiniClusterUtil.fileSystem;
   }
 
   @AfterAll
   public static void tearDownClass() {
     MiniClusterUtil.shutdown();
+    fs = null;
   }
 
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
-    this.fs = MiniClusterUtil.fileSystem;
-
-    assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
-    this.partitionPath = new Path(tempDir.toAbsolutePath().toString());
-    this.basePath = tempDir.getParent().toString();
+    this.basePath = tempDir.toUri().getPath();
+    this.partitionPath = new Path(basePath, "partition_path");
+    this.spillableBasePath = new Path(basePath, ".spillable_path").toUri().getPath();
     HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
   }
 
-  @AfterEach
-  public void tearDown() throws IOException {
-    fs.delete(partitionPath, true);
-    fs.delete(new Path(basePath), true);
-  }
-
   @Test
   public void testEmptyLog() throws IOException {
     Writer writer =
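
A note on the new setUp(): tempDir.toUri().getPath() yields the plain absolute path of the JUnit-managed directory, and the partition and spillable paths are nested beneath it, so each test works in a unique, self-contained tree instead of colliding in a shared /tmp. Roughly (the concrete directory name is illustrative):

basePath          = /tmp/junit4422887980771667088
partitionPath     = /tmp/junit4422887980771667088/partition_path
spillableBasePath = /tmp/junit4422887980771667088/.spillable_path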
@@ -588,8 +582,8 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -821,8 +815,8 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -900,8 +894,8 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -988,8 +982,8 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(true)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1067,8 +1061,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1114,8 +1108,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1209,8 +1203,8 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1314,8 +1308,8 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1384,8 +1378,8 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1437,8 +1431,8 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1509,8 +1503,8 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1617,8 +1611,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1796,8 +1790,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogB
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(true)
@@ -1883,8 +1877,8 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1978,7 +1972,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock prevBlock = reader.prev();
@@ -2060,7 +2054,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
 
     try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
+        new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2112,7 +2106,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)
 
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
+        new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
 
       assertTrue(reader.hasPrev(), "Third block should be available");
       reader.moveToPrev();
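The pattern behind the whole diff, reduced to its essentials: derive every per-test path from the JUnit-managed tempDir rather than a shared /tmp location, and keep only genuinely shared resources (like the mini-cluster FileSystem) static. A minimal self-contained sketch of that pattern (class, field, and test names are illustrative, not taken from the commit):

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TempDirUsageSketch {

  // Per-test scratch space; JUnit 5 creates it before each test and deletes it after.
  @TempDir
  Path tempDir;

  private Path partitionPath;
  private Path spillableBasePath;

  @BeforeEach
  void setUp() throws IOException {
    // Derive every path from tempDir so runs never collide and nothing
    // leaks into a shared /tmp directory.
    partitionPath = Files.createDirectories(tempDir.resolve("partition_path"));
    spillableBasePath = Files.createDirectories(tempDir.resolve(".spillable_path"));
  }

  @Test
  void pathsAreIsolatedPerTest() {
    assertTrue(partitionPath.startsWith(tempDir));
    assertTrue(spillableBasePath.startsWith(tempDir));
  }
}

Because each tempDir is unique per test, paths never collide across tests or parallel JVMs, and JUnit handles deletion, so no @AfterEach cleanup is required.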
