
abort the whole table transaction if any updates in the transaction has failed #1246

Open · wants to merge 1 commit into base: main

Conversation

@stevie9868 stevie9868 commented Oct 23, 2024

We have encountered a data loss issue when using pyIceberg to perform an overwrite operation. Typically, an overwrite operation involves creating both a delete snapshot and an append snapshot. However, if an exception occurs during the creation of the append snapshot, the current code still attempts to commit the delete snapshot, leading to potential data loss. One thing to note is this does not apply to only overwrite but potentially other operations as well.

To address this issue, we need to ensure that the entire transaction is aborted if any part of the update process fails.

I have also provided a simple test case: before this change, the transaction would commit only a delete snapshot update, deleting the data. After this fix, the table keeps the data it had before the partially failed transaction, since the whole transaction is now aborted.
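The intended behaviour can be sketched as a context manager whose __exit__ commits only on a clean exit. This is a minimal standalone sketch under assumed names (SketchTransaction, apply, and the committed flag are illustrative, not the actual PyIceberg implementation):

```python
# Minimal sketch of the proposed fix: commit only when the `with` block
# exits without an exception. Names are illustrative, not PyIceberg's.
class SketchTransaction:
    def __init__(self):
        self._updates = []
        self.committed = False

    def apply(self, update):
        # Queue an update; nothing is committed yet.
        self._updates.append(update)

    def commit_transaction(self):
        # Stand-in for the real atomic commit against the catalog.
        self.committed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Only commit the full transaction if no exception was raised.
        if exc_type is None:
            self.commit_transaction()
        return False  # never swallow the exception


# A failed block leaves the table untouched; a clean block commits.
failed = SketchTransaction()
try:
    with failed:
        failed.apply("delete-snapshot")
        raise RuntimeError("append failed")
except RuntimeError:
    pass

clean = SketchTransaction()
with clean:
    clean.apply("delete-snapshot")
    clean.apply("append-snapshot")
```

With the old behaviour, the failed block would still have committed the queued delete; here it commits nothing.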

@stevie9868 stevie9868 changed the title abort the whole transaction if any update on the chain has failed abort the whole table transaction if any updates in the transaction has failed Oct 23, 2024
@stevie9868
Author

@HonahX

Thanks for unblocking the testing actions!
But it looks like the curl command in the Python CI / lint-and-test 3.10 job times out.

@kevinjqliu
Contributor

kevinjqliu commented Oct 25, 2024

Thanks for the PR @stevie9868. This sounds like an important bug to address.

Do you know if this bug only applies to the overwrite function or all functions in Transactions?

PS I reran the CI

    ) -> None:
        """Close and commit the transaction, or handle exceptions."""
        # Only commit the full transaction, if there is no exception in all updates on the chain
        if exctb is None:
Contributor

what is the difference between exctype, excinst, and exctb here? Why do we use exctb?
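For context, Python passes three arguments to __exit__: the exception class, the exception instance, and the traceback. All three are None when the block exits cleanly, and all three are set together when an exception escapes the block, so testing any one of them is equivalent. A small probe (hypothetical demo class, not from the codebase) illustrates this:

```python
# Demonstrates what exc_type, exc_value, and traceback contain in __exit__.
captured = {}

class Probe:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        captured["type"] = exc_type    # exception class (or None)
        captured["value"] = exc_value  # exception instance (or None)
        captured["tb"] = traceback     # traceback object (or None)
        return True  # suppress the exception, for this demo only

with Probe():
    raise ValueError("boom")
```

So checking exctb is None and checking exctype is None behave identically; exctype is arguably the more conventional choice.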

Contributor

I'm a bit confused about this part. Typically __exit__ is called as the last step of the with statement.
Here, __exit__ calls self.commit_transaction(), which processes the transaction's updates.

It seems like the issue here is catching partial exceptions from self.commit_transaction(), which won't be caught here.

@kevinjqliu
Contributor

We have encountered a data loss issue when using pyIceberg to perform an overwrite operation. Typically, an overwrite operation involves creating both a delete snapshot and an append snapshot. However, if an exception occurs during the creation of the append snapshot, the current code still attempts to commit the delete snapshot, leading to potential data loss.

I'm a bit confused about the chain of events. Here's what I found digging through the code:

table.overwrite creates a transaction and calls its overwrite function

with self.transaction() as tx:
    tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)

In the transaction's overwrite function, it calls both self.delete and self.update_snapshot(snapshot_properties=snapshot_properties).fast_append()

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
    # skip writing data files if the dataframe is empty
    if df.shape[0] > 0:
        data_files = _dataframe_to_data_files(
            table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
        )
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)

self.delete ultimately creates an UpdateSnapshot (_OverwriteFiles)

with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
    commit_uuid=commit_uuid
) as overwrite_snapshot:
    for original_data_file, replaced_data_files in replaced_files:
        overwrite_snapshot.delete_data_file(original_data_file)
        for replaced_data_file in replaced_data_files:
            overwrite_snapshot.append_data_file(replaced_data_file)

and self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() also creates an UpdateSnapshot (_FastAppendFiles).

Both _OverwriteFiles and _FastAppendFiles subclass _SnapshotProducer, which, combined with UpdateTableMetadata, applies updates to the transaction:

@abstractmethod
def _commit(self) -> UpdatesAndRequirements: ...

def commit(self) -> None:
    self._transaction._apply(*self._commit())

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
    """Close and commit the change."""
    self.commit()

def _commit(self) -> UpdatesAndRequirements:
    new_manifests = self._manifests()
    next_sequence_number = self._transaction.table_metadata.next_sequence_number()
    summary = self._summary(self.snapshot_properties)
    manifest_list_file_path = _generate_manifest_list_path(
        location=self._transaction.table_metadata.location,
        snapshot_id=self._snapshot_id,
        attempt=0,
        commit_uuid=self.commit_uuid,
    )
    with write_manifest_list(
        format_version=self._transaction.table_metadata.format_version,
        output_file=self._io.new_output(manifest_list_file_path),
        snapshot_id=self._snapshot_id,
        parent_snapshot_id=self._parent_snapshot_id,
        sequence_number=next_sequence_number,
    ) as writer:
        writer.add_manifests(new_manifests)

    snapshot = Snapshot(
        snapshot_id=self._snapshot_id,
        parent_snapshot_id=self._parent_snapshot_id,
        manifest_list=manifest_list_file_path,
        sequence_number=next_sequence_number,
        summary=summary,
        schema_id=self._transaction.table_metadata.current_schema_id,
    )

    return (
        (
            AddSnapshotUpdate(snapshot=snapshot),
            SetSnapshotRefUpdate(
                snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
            ),
        ),
        (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
    )

At this point, nothing has been committed yet. All updates are queued up in the transaction.
commit_transaction is used to apply the changes in the transaction.
For the above scenario, all updates are applied as one transaction, which is either accepted or rejected as a whole. So there cannot be a scenario where the deletes are applied while the append is not.
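The all-or-nothing behaviour described above can be sketched as applying queued updates to a staged copy and publishing the copy only if every update succeeds. This is toy code under stated assumptions (commit_atomically and the dict-based "table state" are illustrative, not PyIceberg's API; it relies on updates replacing values rather than mutating them in place):

```python
# Toy sketch of an atomic commit: stage all queued updates on a copy of the
# table state, and publish the copy only if every update applied cleanly.
def commit_atomically(state, updates):
    staged = dict(state)  # shallow copy; updates must rebind, not mutate
    for update in updates:
        update(staged)  # an exception here leaves `state` untouched
    state.clear()
    state.update(staged)

table = {"rows": [1, 2, 3]}

def delete_all(s):
    s["rows"] = []  # rebinds the value on the staged copy only

def append_fails(s):
    raise IOError("simulated S3 failure while writing the append snapshot")

# The delete is staged, the append fails, and nothing is published.
try:
    commit_atomically(table, [delete_all, append_fails])
except IOError:
    pass
```

After the failed commit, table still holds its original rows: the staged delete was never visible.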

@kevinjqliu
Contributor

Please let me know if the above makes sense

@kevinjqliu
Contributor

kevinjqliu commented Oct 25, 2024

Ah, do you have _autocommit set to True?
Since both delete and fast_append ultimately call the transaction's _apply to queue up the updates, having _autocommit set to True will trigger a commit each time.

if self._autocommit:
    self.commit_transaction()

This seems like a potential footgun. Perhaps we should get rid of _autocommit; it's not used anywhere: https://github.com/search?q=repo%3Aapache%2Ficeberg-python%20_autocommit&type=code
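The footgun can be shown with a toy _apply: with autocommit on, every queued update becomes its own commit, so a failure between two updates leaves the first one already published (TxnSketch and its string "updates" are illustrative names, not PyIceberg's internals):

```python
# Toy illustration of the _autocommit footgun: each _apply triggers its own
# commit, so two updates become two independent commits instead of one.
class TxnSketch:
    def __init__(self, autocommit=False):
        self._autocommit = autocommit
        self._updates = []
        self.commits = []  # record of every commit_transaction call

    def _apply(self, update):
        self._updates.append(update)
        if self._autocommit:
            self.commit_transaction()

    def commit_transaction(self):
        self.commits.append(tuple(self._updates))
        self._updates = []

# With autocommit: two separate commits. A failure between the two _apply
# calls would leave the delete committed with no matching append.
auto = TxnSketch(autocommit=True)
auto._apply("delete")
auto._apply("append")

# Without autocommit: both updates land in one atomic commit.
manual = TxnSketch(autocommit=False)
manual._apply("delete")
manual._apply("append")
manual.commit_transaction()
```

This matches the data-loss scenario in the PR description: the delete snapshot gets committed on its own when the append fails.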

@stevie9868
Author

stevie9868 commented Oct 26, 2024

@kevinjqliu
Thanks for the detailed walkthrough!

I believe that if self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() throws an exception, it will still trigger Transaction.__exit__, so commit_transaction will contain only one update, the delete, since the append failed to be added to the updates list.

For example, if fast_append() fails during the commit (in our case, we saw an AWS S3 exception), the exception propagates back to Transaction.__exit__.

Let me know if I missed anything. Thanks! I would also prefer getting rid of _autocommit in the Transaction class.

@stevie9868
Author

Thanks for the PR @stevie9868. This sounds like an important bug to address.

Do you know if this bug only applies to the overwrite function or all functions in Transactions?

PS I reran the CI

Thank you. I think this would potentially apply to all functions that try to combine more than one update into one transaction.

@stevie9868
Author

Ah, do you have _autocommit set to True? Since both delete and fast_append ultimately call transaction's _apply to queue up the updates, having _autocommit set to True will trigger commit each time.

if self._autocommit:
    self.commit_transaction()

This seems like a potential footgun. Perhaps we should get rid of _autocommit, its not used anywhere https://github.com/search?q=repo%3Aapache%2Ficeberg-python%20_autocommit&type=code

Ah, I don't think we set _autocommit to True.

@HonahX
Contributor

HonahX commented Oct 27, 2024

@stevie9868 @kevinjqliu Thanks for the great PR and discussions! I agree that there is an issue with the current Transaction mechanism: commit_transaction can be incorrectly called when we should just abandon everything.

The following pattern has been the most common practice for updating tables in pyiceberg since the beginning:

with tbl.transaction() as txn:
    txn.overwrite(...)
    ...

The "with" statement ensures that the context manager's (the Transaction object's) __exit__(), which calls commit_transaction, is always invoked (even when there is an exception), as long as the transaction object was successfully initialized. However, we should only call commit_transaction when there is no exception along the way.

A simpler example would be:

pa_table_with_column = pa.Table.from_pydict(
    {
        "foo": ["a", None, "z"],
        "bar": [19, None, 25],
    },
    schema=pa.schema([
        pa.field("foo", pa.large_string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=True),
    ]),
)

tbl = catalog.create_table(identifier=identifier, schema=pa_table_with_column.schema)

with pytest.raises(ValueError):
    with tbl.transaction() as txn:
        txn.append(pa_table_with_column)
        raise ValueError
        txn.append(pa_table_with_column)

assert len(tbl.scan().to_pandas()) == 0

Since I explicitly raise an error during the transaction, the whole transaction should be abandoned. But this code block still inserts 3 rows (the first append) into the table.

Please let me know if these make sense. Would love to hear your thoughts on this!

@stevie9868
Author

@HonahX

Thanks for providing a detailed example, and I agree that we should only call commit_transaction when there is no exception along the way.

@HonahX
Contributor

HonahX commented Oct 27, 2024

This seems like a potential footgun. Perhaps we should get rid of _autocommit; it's not used anywhere: https://github.com/search?q=repo%3Aapache%2Ficeberg-python%20_autocommit&type=code

The _autocommit flag (the autocommit parameter) in Transaction is used in some of Table's APIs:

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        transaction=Transaction(self, autocommit=True),
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.name_mapping(),
    )

def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
    return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

def manage_snapshots(self) -> ManageSnapshots:
    """
    Shorthand to run snapshot management operations like create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
        ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """
    return ManageSnapshots(transaction=Transaction(self, autocommit=True))

The idea is to make the code simpler when we only want to evolve the schema/spec/etc., i.e.

with table.update_schema() as update:
    update.add_column("some_field", IntegerType(), "doc")

instead of another with..transaction wrapper:

with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update_schema.add_column("some_other_field", IntegerType(), "doc")

Since the recommended way to start a transaction is

txn = tbl.transaction()

this option is generally not exposed to the user directly: #471 (comment)

However, there may still be some concerns around this since Transaction is a public class. If so, I think we can start by making the parameter "private" (autocommit -> _autocommit) and/or adding some documentation to explain the usage.

Please let me know what you think!

pyiceberg/table/__init__.py (outdated review thread, resolved)
tests/catalog/test_base.py (outdated review thread, resolved)
@stevie9868
Author

stevie9868 commented Oct 27, 2024

However, there may still be some concerns around this since Transaction is a public class. If this is the case, I think we can start from making the parameter "private" (autocommit -> _autocommit) and/or adding some doc to explain the usage.

Ah, I think the parameter autocommit is already private in the Transaction class.

Having a doc is a good first step, and I believe autocommit=True is currently applied to ManageSnapshots, UpdateSpec, and UpdateSchema.

Also, correct me if I am wrong, but I believe the current Java Iceberg library doesn't have an autocommit option in its Transaction class?

@stevie9868 stevie9868 force-pushed the yingjianw/abortWholeTransactionWhenThereIsUpdateFailure branch from 34ca959 to f7a7a87 on October 27, 2024 at 17:49
@stevie9868
Author

I have also updated the PR based on the existing comments. Thanks everyone for the input!

Comment on lines +239 to +240
"""Close and commit the transaction, or handle exceptions."""
# Only commit the full transaction, if there is no exception in all updates on the chain
Contributor

Suggested change
"""Close and commit the transaction, or handle exceptions."""
# Only commit the full transaction, if there is no exception in all updates on the chain
"""Close and commit the transaction if no exceptions have been raised."""

Contributor

Exit the runtime context related to this object. The parameters describe the exception that caused the context to be exited. If the context was exited without an exception, all three arguments will be None.

From https://docs.python.org/3/reference/datamodel.html#object.__exit__

@@ -766,3 +766,26 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
    with pytest.raises(ValidationError) as exc_info:
        _ = given_catalog_has_a_table(catalog, properties=property_with_none)
    assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


def test_abort_table_transaction_on_exception(catalog: InMemoryCatalog) -> None:
Contributor

nit: can this test be moved to tests/table/test_init.py instead? it doesn't really belong in the "catalog"

Comment on lines +773 to +777
# Populate some initial data
data = pa.Table.from_pylist(
    [{"x": 1, "y": 2, "z": 3}, {"x": 4, "y": 5, "z": 6}],
    schema=TEST_TABLE_SCHEMA.as_arrow(),
)
Contributor

nit: you can just use the test fixture arrow_table_with_null: pa.Table, like so:

spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int

Comment on lines +786 to +790
with pytest.raises(ValueError):
    with tbl.transaction() as txn:
        txn.overwrite(data)
        raise ValueError

Contributor

Suggested change
with pytest.raises(ValueError):
    with tbl.transaction() as txn:
        txn.overwrite(data)
        raise ValueError
with pytest.raises(ValueError):
    with tbl.transaction() as txn:
        txn.overwrite(data)
        raise ValueError
        txn.overwrite(data)

maybe another call after the exception

Contributor

Oops, yeah, Honah already mentioned this.

@kevinjqliu
Contributor

Thanks @HonahX @stevie9868! Glad we were able to get to the bottom of this important correctness issue.

I started #1253 to continue the conversation on autocommit
