-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delta lake connector - Reader #16843
Conversation
Initial connector. More features and tests are coming in next commits. |
8c9ea2f
to
8fce28b
Compare
f517365
to
3ac0a9b
Compare
4eb91d1
to
53f0edc
Compare
@vkorukanti Thanks a lot for this effort. Looking forward to use it! |
e9437b3
to
3ef63fc
Compare
@vkorukanti Thank you for your contribution! This will help in simplifying reading Delta Tables from Presto. I am planning to review this PR over the next few days. I see that there are test failures currently due to |
Thanks @imjalpreet for reviewing. Seems like there is a dependency issue on the CI. I am able to run the tests locally. Will check out what is causing the issue. |
@vkorukanti If possible, can you also share the design document? |
https://docs.google.com/document/d/16S7xoAmXpSax7W1OWYYHo5nZ71t5NvrQ-F79pZF6yb8/edit?usp=sharing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vkorukanti Thanks for the PR. I reviewed a few commits. Have added some comments. Will continue to review the rest of them.
presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Outdated
Show resolved
Hide resolved
schemaTableName.getSchemaName(), | ||
schemaTableName.getTableName(), | ||
tableLocation, | ||
Optional.of(snapshot.getVersion()), // lock the snapshot version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will storing the snapshot itself instead of version be better so the listFiles need not call getSnapshotForVersionAsOf again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will be addressing this in a next path. I am also planning to add a Guava cache so that we don't load the DeltaLog
again when listing the files in the Snapshot
.
presto-delta/src/main/java/com/facebook/presto/delta/DeltaColumnHandle.java
Outdated
Show resolved
Hide resolved
presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaTypeUtils.java
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaMetadata.java
Outdated
Show resolved
Hide resolved
@Test | ||
public void filterOnRegularColumn() | ||
{ | ||
String tableName = "data-reader-primitives"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add the table definition as a comment for all these tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, not clear. Are you asking for the table schema or data in the table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The table schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the contribution Venki! The code looks good overall, I have added some minor comments. I am still going over the design doc and since the tests have still not run in CI, I will run them locally for now. As I move forward with the design doc and tests, will add review comments if any.
presto-parquet/src/main/java/com/facebook/presto/parquet/rule/ParquetDereferencePushDown.java
Outdated
Show resolved
Hide resolved
presto-parquet/src/main/java/com/facebook/presto/parquet/rule/ParquetDereferencePushDown.java
Outdated
Show resolved
Hide resolved
boolean changed = false; | ||
for (PlanNode child : node.getSources()) { | ||
PlanNode newChild = child.accept(this, null); | ||
if (newChild != child) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is preferable to use equals() to compare object references.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. This code existed for sometime. I am just refactoring here. Not sure if it is worth modifying to use .equals()
.
presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSourceProvider.java
Outdated
Show resolved
Hide resolved
{ | ||
public static final String PARTITION_PRUNING_ENABLED = "partition_pruning_enabled"; | ||
public static final String FILTER_PUSHDOWN_ENABLED = "filter_pushdown_enabled"; | ||
private static final String CACHE_ENABLED = "cache_enabled"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need cache_enabled
session property since it's value is not getting used as of now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently it is not enabled yet. It requires some work. Removing this property for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to put it back because of the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vkorukanti what tests use it? It's better to add properties back when caching is actually supported. Can you modify the tests not to use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this property the integration tests fail with the following error callstack. The problem is here, we always look for the property CACHE_ENABLED
. If not present it throws an unknown session property error. This is not related to Delta. In any case, caching will be added once this PR lands in master.
at com.facebook.presto.metadata.SessionPropertyManager.lambda$decodeCatalogPropertyValue$1(SessionPropertyManager.java:181)
at java.util.Optional.orElseThrow(Optional.java:290)
at com.facebook.presto.metadata.SessionPropertyManager.decodeCatalogPropertyValue(SessionPropertyManager.java:181)
at com.facebook.presto.FullConnectorSession.getProperty(FullConnectorSession.java:150)
at com.facebook.presto.hive.HiveSessionProperties.isCacheEnabled(HiveSessionProperties.java:1087)
at java.util.Optional.map(Optional.java:215)
at com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration.lambda$getConfiguration$0(HiveCachingHdfsConfiguration.java:76)
at com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration$CachingJobConf.createFileSystem(HiveCachingHdfsConfiguration.java:105)
at org.apache.hadoop.fs.PrestoFileSystemCache.get(PrestoFileSystemCache.java:59)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at com.facebook.presto.hive.HdfsEnvironment.lambda$getFileSystem$0(HdfsEnvironment.java:71)
at com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
at com.facebook.presto.hive.HdfsEnvironment.getFileSystem(HdfsEnvironment.java:70)
at com.facebook.presto.hive.HdfsEnvironment.getFileSystem(HdfsEnvironment.java:64)
at com.facebook.presto.delta.DeltaClient.loadDeltaTableLog(DeltaClient.java:148)
at com.facebook.presto.delta.DeltaClient.getTable(DeltaClient.java:79)
at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:136)
at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:57)
at com.facebook.presto.metadata.MetadataManager.getTableHandle(MetadataManager.java:330)
at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.lambda$visitTable$17(StatementAnalyzer.java:1244)
`
presto-delta/src/main/java/com/facebook/presto/delta/DeltaSplitManager.java
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaTableName.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/rule/DeltaParquetDereferencePushDown.java
Outdated
Show resolved
Hide resolved
The TestDeltaIntegration is failing in PR because of dependency issue. Also locally, it fails with two errors. |
533599e
to
abfb1d0
Compare
After your latest commits too, I see some errors when I rant he TestDelta* tests. Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project presto-delta: Compilation failure: Compilation failure: |
It looks like the |
Thank you. Even though it was built clean from newly cloned directory, I had to build presto-parquet and presto-hive again, only then the tests worked. |
@@ -0,0 +1,2 @@ | |||
{"commitInfo":{"timestamp":1636316599141,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":10,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"687","numOutputRows":"1"}}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reformat the json files? You can right click "_delta_log" package in Intellij Project view, then click "Reformat code" to do a batch code clean up. I understand these files were program generated, but formatting them for the tests would make it easier for the developers to understand the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea. Reformetted these files. Changes should be available in next push.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to revert the formatting. It looks like the formatting is introducing special characters that are causing the JSON parser in Delta client library to fail.
presto-delta/src/main/java/com/facebook/presto/delta/DeltaSplitManager.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaSplitManager.java
Outdated
Show resolved
Hide resolved
presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaIntegration.java
Outdated
Show resolved
Hide resolved
presto-delta/src/test/java/com/facebook/presto/delta/TestDeltaScanOptimizations.java
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaSplitManager.java
Show resolved
Hide resolved
return sessionProperties; | ||
} | ||
|
||
public static boolean isCacheEnabled(ConnectorSession session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not used. Shall we remove it? It's better to introduce the properties when the real feature is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this property the integration tests fail with the following error callstack. The problem is here, we always look for the property CACHE_ENABLED
. If not present it throws an unknown session property error. This is not related to Delta. In any case, caching will be added once this PR lands in master.
at com.facebook.presto.metadata.SessionPropertyManager.lambda$decodeCatalogPropertyValue$1(SessionPropertyManager.java:181)
at java.util.Optional.orElseThrow(Optional.java:290)
at com.facebook.presto.metadata.SessionPropertyManager.decodeCatalogPropertyValue(SessionPropertyManager.java:181)
at com.facebook.presto.FullConnectorSession.getProperty(FullConnectorSession.java:150)
at com.facebook.presto.hive.HiveSessionProperties.isCacheEnabled(HiveSessionProperties.java:1087)
at java.util.Optional.map(Optional.java:215)
at com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration.lambda$getConfiguration$0(HiveCachingHdfsConfiguration.java:76)
at com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration$CachingJobConf.createFileSystem(HiveCachingHdfsConfiguration.java:105)
at org.apache.hadoop.fs.PrestoFileSystemCache.get(PrestoFileSystemCache.java:59)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at com.facebook.presto.hive.HdfsEnvironment.lambda$getFileSystem$0(HdfsEnvironment.java:71)
at com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
at com.facebook.presto.hive.HdfsEnvironment.getFileSystem(HdfsEnvironment.java:70)
at com.facebook.presto.hive.HdfsEnvironment.getFileSystem(HdfsEnvironment.java:64)
at com.facebook.presto.delta.DeltaClient.loadDeltaTableLog(DeltaClient.java:148)
at com.facebook.presto.delta.DeltaClient.getTable(DeltaClient.java:79)
at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:136)
at com.facebook.presto.delta.DeltaMetadata.getTableHandle(DeltaMetadata.java:57)
at com.facebook.presto.metadata.MetadataManager.getTableHandle(MetadataManager.java:330)
at com.facebook.presto.sql.analyzer.StatementAnalyzer$Visitor.lambda$visitTable$17(StatementAnalyzer.java:1244)
`
{ | ||
public static final String PARTITION_PRUNING_ENABLED = "partition_pruning_enabled"; | ||
public static final String FILTER_PUSHDOWN_ENABLED = "filter_pushdown_enabled"; | ||
private static final String CACHE_ENABLED = "cache_enabled"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vkorukanti what tests use it? It's better to add properties back when caching is actually supported. Can you modify the tests not to use it?
presto-delta/src/main/java/com/facebook/presto/delta/DeltaSessionProperties.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
Outdated
Show resolved
Hide resolved
03083db
to
5e7319d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work, @vkorukanti
looks good to me. Just a few minor things
I would suggest we try merge this as soon. Then we could add more optimizations in following PRs
presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSource.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSource.java
Outdated
Show resolved
Hide resolved
presto-delta/src/main/java/com/facebook/presto/delta/DeltaPageSource.java
Outdated
Show resolved
Hide resolved
String typeBase = columnHandle.getDataType().getBase(); | ||
try { | ||
switch (typeBase) { | ||
case StandardTypes.TINYINT: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static import TINYINT, SMALLINT, INTEGER, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are used only here. I could add, but the imports become too verbose as there are too many types.
presto-delta/src/main/java/com/facebook/presto/delta/DeltaExpressionUtils.java
Outdated
Show resolved
Hide resolved
Some of them are copied from github.com/delta-io/connectors/golden-tables Currently these golden tables are not part of any test jars that Delta provides. Make a PR in Delta to publish these golden tables as a maven artifact and in Presto add a maven dependency in tests.
Currently the rule is focussed on Hive Parquet tables. It doesn't work in any other connector other than Hive. Parquet is a common module between Hive and Delta connectors. Most of the code in the Hive Parquet dereference pushdown rule is common between Delta and Hive connectors. Refactor the rule and create a pluggable points so that Hive and other connectors that work with Parquet data (such as Delta) can extend it and avoid rewriting the same code.
Currently if two tests use the same Delta table, sometimes the deregistration of Delta tables in HMS is not immediately applied, resulting in table already exists exceptions.
Thanks @vkorukanti and everyone who helped to get this PR reviewed and merged. Really appreciated 🙏 |
New connector for reading Delta Lake tables natively in Presto. Connector is based on Delta Standalone Reader API. Design doc is here.