Support adding columns in Delta Lake (ALTER TABLE ADD COLUMN) #12371
Conversation
@@ -265,6 +265,35 @@ public void testCharVarcharComparison()
            .hasStackTraceContaining("Unsupported type: char(3)");
}

@Test
public void testAddColumnToPartitionedTable()
Could you please add tests that check:
- whether Spark can read a table with added columns
- whether OPTIMIZE/VACUUM works after adding columns?
Added a test covering OPTIMIZE & VACUUM.
Regarding "if spark can read table with added columns": should I wait for #11565?
Verifying that it works, even just locally, would be great IMHO.
Regarding the test: it depends on how long you would have to wait.
@findinpath what is the ETA for #11565?
#11565 is in good shape now. I expect the PR will be ready to merge in the coming days.
@ebyhr So my advice would be to wait for it and write a proper test.
I tested this on product test PR #11565.
@findinpath may want to disable the cache in https://github.com/trinodb/trino/pull/11565/files#diff-8ecb8972788b7b6221c837540b00b020c2404ef64548ae563e02a9b1a66c8d14
@ebyhr Could you check whether disabling the cache actually helps with this?
Would running REFRESH TABLE in Spark help?
@homar @findepi Disabling the cache didn't help. REFRESH TABLE logs the same error ("ERROR DeltaLog: Change in the table id detected..."), but subsequent SELECT statements don't show the error.
JFYI: The error comes from
https://github.com/delta-io/delta/blob/728bf902542077ce1c2e97ca67a53c53bb460c64/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L574-L575
It seems Spark reuses the same metaData.id when adding a new column.
trino> ALTER TABLE delta.default.test ADD COLUMN c4 int;
s3://presto-ci-test/test/_delta_log/00000000000000000004.json
{"commitInfo":{"version":4,"timestamp":1653357796162,"userId":"yuya.ebihara","userName":"yuya.ebihara","operation":"ALTER TABLE","operationParameters":{"queryId":"20220524_020316_00004_imnpb"},"clusterId":"trino-dev-ffffffff-ffff-ffff-ffff-ffffffffffff","readVersion":0,"isolationLevel":"WriteSerializable","blindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"24b68017-cb79-4f0b-8ee7-e72a496bbaf4","format":{"provider":"parquet","options":{}},"schemaString":"{\"fields\":[{\"metadata\":{},\"name\":\"c1\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c2\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c3\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"c4\",\"nullable\":true,\"type\":\"integer\"}],\"type\":\"struct\"}","partitionColumns":[],"configuration":{},"createdTime":1653357796162}}
spark-sql> ALTER TABLE default.test ADD COLUMN (x5 int);
s3://presto-ci-test/test/_delta_log/00000000000000000005.json
{"commitInfo":{"timestamp":1653357938234,"operation":"ADD COLUMNS","operationParameters":{"columns":"[{\"column\":{\"name\":\"x5\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}}]"},"readVersion":4,"isBlindAppend":true,"operationMetrics":{}}}
{"metaData":{"id":"24b68017-cb79-4f0b-8ee7-e72a496bbaf4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c4\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"x5\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1653357796162}}
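The observation above can be demonstrated mechanically: pulling the metaData id out of each commit's JSON entry shows both writers used the same identifier. This is a minimal sketch using regex string extraction rather than a real JSON parser, and the class and method names are illustrative, not Trino code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract the metaData "id" field from a _delta_log JSON entry and
// confirm that the Trino commit and the Spark commit carry the same id.
public class MetadataIdCheck
{
    private static final Pattern ID = Pattern.compile("\"metaData\":\\{\"id\":\"([0-9a-f-]+)\"");

    static String extractId(String logLine)
    {
        Matcher matcher = ID.matcher(logLine);
        if (!matcher.find()) {
            throw new IllegalArgumentException("no metaData.id in: " + logLine);
        }
        return matcher.group(1);
    }

    public static void main(String[] args)
    {
        // Abbreviated forms of the two log entries quoted above
        String trinoCommit = "{\"metaData\":{\"id\":\"24b68017-cb79-4f0b-8ee7-e72a496bbaf4\",\"format\":{\"provider\":\"parquet\"}}}";
        String sparkCommit = "{\"metaData\":{\"id\":\"24b68017-cb79-4f0b-8ee7-e72a496bbaf4\",\"format\":{\"provider\":\"parquet\"}}}";
        System.out.println(extractId(trinoCommit).equals(extractId(sparkCommit))); // true: Spark kept the same id
    }
}
```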
Confirmed that setting DeltaLakeTableHandle.getMetadataEntry().getId() in MetadataEntry.id suppresses the Spark error message.
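Conceptually, the fix amounts to copying the table's existing metadata id into the new metaData entry instead of generating a fresh UUID on each ALTER TABLE commit. The sketch below is hypothetical and heavily simplified: SimpleMetadataEntry and addColumn are illustrative stand-ins, not the actual Trino classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the fix: carry the existing metadata id forward when
// committing an ADD COLUMN, rather than minting a new UUID.
public class ReuseMetadataId
{
    record SimpleMetadataEntry(String id, List<String> columns) {}

    static SimpleMetadataEntry addColumn(SimpleMetadataEntry current, String newColumn)
    {
        List<String> columns = new ArrayList<>(current.columns());
        columns.add(newColumn);
        // Key point: reuse current.id() instead of UUID.randomUUID().toString(),
        // so Spark's SnapshotManagement does not report "Change in the table id detected"
        return new SimpleMetadataEntry(current.id(), columns);
    }

    public static void main(String[] args)
    {
        SimpleMetadataEntry before = new SimpleMetadataEntry("24b68017-cb79-4f0b-8ee7-e72a496bbaf4", List.of("c1", "c2", "c3"));
        SimpleMetadataEntry after = addColumn(before, "c4");
        System.out.println(after.id().equals(before.id())); // true: table id unchanged across the commit
    }
}
```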
One important fix, but the rest looks good to me.
try {
    FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(handle.getLocation()));
    long commitVersion = getMandatoryCurrentVersion(fileSystem, new Path(handle.getLocation())) + 1;
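For context on the snippet above: resolving the "current version" of a Delta table boils down to scanning the _delta_log directory for zero-padded NNN...N.json entries and taking the highest index. The sketch below illustrates that idea only; it is not the actual getMandatoryCurrentVersion implementation, and the class and method names are made up.

```java
import java.util.List;

// Conceptual sketch: the current table version is the highest numbered
// JSON commit file present in _delta_log (filenames are 20-digit, zero-padded).
public class CurrentVersion
{
    static long currentVersion(List<String> deltaLogFileNames)
    {
        return deltaLogFileNames.stream()
                .filter(name -> name.matches("\\d{20}\\.json"))
                .mapToLong(name -> Long.parseLong(name.substring(0, 20)))
                .max()
                .orElseThrow(() -> new IllegalStateException("empty _delta_log"));
    }

    public static void main(String[] args)
    {
        List<String> files = List.of(
                "00000000000000000000.json",
                "00000000000000000004.json",
                "00000000000000000005.json");
        System.out.println(currentVersion(files)); // 5, so the next commit would target version 6
    }
}
```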
Can two concurrent transactions both add a new column with the same name?
As far as I confirmed, the first entry is silently overridden by the other transaction; only one JSON log file is generated after the two transactions.
The transaction log synchronizer should prevent a new JSON file from overwriting an existing one.
Ah, I was testing with a local-disk-based query runner. Verified that the other transaction fails on S3.
Yes, the transaction log synchronizer should prevent a new JSON file from overwriting another. But we're using getMandatoryCurrentVersion, which checks, well, the current version, without checking whether some other thread added another column with the same name. Am I missing something?
You're totally right. Instead of calling getMandatoryCurrentVersion, we should be committing to tableHandle.getReadVersion() + 1.
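The reasoning above can be sketched in miniature: if both writers compute their target version as readVersion + 1 from the same snapshot, they collide on the same log file name, and an atomic put-if-absent write (the role the transaction log synchronizer plays on S3) lets only one of them win. This is a toy simulation under stated assumptions, using a map as a stand-in for the log directory; it is not Trino's actual commit path.

```java
import java.util.concurrent.ConcurrentHashMap;

// Toy model of conflicting commits: the _delta_log directory is simulated by
// a map with put-if-absent semantics, keyed by commit version.
public class CommitConflict
{
    static final ConcurrentHashMap<Long, String> LOG = new ConcurrentHashMap<>();

    static boolean tryCommit(long readVersion, String entry)
    {
        long commitVersion = readVersion + 1; // the fix: based on the snapshot read, not "current version + 1"
        return LOG.putIfAbsent(commitVersion, entry) == null;
    }

    public static void main(String[] args)
    {
        long readVersion = 4; // both transactions started from the same snapshot
        boolean first = tryCommit(readVersion, "ADD COLUMN c4");
        boolean second = tryCommit(readVersion, "ADD COLUMN c4");
        System.out.println(first + " " + second); // true false: the losing transaction must fail or retry
    }
}
```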
@alexjo2144 yeah, thanks for confirming.
Thanks for your help. Changed to long commitVersion = handle.getReadVersion() + 1;
Force-pushed from 128fbfb to bc515f0.
Force-pushed from 8e6a944 to 7cf4f6b.
@findepi @homar @findinpath @alexjo2144 Addressed comments.
Force-pushed from 7cf4f6b to a4fa86f.
@mosabua I agree with updating the docs.
From looking into the code, it looks like this isn't an issue. I didn't spend much time on it and don't fully understand how everything works, but from what I can tell, every statement listed in the docs is at least being tested as part of the Delta Lake connector.
Maybe that means "ADD COLUMN" was the last one missing.
@mosabua I think RENAME COLUMN is not supported yet, at least in 370: when I rename a column, the query from Spark succeeds, but Trino returns NULL for the renamed column.
We need to know where we stand with the current 381 release; @ebyhr can confirm in the code for us.
@Dearkano It looks like a bug in the Delta Lake connector. Could you file an issue at https://github.com/trinodb/trino/issues/new?
Description
Support adding columns in Delta Lake
Documentation
(x) No documentation is needed.
Release notes
(x) Release notes entries required with the following suggested text: