Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ddanielr committed Jan 31, 2025
2 parents 9fee599 + 51e152a commit 5e0455a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FSConfArgs {

FileSystem fs;
Configuration conf;

FileSystem getFileSystem(Path path) throws IOException {
if (fs == null) {
return path.getFileSystem(getConf());
}
return fs;
}

FileSystem getFileSystem() throws IOException {
if (fs == null) {
fs = FileSystem.get(getConf());
return FileSystem.get(getConf());
}
return fs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ RFileSource[] getSources() throws IOException {
sources = new RFileSource[rFiles.length];
for (int i = 0; i < rFiles.length; i++) {
final Path path = rFiles[i].getPath();
sources[i] = new RFileSource(getFileSystem().open(path),
getFileSystem().getFileStatus(path).getLen(), rFiles[i].getFence());
sources[i] = new RFileSource(getFileSystem(path).open(path),
getFileSystem(path).getFileStatus(path).getLen(), rFiles[i].getFence());
}
} else {
for (int i = 0; i < sources.length; i++) {
Expand Down Expand Up @@ -162,5 +162,4 @@ public ScannerOptions withBounds(Range range) {
this.opts.bounds = range;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public RFileWriter build() throws IOException {
visCacheSize);
} else {
return new RFileWriter(fileops.newWriterBuilder()
.forFile(UnreferencedTabletFile.of(out.getFileSystem(), out.path), out.getFileSystem(),
out.getConf(), cs)
.forFile(UnreferencedTabletFile.of(out.getFileSystem(out.path), out.path),
out.getFileSystem(out.path), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -938,4 +940,42 @@ public void testMultipleFilesAndCache() throws Exception {
assertEquals(testData, toMap(scanner));
scanner.close();
}

@Test
public void testFileSystemFromUri() throws Exception {
String localFsClass = "LocalFileSystem";

String remoteFsHost = "127.0.0.5:8080";
String fileUri = "hdfs://" + remoteFsHost + "/bulk-xyx/file1.rf";
// There was a bug in the code where the default hadoop file system was always used. This test
// checks that the hadoop filesystem used it based on the URI and not the default filesystem. In
// this env the default file system is the local hadoop file system.
var exception =
assertThrows(ConnectException.class, () -> RFile.newWriter().to(fileUri).build());
assertTrue(exception.getMessage().contains("to " + remoteFsHost
+ " failed on connection exception: java.net.ConnectException: Connection refused"));
// Ensure the DistributedFileSystem was used.
assertTrue(Arrays.stream(exception.getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName())));
assertTrue(Arrays.stream(exception.getStackTrace())
.noneMatch(ste -> ste.getClassName().contains(localFsClass)));

var exception2 = assertThrows(RuntimeException.class, () -> {
var scanner = RFile.newScanner().from(fileUri).build();
scanner.iterator();
});
assertTrue(exception2.getMessage().contains("to " + remoteFsHost
+ " failed on connection exception: java.net.ConnectException: Connection refused"));
assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName())));
assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
.noneMatch(ste -> ste.getClassName().contains(localFsClass)));

// verify the assumptions this test is making about the local filesystem being the default.
var exception3 = assertThrows(IllegalArgumentException.class,
() -> FileSystem.get(new Configuration()).open(new Path(fileUri)));
assertTrue(exception3.getMessage().contains("Wrong FS: " + fileUri + ", expected: file:///"));
assertTrue(Arrays.stream(exception3.getStackTrace())
.anyMatch(ste -> ste.getClassName().contains(localFsClass)));
}
}

0 comments on commit 5e0455a

Please sign in to comment.