
Table merges fail when CDF is enabled due to column mismatches #2908

Closed
ldacey opened this issue Sep 25, 2024 · 2 comments · Fixed by #2919
Labels: bug (Something isn't working)

ldacey (Contributor) commented on Sep 25, 2024

Environment

Delta-rs version: 0.20.0

Binding: Python


Bug

What happened:

Enabling CDF results in _internal.DeltaError: Generic DeltaTable error: Error during planning: UNION

What you expected to happen:

The merge should complete successfully whether or not change data feed is enabled on the table.

How to reproduce it:

from datetime import date, datetime

import polars as pl
from deltalake import DeltaTable
from deltalake.table import TableMerger
from pyarrow import Table


def generate_sample_data():
    data = {
        "year": [2023] * 5,
        "month_id": [202309] * 5,
        "date_id": [20230901, 20230902, 20230903, 20230904, 20230905],
        "date": [
            date(2023, 9, 1),
            date(2023, 9, 2),
            date(2023, 9, 3),
            date(2023, 9, 4),
            date(2023, 9, 5),
        ],
        "row_hash": ["example1", "example2", "example3", "example4", "example5"],
        "interaction_datetime": [datetime(2023, 9, i + 1, 10, 0) for i in range(5)],
        "primary_user_id": [f"USER00{i+1}" for i in range(5)],
    }

    df = pl.DataFrame(data)
    return df


df = generate_sample_data()
table = df.to_arrow()

print(table.schema)

DeltaTable.create(
    table_uri="union_error",
    schema=table.schema,
    mode="overwrite",
    partition_by=["month_id"],
    storage_options=None,
    name="union_error",
    description="Union Error",
    custom_metadata={},
    configuration={
        "delta.dataSkippingStatsColumns": "year,month_id,date_id,row_hash",
        "delta.checkpoint.writeStatsAsStruct": "true",
        "delta.checkpointInterval": "5",
        "delta.deletedFileRetentionDuration": "interval 30 days",
        "delta.enableChangeDataFeed": "true",
        "delta.appendOnly": "false",
    },
)
dt = DeltaTable("union_error")

print(dt.metadata().configuration)


def insert_unique_rows(
    source_pyarrow_table: Table,
    target_delta_table: DeltaTable,
) -> TableMerger:
    """Merges the source PyArrow Table with the target DeltaTable."""
    predicates = {"merge": "s.row_hash = t.row_hash"}

    merger: TableMerger = target_delta_table.merge(
        source=source_pyarrow_table,
        predicate=predicates["merge"],
        source_alias="s",
        target_alias="t",
    ).when_not_matched_insert_all()

    return merger


insert_unique_rows(source_pyarrow_table=table, target_delta_table=dt).execute()

dt = DeltaTable("union_error")
print(dt.to_pandas())

More details:

If the delta.enableChangeDataFeed configuration is turned off, the same merge succeeds, for reasons that are unclear.
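
A minimal sketch of this workaround, reusing table and insert_unique_rows() from the script above; the "union_error_no_cdf" table URI is just an illustrative name, not from the original report:

# Same create call as above, but with CDF disabled; the merge then succeeds.
DeltaTable.create(
    table_uri="union_error_no_cdf",  # hypothetical path for illustration
    schema=table.schema,
    mode="overwrite",
    partition_by=["month_id"],
    configuration={
        "delta.enableChangeDataFeed": "false",  # or omit the key entirely
        "delta.appendOnly": "false",
    },
)
dt_no_cdf = DeltaTable("union_error_no_cdf")
insert_unique_rows(source_pyarrow_table=table, target_delta_table=dt_no_cdf).execute()
print(DeltaTable("union_error_no_cdf").to_pandas())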

ldacey added the bug label on Sep 25, 2024
VillePuuska (Contributor) commented:

A more minimal example of the issue:

import polars as pl
from deltalake import DeltaTable

df = pl.DataFrame(
    {
        "id": [1, 2],
        "date": [1, 2],
    },
    schema={
        # setting data types to be equal fixes the error, i.e. int & int or date & date
        "id": pl.Int64,
        "date": pl.Date,
    },
)
table = df.to_arrow()

dt = DeltaTable.create(
    table_uri="union_error",
    schema=table.schema,
    mode="overwrite",
    partition_by=["id"],  # taking out partitioning fixes the error
    configuration={
        "delta.enableChangeDataFeed": "true",  # false fixes the error
    },
)

dt.merge(
    source=table,
    predicate="s.id = t.id",
    source_alias="s",
    target_alias="t",
).when_not_matched_insert_all().execute()

Running this gives the error:

Traceback (most recent call last):
  File "/workspaces/codespaces-blank/setup.py", line 31, in <module>
    ).when_not_matched_insert_all().execute()
                                    ^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/deltalake/table.py", line 1793, in execute
    metrics = self._table.merge_execute(self._builder)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_internal.DeltaError: Generic DeltaTable error: Error during planning: UNION Column id (type: Int64) is not compatible with column date (type: Date32)

Modifications to the script that allow it to run without error:

  • Change the data type of both id and date to be the same, i.e. both pl.Int64 or both pl.Date (a sketch follows this list).
  • Comment out partition_by=["id"].
  • Comment out "delta.enableChangeDataFeed": "true".

The script also works fine if you have two pl.String columns, or one pl.String and one pl.Int64. But if you have pl.String and pl.Date columns, you then get the following kind of error:

Traceback (most recent call last):
  File "/workspaces/codespaces-blank/setup.py", line 31, in <module>
    ).when_not_matched_insert_all().execute()
                                    ^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/deltalake/table.py", line 1793, in execute
    metrics = self._table.merge_execute(self._builder)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Exception: Cast error: Cannot cast string 'a' to value of Date32 type
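
For reference, a guess at the pl.String / pl.Date variant that produces this cast error; the string values are assumed from the 'a' in the message, not taken verbatim from the comment:

df = pl.DataFrame(
    {
        "id": ["a", "b"],  # partition column as pl.String
        "date": [1, 2],
    },
    schema={
        "id": pl.String,
        "date": pl.Date,
    },
)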

So the issue seems to be some weird mix of Date, partitioning, and CDF. 🤔

VillePuuska (Contributor) commented:

Looks like this is the same issue as #2832
