Remove stateful fs member from HoodieTestUtils & FSUtils
vinothchandar authored and Vinoth Chandar committed Jan 18, 2018
1 parent c7c73b0 commit 2d3d34e
Showing 19 changed files with 74 additions and 103 deletions.
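The commit removes the static, cached FileSystem member from FSUtils and HoodieTestUtils; every caller now resolves a FileSystem for its own path and passes it into the test helpers explicitly. Below is a minimal sketch of the new calling convention, using only the signatures visible in the hunks that follow (FSUtils.getFs(path, conf), HoodieTestUtils.getDefaultHadoopConf(), HoodieTestUtils.init(fs, basePath), HoodieTestUtils.initTableType(fs, basePath, type)); the base path and the HoodieTestUtils/HoodieTableType import packages are assumptions, not taken from this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.uber.hoodie.common.model.HoodieTableType;   // package assumed
import com.uber.hoodie.common.model.HoodieTestUtils;   // package assumed
import com.uber.hoodie.common.util.FSUtils;

public class ExplicitFsSetupSketch {
  public static void main(String[] args) throws Exception {
    String basePath = "/tmp/hoodie-test-table";                   // hypothetical path
    Configuration conf = HoodieTestUtils.getDefaultHadoopConf();

    // Resolve the FileSystem for this particular path instead of reading a
    // process-wide static member.
    FileSystem fs = FSUtils.getFs(basePath, conf);

    // The test helpers now take the FileSystem as an explicit first argument;
    // COW-style tests call init(...), MOR tests call initTableType(...) instead.
    HoodieTestUtils.init(fs, basePath);
    // HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ);
  }
}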
@@ -102,7 +102,7 @@ public void init() throws IOException {
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath.toString(), jsc.hadoopConfiguration());
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
dataGen = new HoodieTestDataGenerator();
}

@@ -1247,27 +1247,27 @@ public void testKeepLatestFileVersionsMOR() throws IOException {
.retainFileVersions(1).build()).build();

HoodieTableMetaClient metaClient = HoodieTestUtils
.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ);

// Make 3 files, one base file and 2 log files associated with base file
String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000");
String file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.empty());
.createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.empty());
String file2P0L1 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.of(2));
.createNewLogFile(fs, basePath, partitionPaths[0], "000", file1P0, Optional.of(2));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "000");
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");

// Make 4 files, one base file and 3 log files associated with base file
HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0);
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.empty());
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.empty());
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(2));
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(2));
file2P0L0 = HoodieTestUtils
.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(3));
.createNewLogFile(fs, basePath, partitionPaths[0], "001", file1P0, Optional.of(3));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "001");
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");

HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
23 changes: 13 additions & 10 deletions hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java
@@ -47,8 +47,8 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestMultiFS implements Serializable {
@@ -64,8 +64,8 @@ public class TestMultiFS implements Serializable {
private static JavaSparkContext jsc;
private static SQLContext sqlContext;

@Before
public void initClass() throws Exception {
@BeforeClass
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);

@@ -82,15 +82,18 @@ public void initClass() throws Exception {
sqlContext = new SQLContext(jsc);
}

@After
public void cleanupClass() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
@AfterClass
public static void cleanupClass() throws Exception {
if (jsc != null) {
jsc.stop();
}
FSUtils.setFs(null);

if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the same JVM
FileSystem.closeAll();
}

@Test
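In TestMultiFS the mini-cluster setup and teardown move from per-test @Before/@After to per-class @BeforeClass/@AfterClass, and teardown now clears the Hadoop FileSystem cache instead of calling the removed FSUtils.setFs(null). A condensed sketch of the resulting lifecycle follows; the HdfsTestService import path is an assumption, everything else mirrors the hunks above.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import com.uber.hoodie.common.minicluster.HdfsTestService;  // package assumed

public class MiniDfsLifecycleSketch {
  private static HdfsTestService hdfsTestService;
  private static MiniDFSCluster dfsCluster;

  @BeforeClass
  public static void initClass() throws Exception {
    hdfsTestService = new HdfsTestService();
    dfsCluster = hdfsTestService.start(true);   // format and start the mini-cluster once per class
  }

  @AfterClass
  public static void cleanupClass() throws Exception {
    if (hdfsTestService != null) {
      hdfsTestService.stop();
      dfsCluster.shutdown();
    }
    // Clear FileSystem.CACHE so a cached hdfs:// handle cannot leak into later
    // tests that expect the local filesystem in the same JVM.
    FileSystem.closeAll();
  }
}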
@@ -48,7 +48,7 @@ public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath);
}

@Test
@@ -88,7 +88,7 @@ public void init() throws IOException {
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
// We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
@@ -54,8 +54,8 @@ public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.init(fs, basePath);
}

@Test
@@ -75,7 +75,7 @@ public void testArchiveDatasetWithArchival() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(fs, basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
@@ -31,6 +31,7 @@
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
@@ -44,6 +45,7 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
@@ -57,6 +59,7 @@ public class TestHoodieCompactor {
private String basePath = null;
private HoodieCompactor compactor;
private transient HoodieTestDataGenerator dataGen = null;
private transient FileSystem fs;

@Before
public void init() throws IOException {
@@ -67,7 +70,8 @@ public void init() throws IOException {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ);

dataGen = new HoodieTestDataGenerator();
compactor = new HoodieRealtimeTableCompactor();
@@ -100,7 +104,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() {

@Test(expected = IllegalArgumentException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
@@ -155,7 +159,7 @@ public void testLogFileCountsAfterCompaction() throws Exception {

// Write them to corresponding avro logfiles
HoodieTestUtils
.writeRecordsToLogFiles(metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema,
.writeRecordsToLogFiles(fs, metaClient.getBasePath(), HoodieTestDataGenerator.avroSchema,
updatedRecords);

// Verify that all data file has one log file
@@ -76,7 +76,7 @@ public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
HoodieTestUtils.init(FSUtils.getFs(basePath, jsc.hadoopConfiguration()), basePath);
}

@Test
@@ -42,7 +42,6 @@
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
@@ -80,7 +79,6 @@ public class TestMergeOnReadTable {
private transient SQLContext sqlContext;
private static String basePath = null;
private HoodieCompactor compactor;
private FileSystem fs;

//NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
//The implementation and gurantees of many API's differ, for example check rename(src,dst)
@@ -94,10 +92,8 @@ public static void cleanUp() throws Exception {
hdfsTestService.stop();
dfsCluster.shutdown();
}
FSUtils.setFs(null);
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the same JVM
FileSystem.closeAll();
HoodieTestUtils.resetFS(basePath);
}

@BeforeClass
@@ -110,8 +106,6 @@ public static void setUpDFS() throws IOException {
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
}
FSUtils.setFs(dfs);
HoodieTestUtils.resetFS(basePath);
}

@Before
@@ -124,12 +118,10 @@ public void init() throws IOException {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
jsc.hadoopConfiguration().addResource(fs.getConf());
jsc.hadoopConfiguration().addResource(dfs.getConf());

dfs.mkdirs(new Path(basePath));
FSUtils.setFs(dfs);
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ);

sqlContext = new SQLContext(jsc); // SQLContext stuff
compactor = new HoodieRealtimeTableCompactor();
@@ -219,7 +211,7 @@ public void testSimpleInsertAndUpdate() throws Exception {

compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());

allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
allFiles);
dataFilesToRead = roView.getLatestDataFiles();
@@ -339,7 +331,7 @@ public void testSimpleInsertAndDelete() throws Exception {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());

allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
allFiles);
dataFilesToRead = roView.getLatestDataFiles();
@@ -357,7 +349,7 @@
public void testCOWToMORConvertedDatasetRollback() throws Exception {

//Set TableType to COW
HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.COPY_ON_WRITE);

HoodieWriteConfig cfg = getConfig(true);
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
@@ -396,7 +388,7 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception {
assertNoWriteErrors(statuses);

//Set TableType to MOR
HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ);

//rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
@@ -16,7 +16,6 @@

package com.uber.hoodie.common.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
@@ -57,15 +56,6 @@ public class FSUtils {
private static final long MIN_CLEAN_TO_KEEP = 10;
private static final long MIN_ROLLBACK_TO_KEEP = 10;
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
private static FileSystem fs;

/**
* Only to be used for testing.
*/
@VisibleForTesting
public static void setFs(FileSystem fs) {
FSUtils.fs = fs;
}

public static Configuration prepareHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
@@ -86,9 +76,6 @@ public static Configuration prepareHadoopConf(Configuration conf) {


public static FileSystem getFs(String path, Configuration conf) {
if (fs != null) {
return fs;
}
FileSystem fs;
conf = prepareHadoopConf(conf);
try {
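With the static member and setFs gone, getFs resolves a FileSystem from the path's scheme on every call, which is what the multi-filesystem tests rely on. A caller-level sketch of the difference this makes when HDFS and local paths coexist in one JVM; the paths are hypothetical, and the hdfs:// call assumes a running cluster such as the MiniDFSCluster used in the tests above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.uber.hoodie.common.util.FSUtils;

public class PerPathFsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Each call now resolves the filesystem from its own path. With the old
    // static member, whichever instance was resolved first (or injected via
    // setFs) would have been handed back for both paths.
    FileSystem dfs = FSUtils.getFs("hdfs://localhost:9000/hoodie/table", conf);   // needs a running NameNode
    FileSystem local = FSUtils.getFs("/tmp/hoodie/table", conf);

    System.out.println(dfs.getUri().getScheme());    // expected: hdfs
    System.out.println(local.getUri().getScheme());  // expected: file
  }
}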
@@ -55,7 +55,6 @@ public class HdfsTestService {
private MiniDFSCluster miniDfsCluster;

public HdfsTestService() {
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
workDir = Files.createTempDir().getAbsolutePath();
}

@@ -66,10 +65,7 @@ public Configuration getHadoopConf() {
public MiniDFSCluster start(boolean format) throws IOException {
Preconditions
.checkState(workDir != null, "The work dir must be set before starting cluster.");

if (hadoopConf == null) {
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
}
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();

// If clean, then remove the work dir so we can start fresh.
String localDFSLocation = getDFSLocation(workDir);
@@ -91,8 +87,8 @@ public MiniDFSCluster start(boolean format) throws IOException {
}

public void stop() throws IOException {
logger.info("HDFS Minicluster service being shut down.");
miniDfsCluster.shutdown();
logger.info("HDFS Minicluster service shut down.");
miniDfsCluster = null;
hadoopConf = null;
}
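HdfsTestService now rebuilds its Hadoop configuration on every start() and nulls out the cluster and configuration in stop(), so consecutive test classes each get a fresh mini-cluster rather than one shared through static state. Typical usage, following the calls visible in TestMultiFS above; the import path for HdfsTestService and the data path are assumptions.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import com.uber.hoodie.common.minicluster.HdfsTestService;  // package assumed

public class HdfsTestServiceUsageSketch {
  public static void main(String[] args) throws Exception {
    HdfsTestService service = new HdfsTestService();   // creates its own temp work dir
    MiniDFSCluster cluster = service.start(true);      // true => format the cluster
    try {
      FileSystem dfs = cluster.getFileSystem();
      dfs.mkdirs(new Path("/hoodie/test-table"));      // hypothetical dataset path
    } finally {
      service.stop();         // shuts the mini-cluster down and resets the service's state
      FileSystem.closeAll();  // drop cached handles before the JVM runs more tests
    }
  }
}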