Port OrcFileWriterFactory.createOrcDataSink to TrinoFileSystem #14627
Conversation
CI: #14631
Force-pushed from adb3467 to dd5fe8f
This is a prerequisite to be able to track FileSystem memory utilization while writing ORC data - #14023
Sorry, I didn't think anyone was going to review it at this stage.
I am not going to review it then, but you can @-mention me when you want me to take a look.
Yup. Please mark it ready for review and re-request review when this is ready.
Force-pushed from dd18dca to 2c9cae3
@alexjo2144 @sopel39 there is no button for re-requesting the review, but please take a look ;)
mostly lgtm
@@ -58,6 +59,7 @@
        implements ConnectorPageSinkProvider
{
    private final Set<HiveFileWriterFactory> fileWriterFactories;
+   private final TrinoFileSystemFactory trinoFileSystemFactory;
just fileSystemFactory
@@ -79,6 +81,7 @@
    @Inject
    public HivePageSinkProvider(
            Set<HiveFileWriterFactory> fileWriterFactories,
+           TrinoFileSystemFactory trinoFileSystemFactory,
just fileSystemFactory
@@ -93,6 +96,7 @@ public HivePageSinkProvider(
            HiveWriterStats hiveWriterStats)
    {
        this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null"));
+       this.trinoFileSystemFactory = requireNonNull(trinoFileSystemFactory, "trinoFileSystemFactory is null");
just fileSystemFactory
here and in other places
Resolved: plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
@@ -212,12 +213,13 @@ private void mergeFiles(Iterable<TempFile> files, Consumer<Page> consumer)
        Collection<Iterator<Page>> iterators = new ArrayList<>();

        for (TempFile tempFile : files) {
-           Path file = tempFile.getPath();
+           String file = tempFile.getPath();
            TrinoInputFile trinoInputFile = trinoFileSystem.newInputFile(file);
trinoInputFile -> inputFile
-       FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration);
-       FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path));
+       TrinoFileSystem fileSystem = trinoFileSystemFactory.create(identity);
+       TrinoInput inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.newInputFile(path).newInput());
you don't need to wrap in doAs anymore. See io.trino.filesystem.hdfs.HdfsInputFile#newInput
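To illustrate that point with a self-contained sketch (the `Identity` and `IdentityAwareInputFile` types below are simplified stand-ins, not Trino's actual API): when the input-file implementation captures the identity at construction and performs the privileged wrapping itself, the caller-side doAs becomes redundant.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a user identity that can run privileged actions.
interface Identity
{
    <T> T doAs(Supplier<T> action);
}

// Hypothetical input-file abstraction: it captures the identity at creation
// time and performs doAs internally, so callers simply call newInput().
class IdentityAwareInputFile
{
    private final Identity identity;
    private final String path;

    IdentityAwareInputFile(Identity identity, String path)
    {
        this.identity = identity;
        this.path = path;
    }

    String newInput()
    {
        // The privileged wrapping happens here, inside the implementation,
        // which is why an outer hdfsEnvironment.doAs(...) would be redundant.
        return identity.doAs(() -> "opened:" + path);
    }
}
```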
        Optional<Supplier<OrcDataSource>> validationInputFactory = Optional.empty();
+       String stringPath = path.toString();
if you move it higher, you can also use it in createOrcDataSink(trinoFileSystem, path.toString());
-       FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration);
-       FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path));
+       TrinoFileSystem trinoFileSystem = trinoFileSystemFactory.create(identity);
+       TrinoInput inputStream = hdfsEnvironment.doAs(identity, () -> trinoFileSystem.newInputFile(path.toString()).newInput());
doAs is no longer needed here
-       FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, splitPath, configuration);
-       FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(splitPath));
+       TrinoFileSystem trinoFileSystem = trinoFileSystemFactory.create(identity);
+       TrinoInput trinoInput = hdfsEnvironment.doAs(identity, () -> trinoFileSystem.newInputFile(splitPath).newInput());
trinoInput -> inputFile would be better (here and in other places)
but that is not a file, it is more like an inputStream; see io.trino.filesystem.TrinoInput
@@ -143,7 +143,7 @@ public HiveStorageFormat getFormat()
    @Override
    public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
    {
-       return Optional.of(new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, new FileFormatDataSourceStats(), UTC));
+       return Optional.of(new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, new FileFormatDataSourceStats(), UTC, new HdfsFileSystemFactory(hdfsEnvironment)));
why not use HDFS_FILE_SYSTEM_FACTORY?
Force-pushed from 24a085f to 8f35ca7
CI: #14686
Force-pushed from bafffe5 to 8f35ca7
@@ -636,22 +638,34 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
            hiveWriterStats);
    }

+   private String setSchemeToFileIfOneDoesNotExist(String path)
move below makeRowIdSortingWriter
@@ -636,22 +638,34 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
            hiveWriterStats);
    }
I think you could add a unit test for it in io.trino.plugin.hive.TestHiveWriterFactory if you make this method static and package-private (then move this method to the bottom of the class)
        Path parent = path.getParent();
        String scheme = parent.toUri().getScheme();
        String parentPath = parent.toString();
        if (scheme == null || scheme.equals("")) {
use setSchemeToFileIfOneDoesNotExist
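For readers following along, the helper under discussion can be sketched as a small static method. The body below is a guess based on the surrounding diff (prefix `file://` when the path carries no URI scheme), not the PR's exact implementation:

```java
import java.net.URI;

final class PathSchemes
{
    private PathSchemes() {}

    // Sketch of setSchemeToFileIfAbsent: if the path has no URI scheme,
    // assume a local file and prefix "file://"; otherwise leave it as-is.
    static String setSchemeToFileIfAbsent(String path)
    {
        String scheme = URI.create(path).getScheme();
        if (scheme == null || scheme.isEmpty()) {
            return "file://" + path;
        }
        return path;
    }
}
```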
Resolved: plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
lgtm % comments
        Path tempFilePath;
        if (sortedWritingTempStagingPathEnabled) {
            String tempPrefix = sortedWritingTempStagingPath.replace(
                    "${USER}",
                    new HdfsContext(session).getIdentity().getUser());
+           tempPrefix = setSchemeToFileIfOneDoesNotExist(tempPrefix);
maybe setSchemeToFileIfAbsent
Resolved: plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
Resolved: plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java
    private final FileFormatDataSourceStats stats;

    public HdfsOrcDataSource(
            OrcDataSourceId id,
            long size,
            OrcReaderOptions options,
-           FSDataInputStream inputStream,
+           TrinoInput input,
It would be cleaner if this was TrinoInputFile. Then HdfsOrcDataSource would really own the lifecycle of TrinoInput (both create it and close it).
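The lifecycle argument can be sketched generically with simplified stand-in types (these are not Trino's real interfaces): by accepting the file rather than an already-open stream, the data source both opens the input and closes it, owning the full lifecycle.

```java
import java.io.Closeable;

// Simplified stand-in for a stream that must eventually be closed.
interface SketchInput extends Closeable
{
    @Override
    void close();
}

// Simplified stand-in for a file that can open such a stream.
interface SketchInputFile
{
    SketchInput newInput();
}

class SketchOrcDataSource implements Closeable
{
    private final SketchInput input;

    // Accepting the *file* (not an already-open stream) means this class
    // both creates the input here and closes it in close(): it owns the
    // full lifecycle, which is the reviewer's point.
    SketchOrcDataSource(SketchInputFile inputFile)
    {
        this.input = inputFile.newInput();
    }

    @Override
    public void close()
    {
        input.close();
    }
}
```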
            new OrcReaderOptions(),
-           fileSystem.open(path),
+           inputFile.newInput(),
Can HdfsOrcDataSource just accept inputFile?
        }
-       catch (IOException e) {
+       catch (RuntimeException e) {
If it doesn't throw anything just remove the try/catch
I don't want to change when a specific exception is thrown; if you check deeper then you will see that I catch IOException there and throw UncheckedIOException. I will change this to catch UncheckedIOException only. Does it make sense?
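As a general illustration of that narrowing (not the PR's actual code): catching UncheckedIOException still handles the wrapped IO failure, while any unrelated RuntimeException keeps propagating instead of being swallowed by a blanket catch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class NarrowCatch
{
    private NarrowCatch() {}

    // Hypothetical reader that wraps its checked IOException in
    // UncheckedIOException, as described in the comment above.
    static String read(boolean fail)
    {
        if (fail) {
            throw new UncheckedIOException(new IOException("boom"));
        }
        return "data";
    }

    static String readOrDefault(boolean fail)
    {
        try {
            return read(fail);
        }
        // Catch only UncheckedIOException: an unrelated RuntimeException
        // would still propagate, which a blanket catch would hide.
        catch (UncheckedIOException e) {
            return "default";
        }
    }
}
```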
        }
-       catch (IOException e) {
+       catch (RuntimeException e) {
Same here
same as above
        String file = tempFile.getPath();
        fileSystem.deleteFile(file);
Super nit: I would just inline this
-       String file = tempFile.getPath();
-       fileSystem.deleteFile(file);
+       fileSystem.deleteFile(tempFile.getPath());
@@ -139,7 +137,7 @@ private OrcDeleteDeltaPageSource(
        this.stats = requireNonNull(stats, "stats is null");
        this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");

-       verifyAcidSchema(reader, path);
+       verifyAcidSchema(reader, new Path(path));
You could migrate verifyAcidSchema pretty easily as well. Looks like it just uses the path in error messages.
I actually tried that and it turned out it is not that simple - I would also have to make changes in OrcPageSourceFactory, in verifyFileHasColumnNames, and in some more places - there would be a lot of changes and this is not really related.
    private final TypeManager typeManager;
    private final NodeVersion nodeVersion;
    private final FileFormatDataSourceStats readStats;
    private final OrcWriterStats stats = new OrcWriterStats();
    private final OrcWriterOptions orcWriterOptions;
+   private final TrinoFileSystemFactory fileSystemFactory;
Nit: I'd put this at the top where HdfsEnvironment was
            HdfsEnvironment hdfsEnvironment,
            FileFormatDataSourceStats stats,
            HiveConfig hiveConfig,
+           TrinoFileSystemFactory fileSystemFactory)
This is kind of a half-way migration, right? We still need the hdfsEnvironment in some places. Let's put the two fields together in the field list.
Force-pushed from c15232f to 60d4f1b
Force-pushed from 60d4f1b to 6e815fa
lgtm % comments
@@ -289,15 +289,6 @@ public HiveWriterFactory(
-       Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath);
-       this.conf = toJobConf(conf);
-
-       // make sure the FileSystem is created with the correct Configuration object
undo. Let's keep this
            }
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error while accessing: %s", writeInfoTargetPath));
propagate e
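"Propagate e" here means passing the caught exception as the cause so the original failure and its stack trace survive. A generic sketch (SketchException is a hypothetical stand-in for TrinoException, which additionally takes an error code):

```java
import java.io.IOException;

final class CausePropagation
{
    private CausePropagation() {}

    // Stand-in for TrinoException; the real class also takes an error code.
    static class SketchException extends RuntimeException
    {
        SketchException(String message, Throwable cause)
        {
            super(message, cause);
        }
    }

    static SketchException wrap(String path)
    {
        try {
            throw new IOException("disk full");
        }
        catch (IOException e) {
            // Pass e as the cause instead of dropping it, so the original
            // failure remains visible in logs and stack traces.
            return new SketchException("Error while accessing: " + path, e);
        }
    }
}
```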
        catch (IOException e) {
            throw new TrinoException(HIVE_WRITER_OPEN_ERROR, e);
        }
+       TrinoFileSystem fileSystem;
remove this line
        String parentPath = parent.toString();
        parentPath = setSchemeToFileIfAbsent(parentPath);
        Path tempFilePath = new Path(parentPath, ".tmp-sort." + path.getName());
+       fileSystem = fileSystemFactory.create(session);
TrinoFileSystem fileSystem = ...
        }
        TrinoFileSystem fileSystem;
        Path parent = path.getParent();
        String parentPath = parent.toString();
you can inline it: String parentPath = setSchemeToFileIfAbsent(parent.toString())
Resolved: plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java
Resolved: plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveWriterFactory.java
Force-pushed from 6e815fa to 1782f52
CI: #12818
CI: #14814
lgtm % comment
@@ -123,16 +124,23 @@
    private static final Pattern DEFAULT_HIVE_COLUMN_NAME_PATTERN = Pattern.compile("_col\\d+");
    private final OrcReaderOptions orcReaderOptions;
    private final HdfsEnvironment hdfsEnvironment;
you can remove hdfsEnvironment now. It seems unused.
It is still used; it is passed to io.trino.plugin.hive.orc.OrcPageSourceFactory#createOrcPageSource and then used to create OrcDeletedRows.
We could try to simplify it further
@@ -252,13 +263,13 @@ private ConnectorPageSource createOrcPageSource(
        boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty();
        try {
            FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration);
HdfsEnvironment hdfsEnvironment is unused now
It is used on line 414
Description
This is prerequisite to be able to track FileSystem memory utilization while writing ORC data - #14023
Non-technical explanation
Allows better tracking of memory consumed internally.
Release notes
(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: