From 2d3d34e36e5ae4ed9a1a3404e7dd9d1a141dc511 Mon Sep 17 00:00:00 2001 From: vinothchandar Date: Wed, 3 Jan 2018 16:05:30 -0800 Subject: [PATCH] Remove stateful fs member from HoodieTestUtils & FSUtils --- .../TestHoodieClientOnCopyOnWriteStorage.java | 18 +++++++-------- .../java/com/uber/hoodie/TestMultiFS.java | 23 +++++++++++-------- .../hoodie/func/TestUpdateMapFunction.java | 2 +- .../index/bloom/TestHoodieBloomIndex.java | 2 +- .../hoodie/io/TestHoodieCommitArchiveLog.java | 4 ++-- .../uber/hoodie/io/TestHoodieCompactor.java | 10 +++++--- .../hoodie/table/TestCopyOnWriteTable.java | 2 +- .../hoodie/table/TestMergeOnReadTable.java | 20 +++++----------- .../com/uber/hoodie/common/util/FSUtils.java | 13 ----------- .../common/minicluster/HdfsTestService.java | 8 ++----- .../hoodie/common/model/HoodieTestUtils.java | 23 ++++++++----------- .../table/HoodieTableMetaClientTest.java | 9 +++----- .../common/table/log/HoodieLogFormatTest.java | 4 +--- .../string/HoodieActiveTimelineTest.java | 2 +- .../view/HoodieTableFileSystemViewTest.java | 19 +++++++-------- .../hoodie/hadoop/InputFormatTestUtil.java | 8 +++++-- .../HoodieRealtimeRecordReaderTest.java | 5 ++-- .../utilities/TestHDFSParquetImporter.java | 3 --- .../utilities/TestHoodieSnapshotCopier.java | 2 +- 19 files changed, 74 insertions(+), 103 deletions(-) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index e3adf8d9c839..184ea82e086e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -102,7 +102,7 @@ public void init() throws IOException { folder.create(); basePath = folder.getRoot().getAbsolutePath(); fs = FSUtils.getFs(basePath.toString(), jsc.hadoopConfiguration()); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(fs, basePath); dataGen = new HoodieTestDataGenerator(); } @@ -1247,27 +1247,27 @@ public void testKeepLatestFileVersionsMOR() throws IOException { .retainFileVersions(1).build()).build(); HoodieTableMetaClient metaClient = HoodieTestUtils - .initTableType(basePath, HoodieTableType.MERGE_ON_READ); + .initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); // Make 3 files, one base file and 2 log files associated with base file String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); String file2P0L0 = HoodieTestUtils - .createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.empty()); + .createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.empty()); String file2P0L1 = HoodieTestUtils - .createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.of(2)); + .createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.of(2)); // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(basePath, "000"); + HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000"); // Make 4 files, one base file and 3 log files associated with base file HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0); file2P0L0 = HoodieTestUtils - .createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.empty()); + .createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.empty()); file2P0L0 = HoodieTestUtils - .createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(2)); + .createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(2)); file2P0L0 = HoodieTestUtils - .createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(3)); + .createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(3)); // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(basePath, "001"); + HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); List hoodieCleanStats = table.clean(jsc); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java index 1bdc15d25a62..676b970d9bf9 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java @@ -47,8 +47,8 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class TestMultiFS implements Serializable { @@ -64,8 +64,8 @@ public class TestMultiFS implements Serializable { private static JavaSparkContext jsc; private static SQLContext sqlContext; - @Before - public void initClass() throws Exception { + @BeforeClass + public static void initClass() throws Exception { hdfsTestService = new HdfsTestService(); dfsCluster = hdfsTestService.start(true); @@ -82,15 +82,18 @@ public void initClass() throws Exception { sqlContext = new SQLContext(jsc); } - @After - public void cleanupClass() throws Exception { - if (hdfsTestService != null) { - hdfsTestService.stop(); - } + @AfterClass + public static void cleanupClass() throws Exception { if (jsc != null) { jsc.stop(); } - FSUtils.setFs(null); + + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + } + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the same JVM + FileSystem.closeAll(); } @Test diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java index ef4a86833f0e..352a0036b85d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java @@ -48,7 +48,7 @@ public void init() throws Exception { TemporaryFolder folder = new TemporaryFolder(); folder.create(); this.basePath = folder.getRoot().getAbsolutePath(); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath); } @Test diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 92dcae96b1fe..c3ace401eff6 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -88,7 +88,7 @@ public void init() throws IOException { folder.create(); basePath = folder.getRoot().getAbsolutePath(); fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(fs, basePath); // We have some records to be tagged (two different partitions) schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 5d8362688cd4..d0333d077470 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -54,8 +54,8 @@ public void init() throws Exception { TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); - HoodieTestUtils.init(basePath); fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); + HoodieTestUtils.init(fs, basePath); } @Test @@ -75,7 +75,7 @@ public void testArchiveDatasetWithArchival() throws IOException { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) .forTable("test-trip-table").build(); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(fs, basePath); HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "102"); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index a6e385f2a225..e05e73364d85 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -31,6 +31,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.After; @@ -57,6 +59,7 @@ public class TestHoodieCompactor { private String basePath = null; private HoodieCompactor compactor; private transient HoodieTestDataGenerator dataGen = null; + private transient FileSystem fs; @Before public void init() throws IOException { @@ -67,7 +70,8 @@ public void init() throws IOException { TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); - HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); + HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); dataGen = new HoodieTestDataGenerator(); compactor = new HoodieRealtimeTableCompactor(); @@ -100,7 +104,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() { @Test(expected = IllegalArgumentException.class) public void testCompactionOnCopyOnWriteFail() throws Exception { - HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); @@ -155,7 +159,7 @@ public void testLogFileCountsAfterCompaction() throws Exception { // Write them to corresponding avro logfiles HoodieTestUtils - .writeRecordsToLogFiles(metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema, + .writeRecordsToLogFiles(fs, metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema, updatedRecords); // Verify that all data file has one log file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 61303c3fb7ce..9432be72d01c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -76,7 +76,7 @@ public void init() throws Exception { TemporaryFolder folder = new TemporaryFolder(); folder.create(); this.basePath = folder.getRoot().getAbsolutePath(); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(FSUtils.getFs(basePath, jsc.hadoopConfiguration()), basePath); } @Test diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 5465879f030e..a9ed76379f0f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -42,7 +42,6 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; @@ -80,7 +79,6 @@ public class TestMergeOnReadTable { private transient SQLContext sqlContext; private static String basePath = null; private HoodieCompactor compactor; - private FileSystem fs; //NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class) //The implementation and gurantees of many API's differ, for example check rename(src,dst) @@ -94,10 +92,8 @@ public static void cleanUp() throws Exception { hdfsTestService.stop(); dfsCluster.shutdown(); } - FSUtils.setFs(null); // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the same JVM FileSystem.closeAll(); - HoodieTestUtils.resetFS(basePath); } @BeforeClass @@ -110,8 +106,6 @@ public static void setUpDFS() throws IOException { // Create a temp folder as the base path dfs = dfsCluster.getFileSystem(); } - FSUtils.setFs(dfs); - HoodieTestUtils.resetFS(basePath); } @Before @@ -124,12 +118,10 @@ public void init() throws IOException { TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); - fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); - jsc.hadoopConfiguration().addResource(fs.getConf()); + jsc.hadoopConfiguration().addResource(dfs.getConf()); dfs.mkdirs(new Path(basePath)); - FSUtils.setFs(dfs); - HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ); sqlContext = new SQLContext(jsc); // SQLContext stuff compactor = new HoodieRealtimeTableCompactor(); @@ -219,7 +211,7 @@ public void testSimpleInsertAndUpdate() throws Exception { compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime()); - allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); + allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); dataFilesToRead = roView.getLatestDataFiles(); @@ -339,7 +331,7 @@ public void testSimpleInsertAndDelete() throws Exception { commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertFalse(commit.isPresent()); - allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); + allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); dataFilesToRead = roView.getLatestDataFiles(); @@ -357,7 +349,7 @@ public void testSimpleInsertAndDelete() throws Exception { public void testCOWToMORConvertedDatasetRollback() throws Exception { //Set TableType to COW - HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.COPY_ON_WRITE); HoodieWriteConfig cfg = getConfig(true); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); @@ -396,7 +388,7 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception { assertNoWriteErrors(statuses); //Set TableType to MOR - HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ); //rollback a COW commit when TableType is MOR client.rollback(newCommitTime); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index d788cdd4435c..5417f01c9b32 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -16,7 +16,6 @@ package com.uber.hoodie.common.util; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodiePartitionMetadata; @@ -57,15 +56,6 @@ public class FSUtils { private static final long MIN_CLEAN_TO_KEEP = 10; private static final long MIN_ROLLBACK_TO_KEEP = 10; private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; - private static FileSystem fs; - - /** - * Only to be used for testing. - */ - @VisibleForTesting - public static void setFs(FileSystem fs) { - FSUtils.fs = fs; - } public static Configuration prepareHadoopConf(Configuration conf) { conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); @@ -86,9 +76,6 @@ public static Configuration prepareHadoopConf(Configuration conf) { public static FileSystem getFs(String path, Configuration conf) { - if (fs != null) { - return fs; - } FileSystem fs; conf = prepareHadoopConf(conf); try { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/minicluster/HdfsTestService.java b/hoodie-common/src/test/java/com/uber/hoodie/common/minicluster/HdfsTestService.java index 74cc1104b079..a90739f75c4b 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/minicluster/HdfsTestService.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/minicluster/HdfsTestService.java @@ -55,7 +55,6 @@ public class HdfsTestService { private MiniDFSCluster miniDfsCluster; public HdfsTestService() { - hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); workDir = Files.createTempDir().getAbsolutePath(); } @@ -66,10 +65,7 @@ public Configuration getHadoopConf() { public MiniDFSCluster start(boolean format) throws IOException { Preconditions .checkState(workDir != null, "The work dir must be set before starting cluster."); - - if (hadoopConf == null) { - hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); - } + hadoopConf = HoodieTestUtils.getDefaultHadoopConf(); // If clean, then remove the work dir so we can start fresh. String localDFSLocation = getDFSLocation(workDir); @@ -91,8 +87,8 @@ public MiniDFSCluster start(boolean format) throws IOException { } public void stop() throws IOException { + logger.info("HDFS Minicluster service being shut down."); miniDfsCluster.shutdown(); - logger.info("HDFS Minicluster service shut down."); miniDfsCluster = null; hadoopConf = null; } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 7017bbf91b2b..74103f8fa6ac 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -70,27 +70,22 @@ public class HoodieTestUtils { - public static FileSystem fs; public static final String TEST_EXTENSION = ".test"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final int DEFAULT_TASK_PARTITIONID = 1; public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"}; private static Random rand = new Random(46474747); - public static void resetFS(String basePath) { - HoodieTestUtils.fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); - } - public static Configuration getDefaultHadoopConf() { return new Configuration(); } - public static HoodieTableMetaClient init(String basePath) throws IOException { - fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); - return initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + public static HoodieTableMetaClient init(FileSystem fs, String basePath) throws IOException { + return initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE); } - public static HoodieTableMetaClient initTableType(String basePath, HoodieTableType tableType) + public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, + HoodieTableType tableType) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); @@ -105,7 +100,8 @@ public static HoodieTableMetaClient initOnTemp() throws IOException { TemporaryFolder folder = new TemporaryFolder(); folder.create(); String basePath = folder.getRoot().getAbsolutePath(); - return HoodieTestUtils.init(basePath); + return HoodieTestUtils + .init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath); } public static String makeNewCommitTime() { @@ -143,7 +139,7 @@ public static final String createDataFile(String basePath, String partitionPath, return fileID; } - public static final String createNewLogFile(String basePath, String partitionPath, + public static final String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime, String fileID, Optional version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; boolean makeDir = fs.mkdirs(new Path(folderPath)); @@ -159,7 +155,8 @@ public static final String createNewLogFile(String basePath, String partitionPat return fileID; } - public static final void createCompactionCommitFiles(String basePath, String... commitTimes) + public static final void createCompactionCommitFiles(FileSystem fs, String basePath, + String... commitTimes) throws IOException { for (String commitTime : commitTimes) { boolean createFile = fs.createNewFile(new Path( @@ -268,7 +265,7 @@ public static T serializeDeserialize(T object, Class return deseralizedObject; } - public static void writeRecordsToLogFiles(String basePath, Schema schema, + public static void writeRecordsToLogFiles(FileSystem fs, String basePath, Schema schema, List updatedRecords) { Map> groupedUpdated = updatedRecords.stream() .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java index fcfb9b7f658e..bd710d1a99e5 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/HoodieTableMetaClientTest.java @@ -36,7 +36,6 @@ import org.apache.hadoop.io.Text; import org.junit.Before; import org.junit.Test; -import org.junit.rules.TemporaryFolder; public class HoodieTableMetaClientTest { @@ -45,10 +44,8 @@ public class HoodieTableMetaClientTest { @Before public void init() throws IOException { - TemporaryFolder folder = new TemporaryFolder(); - folder.create(); - this.basePath = folder.getRoot().getAbsolutePath(); - metaClient = HoodieTestUtils.init(basePath); + metaClient = HoodieTestUtils.initOnTemp(); + basePath = metaClient.getBasePath(); } @Test @@ -109,7 +106,7 @@ public void checkCommitTimeline() throws IOException { public void checkArchiveCommitTimeline() throws IOException { Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath()); SequenceFile.Writer writer = SequenceFile - .createWriter(HoodieTestUtils.fs.getConf(), SequenceFile.Writer.file(archiveLogPath), + .createWriter(metaClient.getHadoopConf(), SequenceFile.Writer.file(archiveLogPath), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 1e8b021914a3..e9f3a3a87748 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -80,7 +80,6 @@ public static void setUpClass() throws IOException, InterruptedException { @AfterClass public static void tearDownClass() { MiniClusterUtil.shutdown(); - HoodieTestUtils.resetFS(basePath); } @Before @@ -91,8 +90,7 @@ public void setUp() throws IOException, InterruptedException { assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath()))); this.partitionPath = new Path(folder.getRoot().getPath()); this.basePath = folder.getRoot().getParent(); - HoodieTestUtils.fs = fs; - HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); } @After diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java index 189be698dcda..096c75d77932 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java @@ -49,7 +49,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - HoodieTestUtils.fs.delete(new Path(this.metaClient.getBasePath()), true); + metaClient.getFs().delete(new Path(this.metaClient.getBasePath()), true); } @Test diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index 68d32215a6eb..fab8e371459f 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; -import org.junit.rules.TemporaryFolder; @SuppressWarnings("ResultOfMethodCallIgnored") public class HoodieTableFileSystemViewTest { @@ -58,10 +57,8 @@ public class HoodieTableFileSystemViewTest { @Before public void init() throws IOException { - TemporaryFolder folder = new TemporaryFolder(); - folder.create(); - this.basePath = folder.getRoot().getAbsolutePath(); - metaClient = HoodieTestUtils.init(basePath); + metaClient = HoodieTestUtils.initOnTemp(); + basePath = metaClient.getBasePath(); fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); roView = (TableFileSystemView.ReadOptimizedView) fsView; @@ -69,7 +66,7 @@ public void init() throws IOException { } private void refreshFsView(FileStatus[] statuses) { - metaClient = new HoodieTableMetaClient(HoodieTestUtils.fs.getConf(), basePath, true); + metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); if (statuses != null) { fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), @@ -184,7 +181,7 @@ public void testStreamLatestVersionInPartition() throws IOException { new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); // Now we list the entire partition - FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); + FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(11, statuses.length); refreshFsView(null); @@ -285,7 +282,7 @@ public void testStreamEveryVersionInPartition() throws IOException { new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); // Now we list the entire partition - FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); + FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(7, statuses.length); refreshFsView(null); @@ -359,7 +356,7 @@ public void streamLatestVersionInRange() throws IOException { new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); // Now we list the entire partition - FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); + FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(9, statuses.length); refreshFsView(statuses); @@ -430,7 +427,7 @@ public void streamLatestVersionsBefore() throws IOException { new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); // Now we list the entire partition - FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); + FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(7, statuses.length); refreshFsView(null); @@ -492,7 +489,7 @@ public void streamLatestVersions() throws IOException { new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); // Now we list the entire partition - FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); + FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(10, statuses.length); refreshFsView(statuses); diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java index ac14e64842f9..4b060a132bd2 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -39,7 +39,9 @@ public class InputFormatTestUtil { public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles, String commitNumber) throws IOException { basePath.create(); - HoodieTestUtils.init(basePath.getRoot().toString()); + HoodieTestUtils + .init(FSUtils.getFs(basePath.getRoot().toString(), HoodieTestUtils.getDefaultHadoopConf()), + basePath.getRoot().toString()); File partitionPath = basePath.newFolder("2016", "05", "01"); for (int i = 0; i < numberOfFiles; i++) { File dataFile = @@ -99,7 +101,9 @@ public static File prepareParquetDataset(TemporaryFolder basePath, Schema schema int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException { basePath.create(); - HoodieTestUtils.init(basePath.getRoot().toString()); + HoodieTestUtils + .init(FSUtils.getFs(basePath.getRoot().toString(), HoodieTestUtils.getDefaultHadoopConf()), + basePath.getRoot().toString()); File partitionPath = basePath.newFolder("2016", "05", "01"); AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index d9d24caa1066..2bef7780a1ed 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -72,7 +72,6 @@ public void setUp() { jobConf = new JobConf(); fs = FSUtils .getFs(basePath.getRoot().getAbsolutePath(), HoodieTestUtils.getDefaultHadoopConf()); - HoodieTestUtils.fs = fs; } @Rule @@ -105,7 +104,7 @@ public void testReader() throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); HoodieTestUtils - .initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); + .initTableType(fs, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; File partitionDir = InputFormatTestUtil .prepareParquetDataset(basePath, schema, 1, 100, commitTime); @@ -163,7 +162,7 @@ public void testReaderWithNestedAndComplexSchema() throws Exception { // initial commit Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema()); HoodieTestUtils - .initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); + .initTableType(fs, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); String commitTime = "100"; int numberOfRecords = 100; int numberOfLogRecords = numberOfRecords / 2; diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java index c1eaa86aae2f..c1e58153fb4c 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHDFSParquetImporter.java @@ -27,7 +27,6 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; -import com.uber.hoodie.common.util.FSUtils; import java.io.IOException; import java.io.Serializable; import java.text.ParseException; @@ -71,7 +70,6 @@ public static void initClass() throws Exception { dfs = dfsCluster.getFileSystem(); dfsBasePath = dfs.getWorkingDirectory().toString(); dfs.mkdirs(new Path(dfsBasePath)); - FSUtils.setFs(dfs); } @AfterClass @@ -79,7 +77,6 @@ public static void cleanupClass() throws Exception { if (hdfsTestService != null) { hdfsTestService.stop(); } - FSUtils.setFs(null); } /** diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java index db670673f604..326894dfeab0 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java @@ -53,7 +53,7 @@ public void init() throws IOException { outputPath = rootPath + "/output"; fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); - HoodieTestUtils.init(basePath); + HoodieTestUtils.init(fs, basePath); // Start a local Spark job SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]"); jsc = new JavaSparkContext(conf);