
Enable schema evolution for merge write disposition with delta table format #1742

Merged
merged 12 commits into devel from fix/1739-schema-evolution-delta-filesystem-merge on Aug 27, 2024

Conversation

@jorritsandbrink (Collaborator) commented on Aug 25, 2024

Description

  • Enables schema evolution (adding new columns) for the merge write disposition with the delta table format.
  • Increases the minimum deltalake version so the add_columns method is available.
  • Allows passing a schema name to get_delta_tables for pipelines with multiple schemas (which may explain the reported problem with "missing" delta tables).
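A minimal sketch of how the combination might be used, assuming a filesystem destination with a configured bucket_url; the resource, pipeline, and schema names are hypothetical, and the exact get_delta_tables signature (with the new schema_name argument) is hedged based on this PR's description:

```py
import dlt
from dlt.common.libs.deltalake import get_delta_tables  # helper extended in this PR

@dlt.resource(primary_key="id", write_disposition="merge", table_format="delta")
def users():
    # the second record introduces a new "city" column, exercising schema evolution
    yield [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob", "city": "Berlin"}]

# assumes the filesystem destination's bucket_url is configured via config/env
pipeline = dlt.pipeline("delta_demo", destination="filesystem", dataset_name="demo_data")
pipeline.run(users())

# with this PR, a schema name can be passed for pipelines with multiple schemas
tables = get_delta_tables(pipeline, "users", schema_name="delta_demo")
print(tables["users"].schema())
```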

Related Issues

Fixes #1739

netlify bot commented Aug 25, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: e169455
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/66ccf9932c7d810009062456

@@ -6660,63 +6659,52 @@ files = [

[[package]]
name = "pyarrow"
-version = "14.0.2"
+version = "16.1.0"
@jorritsandbrink (Collaborator, Author) commented:

Note that pyarrow gets upgraded.

@jorritsandbrink marked this pull request as ready for review on August 25, 2024, 16:46
@rudolfix (Collaborator) left a comment

Thanks for working on this! It looks good! Three things:

  • If arrow_ds is empty, the schema is not evolved. IMO it should be; please add a test for that case (if arrow_ds.head(1).num_rows == 0:).
  • Should we update all table schemas, as other destinations do in update_stored_schema? If you agree, let's create a ticket for that.
  • Same for truncating tables before the load; this is actually used by the refresh option.

@rudolfix added the support label ("This issue is monitored by Solution Engineer") on Aug 25, 2024
@jorritsandbrink (Collaborator, Author) replied to @rudolfix:

If arrow_ds is empty, the schema is not evolved. IMO it should be; please add a test for that case (if arrow_ds.head(1).num_rows == 0:).

Done.
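For context, a minimal sketch of the empty-batch case discussed here, assuming deltalake's DeltaTable.alter.add_columns (the method the version bump targets) plus deltalake.schema.Field and PrimitiveType; the helper name and the blanket string typing are illustrative, not the actual implementation:

```py
import pyarrow.dataset as ds
from deltalake import DeltaTable
from deltalake.schema import Field, PrimitiveType

def evolve_schema_if_empty(dt: DeltaTable, arrow_ds: ds.Dataset) -> None:
    # the case from the review: no rows arrived, but the schema may still carry new columns
    if arrow_ds.head(1).num_rows == 0:
        existing = {f.name for f in dt.schema().to_pyarrow()}
        missing = [f for f in arrow_ds.schema if f.name not in existing]
        if missing:
            # simplified type mapping: every new column is added as a nullable string
            dt.alter.add_columns(
                [Field(f.name, PrimitiveType("string"), nullable=True) for f in missing]
            )
```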

Should we update all table schemas, as other destinations do in update_stored_schema? If you agree, let's create a ticket for that.

Three options:

  1. We let delta-rs do automatic schema evolution.
    • This already works for write_deltalake (which we use for the append and replace dispositions).
    • This does not yet work for DeltaTable.merge (which we use for the merge write disposition).
    • This does not yet work for the "empty batch case".
  2. We manually manage schema evolution.
    • In this case I think using update_stored_schema is a good idea.
  3. Mix of 1 and 2.

We currently do option 3. Option 1 is not possible yet, but might become possible once the linked tickets are done (they are already assigned, so it could be soon). Option 2 is possible, but puts a bigger burden on our side. Which do you prefer?
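To make the distinction concrete, a rough sketch against the deltalake Python API; the path and data are hypothetical, and the schema_mode/engine flags depend on the installed deltalake version:

```py
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

table_uri = "_storage/users"  # hypothetical local table location
data = pa.table({"id": [1, 2], "name": ["alice", "bob"]})

# option 1 in practice: for append/replace, delta-rs can merge schemas on write
write_deltalake(table_uri, data, mode="append", schema_mode="merge", engine="rust")

# the merge disposition goes through DeltaTable.merge, which did not evolve the
# schema at the time of this PR, so new columns must be added beforehand
dt = DeltaTable(table_uri)
(
    dt.merge(
        source=data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
```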

Same for truncating tables before the load; this is actually used by the refresh option.

Okay, then we should probably use it.

@rudolfix previously approved these changes on Aug 26, 2024

@rudolfix (Collaborator) left a comment

OK, this is top. Let me add the Windows fix.

@rudolfix (Collaborator) commented:
@jorritsandbrink

So here is what I'd do:

In update_stored_schema:

  1. Make sure that the table prefix == table dir for delta (then we know that each table has a separate "folder"). That way the folder structure is sound and weird layouts are eliminated.

In truncate / drop tables:

  1. Disable it for delta (i.e., raise an error message), OR
  2. Implement dropping and truncating tables properly (all refresh options should work after that).

Migrating the schema: you already have all the building blocks for (2), and IMO it makes sense to migrate tables before we start loading, but the priority is low.
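For option (2), a rough sketch of what truncating and dropping could look like for delta tables, assuming deltalake's DeltaTable.delete and an fsspec filesystem for the table location; the function names and any paths are illustrative only:

```py
from typing import Optional

import fsspec
from deltalake import DeltaTable

def truncate_delta_table(table_uri: str, storage_options: Optional[dict] = None) -> None:
    # "truncate": keep the table, delete all rows (commits a new delta version)
    dt = DeltaTable(table_uri, storage_options=storage_options)
    dt.delete()  # no predicate: every row is deleted

def drop_delta_table(table_uri: str) -> None:
    # "drop": remove the table's directory entirely; only safe if the table prefix
    # really is a dedicated directory, as point 1 above requires
    fs, path = fsspec.core.url_to_fs(table_uri)
    fs.rm(path, recursive=True)
```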

partition_by=self._partition_columns,
storage_options=self._storage_options,
)
with self.arrow_ds.scanner().to_reader() as arrow_rbr: # RecordBatchReader
@jorritsandbrink (Collaborator, Author) commented:

Curious why you inserted a with context here. Is it because arrow_rbr gets exhausted and is effectively useless after the context?

@rudolfix (Collaborator) replied:

It has a close method, so it holds internal unmanaged resources that we should free ASAP; otherwise the garbage collector does it much later.
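A small illustration of the point, with a hypothetical local dataset path: the with block calls close() on the pyarrow RecordBatchReader deterministically instead of leaving the native scan state to the garbage collector.

```py
import pyarrow.dataset as ds

dataset = ds.dataset("_storage/users", format="parquet")  # hypothetical path

total_rows = 0
with dataset.scanner().to_reader() as reader:  # pyarrow.RecordBatchReader
    for batch in reader:
        total_rows += batch.num_rows
# reader.close() has run here, even if the loop raised
print(total_rows)
```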

@rudolfix merged commit 08e5e7a into devel on Aug 27, 2024
55 of 56 checks passed
@rudolfix deleted the fix/1739-schema-evolution-delta-filesystem-merge branch on August 27, 2024 07:59
@rudolfix restored the fix/1739-schema-evolution-delta-filesystem-merge branch on August 27, 2024 07:59
@rudolfix deleted the fix/1739-schema-evolution-delta-filesystem-merge branch on August 27, 2024 07:59
willi-mueller pushed a commit that referenced this pull request on Sep 2, 2024

Enable schema evolution for merge write disposition with delta table format (#1742)

* black format

* increase minimum deltalake version dependency

* enable schema evolution for delta table merge

* extract delta table merge logic into separate function

* remove big decimal exclusion due to upstream bugfix

* evolve delta table schema in empty source case

* refactor DeltaLoadFilesystemJob

* uses right table path format in delta lake load job

* allows to pass schema name when getting delta tables and computing table counts

* cleansup usage of remote paths and uris in filesystem load jobs

* removes tempfile from file_storage

---------

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
Labels
support (This issue is monitored by Solution Engineer)
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

No schema evolution with delta table format on filesystem destination with merge write disposition
2 participants