Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-3900] Fix tempDir usage in TestHoodieLogFormat #6981

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@

package org.apache.hudi.common.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
Expand Down Expand Up @@ -68,11 +56,19 @@
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -114,39 +110,33 @@
public class TestHoodieLogFormat extends HoodieCommonTestHarness {

private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlockType.AVRO_DATA_BLOCK;
private static final int BUFFER_SIZE = 4096;

private static String BASE_OUTPUT_PATH = "/tmp/";
private FileSystem fs;
private static FileSystem fs;
private Path partitionPath;
private int bufferSize = 4096;
private String spillableBasePath;

@BeforeAll
public static void setUpClass() throws IOException, InterruptedException {
// Append is not supported in LocalFileSystem. HDFS needs to be setup.
MiniClusterUtil.setUp();
fs = MiniClusterUtil.fileSystem;
}

@AfterAll
public static void tearDownClass() {
MiniClusterUtil.shutdown();
fs = null;
}

@BeforeEach
public void setUp() throws IOException, InterruptedException {
this.fs = MiniClusterUtil.fileSystem;

assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
this.partitionPath = new Path(tempDir.toAbsolutePath().toString());
this.basePath = tempDir.getParent().toString();
this.basePath = tempDir.toUri().getPath();
this.partitionPath = new Path(basePath, "partition_path");
this.spillableBasePath = new Path(basePath, ".spillable_path").toUri().getPath();
HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
}

@AfterEach
public void tearDown() throws IOException {
fs.delete(partitionPath, true);
fs.delete(new Path(basePath), true);
}

@Test
public void testEmptyLog() throws IOException {
Writer writer =
Expand Down Expand Up @@ -684,8 +674,8 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -919,8 +909,8 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1004,8 +994,8 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1094,8 +1084,8 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(true)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1175,8 +1165,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1223,8 +1213,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build();
Expand Down Expand Up @@ -1338,8 +1328,8 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build();
Expand Down Expand Up @@ -1444,8 +1434,8 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1516,8 +1506,8 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1571,8 +1561,8 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1645,8 +1635,8 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1755,8 +1745,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1935,8 +1925,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogB
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(true)
Expand Down Expand Up @@ -2023,8 +2013,8 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -2122,7 +2112,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
FileCreateUtils.createDeltaCommit(basePath, "100", fs);

HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock prevBlock = reader.prev();
Expand Down Expand Up @@ -2204,7 +2194,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());

try (HoodieLogFileReader reader =
new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
Expand Down Expand Up @@ -2256,7 +2246,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)

HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
try (HoodieLogFileReader reader =
new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Third block should be available");
reader.moveToPrev();
Expand Down