Skip to content

Commit

Permalink
fixing base path
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored and xushiyan committed Oct 19, 2022
1 parent 048299e commit b209e5a
Showing 1 changed file with 49 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@

package org.apache.hudi.common.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DeleteRecord;
Expand Down Expand Up @@ -68,11 +56,19 @@
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -114,39 +110,33 @@
public class TestHoodieLogFormat extends HoodieCommonTestHarness {

private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlockType.AVRO_DATA_BLOCK;
private static final int BUFFER_SIZE = 4096;

private static String BASE_OUTPUT_PATH = "/tmp/";
private FileSystem fs;
private static FileSystem fs;
private Path partitionPath;
private int bufferSize = 4096;
private String spillableBasePath;

@BeforeAll
public static void setUpClass() throws IOException, InterruptedException {
// Append is not supported in LocalFileSystem. HDFS needs to be setup.
MiniClusterUtil.setUp();
fs = MiniClusterUtil.fileSystem;
}

@AfterAll
public static void tearDownClass() {
MiniClusterUtil.shutdown();
fs = null;
}

@BeforeEach
public void setUp() throws IOException, InterruptedException {
this.fs = MiniClusterUtil.fileSystem;

assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
this.partitionPath = new Path(tempDir.toAbsolutePath().toString());
this.basePath = tempDir.getParent().toString();
this.basePath = tempDir.toUri().getPath();
this.partitionPath = new Path(basePath, "partition_path");
this.spillableBasePath = new Path(basePath, ".spillable_path").toUri().getPath();
HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
}

@AfterEach
public void tearDown() throws IOException {
fs.delete(partitionPath, true);
fs.delete(new Path(basePath), true);
}

@Test
public void testEmptyLog() throws IOException {
Writer writer =
Expand Down Expand Up @@ -684,8 +674,8 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -919,8 +909,8 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1004,8 +994,8 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1094,8 +1084,8 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(true)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1175,8 +1165,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1223,8 +1213,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build();
Expand Down Expand Up @@ -1338,8 +1328,8 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build();
Expand Down Expand Up @@ -1444,8 +1434,8 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1516,8 +1506,8 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1571,8 +1561,8 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1645,8 +1635,8 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1755,8 +1745,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -1935,8 +1925,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogB
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(true)
Expand Down Expand Up @@ -2023,8 +2013,8 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
.withMaxMemorySizeInBytes(10240L)
.withReadBlocksLazily(readBlocksLazily)
.withReverseReader(false)
.withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withUseScanV2(useScanv2)
Expand Down Expand Up @@ -2122,7 +2112,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
FileCreateUtils.createDeltaCommit(basePath, "100", fs);

HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock prevBlock = reader.prev();
Expand Down Expand Up @@ -2204,7 +2194,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());

try (HoodieLogFileReader reader =
new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
Expand Down Expand Up @@ -2256,7 +2246,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)

HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
try (HoodieLogFileReader reader =
new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {

assertTrue(reader.hasPrev(), "Third block should be available");
reader.moveToPrev();
Expand Down

0 comments on commit b209e5a

Please sign in to comment.