Is pyiceberg.Table thread-safe? #1305

Closed
chengchengpei opened this issue Nov 8, 2024 · 3 comments

Comments

@chengchengpei

Question

Is pyiceberg.Table thread-safe? Can we call table.append(table_data) from multiple threads or processes to write to the same table in parallel?

@kevinjqliu
Contributor

The write process relies on the catalog's atomic swap to guarantee serializable isolation. Multiple threads can call append, but only one commit will succeed at a time; the others have to retry the commit.
https://iceberg.apache.org/spec/?h=concurrency#optimistic-concurrency
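To make the atomic-swap idea concrete, here is a minimal toy sketch of optimistic concurrency. It is not PyIceberg code: `ToyCatalog`, `CommitConflict`, `append_with_retry`, and `run_writers` are hypothetical names invented for illustration, and a lock-protected integer stands in for the catalog's pointer to the current table metadata.

```python
import threading


class CommitConflict(Exception):
    """Raised when the pointer moved since the writer last read it."""


class ToyCatalog:
    """Hypothetical stand-in for a catalog: one pointer, swapped atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self.snapshot_id = 0
        self.rows = []

    def read(self):
        # Return the current snapshot id together with a copy of the state.
        with self._lock:
            return self.snapshot_id, list(self.rows)

    def compare_and_swap(self, expected_id, new_rows):
        # The swap only succeeds if nobody committed since `expected_id` was read.
        with self._lock:
            if self.snapshot_id != expected_id:
                raise CommitConflict(
                    f"expected id {expected_id}, found {self.snapshot_id}"
                )
            self.rows = new_rows
            self.snapshot_id += 1


def append_with_retry(catalog, new_rows):
    """Re-read the latest state and retry until the swap succeeds."""
    attempts = 0
    while True:
        attempts += 1
        base_id, rows = catalog.read()
        try:
            catalog.compare_and_swap(base_id, rows + new_rows)
            return attempts
        except CommitConflict:
            continue  # another writer won this round; rebase and try again


def run_writers(n_writers=8):
    """Start several concurrent writers that each append one row."""
    catalog = ToyCatalog()
    threads = [
        threading.Thread(target=append_with_retry, args=(catalog, [i]))
        for i in range(n_writers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return catalog
```

Every writer eventually lands its row, but each successful commit is based on the state left by the previous one, which is why a writer that lost the race has to re-read and retry rather than fail silently.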

@chengchengpei
Author

chengchengpei commented Nov 8, 2024

@kevinjqliu

I tried to run the following code (multiple processes appending to the same Iceberg table):

import os
import time
from multiprocessing import Pool

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, BinaryType
import base64
import boto3

from utils import list_files


def process_batch(batch):
    print('start processing batch')
    ids = []
    image_data = []
    for image_path, image_name in batch:
        with open(image_path, "rb") as f:
            image_data.append(base64.b64encode(f.read()))
            ids.append(image_name)
    table_data = pa.Table.from_pydict({"id": ids, "image_data": image_data})
    start = time.time()
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
    })
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
    end = time.time()
    print('uploaded {} in {} seconds'.format(len(ids), end - start))
    return len(ids), end - start


if __name__ == "__main__":
    # Create a schema for the Iceberg table
    schema = Schema(
        NestedField(1, "id", StringType()),
        NestedField(2, "image_data", BinaryType())
    )

    # Load the Iceberg catalog
    catalog = load_catalog("glue", **{
        "type": "glue",
        "region": "us-east-1",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": os.getenv("AWS_KEY_ID"),
        "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "max-workers": 8
        # "write.parquet.compression-codec": "snappy"
    })

    catalog.create_namespace_if_not_exists("test")

    # Create an Iceberg table
    table = catalog.create_table_if_not_exists(
        identifier="test.imagenet-object-localization-challenge-10000",
        schema=schema,
        location="s3://test/iceberg-data/")

    # Load images and convert to base64
    images_list = list_files("/Users/ILSVRC/data/CLS-LOC/test/", extension=".JPEG")
    batch_size = 10000
    total_batches = 10
    processes = []
    batches = [images_list[i:i + batch_size] for i in
               range(0, min(len(images_list), total_batches * batch_size), batch_size)]

    with Pool(4) as pool:
        results = pool.map(process_batch, batches)

    for result in results:
        print('uploaded {} in {} seconds'.format(result[0], result[1]))

but got the following error:

Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/Users/test/write_images_to_iceberg.py", line 51, in process_batch
    catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1578, in append
    tx.append(df=df, snapshot_properties=snapshot_properties)
  File "/Users/tst/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 289, in __exit__
    self.commit_transaction()
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 712, in commit_transaction
    self._table._do_commit(  # pylint: disable=W0212
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1638, in _do_commit
    response = self.catalog._commit_table(  # pylint: disable=W0212
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/glue.py", line 484, in _commit_table
    updated_staged_table = self._update_and_stage_table(current_table, table_request)
  File "/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py", line 835, in _update_and_stage_table
    requirement.validate(current_table.metadata if current_table else None)
  File "/Users/tests/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line 1262, in validate
    raise CommitFailedException(
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/test/write_images_to_iceberg.py", line 92, in <module>
    results = pool.map(process_batch, batches)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main has changed: expected id 2633742078255924117, found 3998254648540280684

I have too many rows to write to the same Iceberg table. How can I speed it up?

Thanks

@kevinjqliu
Contributor

pyiceberg.exceptions.CommitFailedException

This is not a thread-safety issue. It is expected when you have concurrent writers: a writer whose commit fails should retry the commit. See #1084 for a similar issue, and #269 for the issue tracking this feature.
See #269 (comment) for a workaround.
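As a rough illustration of "the writer should retry the commit", here is a minimal retry helper with exponential backoff and jitter. This is a sketch, not the workaround from the linked comment: `retry_commit` and its parameters are hypothetical names, and the helper is deliberately generic so it does not depend on PyIceberg.

```python
import random
import time


def retry_commit(attempt, retry_on, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call `attempt()` until it succeeds or `max_attempts` is reached.

    `attempt` is a zero-argument callable that performs one full commit
    attempt; `retry_on` is the exception type (or tuple of types) that
    signals a lost commit race. Any other exception propagates immediately.
    """
    for attempt_no in range(1, max_attempts + 1):
        try:
            return attempt()
        except retry_on:
            if attempt_no == max_attempts:
                raise  # give up and surface the last conflict
            # Exponential backoff plus random jitter, so competing writers
            # are less likely to collide again on the next attempt.
            delay = base_delay * (2 ** (attempt_no - 1))
            sleep(delay + random.uniform(0, delay))
```

In the script above, `attempt` could be something like `lambda: catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)` with `retry_on=CommitFailedException` (imported from `pyiceberg.exceptions`, as the traceback shows). Reloading the table inside each attempt matters, because the retry must be based on the latest table metadata. Note that, as far as I understand, retrying a whole `append` also writes the data files again, so this helps with correctness more than with throughput.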

i have too many rows to write to the same iceberg table.... how to speed it up?

You can try writing the batches as Parquet files in the worker processes and collecting the file paths in the main process. Then use the add_files method to commit all of the Parquet files to the Iceberg table; this way the table is committed to only once.
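A minimal sketch of that approach follows. The helper names `write_batch` and `commit_files_once` are hypothetical, and `out_dir` is assumed to be a location that both the workers and the table's FileIO can reach. It also assumes the written files are compatible with the table schema; the add_files documentation lists the exact requirements (for example around name mapping and field IDs), so check those before relying on this.

```python
def write_batch(batch_id, table_data, out_dir):
    """Worker step: write one batch as a standalone Parquet file, return its path.

    No catalog or table is touched here, so workers cannot conflict.
    """
    # Imported lazily so that the commit helper below has no pyarrow dependency.
    import pyarrow.parquet as pq

    path = f"{out_dir.rstrip('/')}/batch-{batch_id:05d}.parquet"
    pq.write_table(table_data, path)
    return path


def commit_files_once(table, file_paths):
    """Main-process step: register every written file in a single commit.

    `table` is expected to be a loaded pyiceberg table; only its
    `add_files(file_paths=...)` method is used.
    """
    # De-duplicate and sort so the commit is deterministic.
    paths = sorted(set(file_paths))
    if paths:
        table.add_files(file_paths=paths)
    return paths
```

With the script above, `pool.map` would return the paths produced by `write_batch`, and the main process would then call `commit_files_once(catalog.load_table(...), paths)` once. Because there is a single commit, there are no concurrent writers left to trigger `CommitFailedException`.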
