From f632c22a50d1f34d108df8ab31072907eb807a22 Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 22 Feb 2024 18:55:16 -0800
Subject: [PATCH] add write target file size config

---
 pyiceberg/io/pyarrow.py     | 22 ++++++++++++++++------
 pyiceberg/table/__init__.py |  6 +++++-
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index dbb8edfefe..16a0462a5e 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1758,14 +1758,24 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
         data_files.append(data_file)
     return iter(data_files)

-def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
-    # bin-pack the table into 256 MB chunks
+
+def bin_pack_arrow_table(tbl: pa.Table, table_properties: Properties) -> Iterator[List[pa.RecordBatch]]:
     from pyiceberg.utils.bin_packing import PackingIterator

-    splits = tbl.to_batches()
-    target_weight = 2 << 27  # 256 MB
-    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
-    return bin_packed
+    target_file_size = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+    )
+    assert isinstance(target_file_size, int)
+    bin_packed_record_batches = PackingIterator(
+        items=tbl.to_batches(),
+        target_weight=target_file_size,
+        lookback=2,
+        weight_func=lambda x: x.nbytes,
+        largest_bin_first=True,
+    )
+    return bin_packed_record_batches


 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 923cc645c3..6a04898ce3 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -156,6 +156,9 @@ class TableProperties:

     PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"

+    WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
+    WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB
+
     DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
     DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"

@@ -2367,9 +2370,10 @@ def _dataframe_to_data_files(
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()

+    write_tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, table.properties)]
     # This is an iter, so we don't have to materialize everything every time
     # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)
+    yield from write_file(table, iter(write_tasks), file_schema=file_schema)


 class _MergingSnapshotProducer:
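
Usage sketch (not part of the patch): the snippet below shows how the new
write.target-file-size-bytes property flows into bin_pack_arrow_table. The
in-memory table and the 128 MB override are illustrative assumptions; only
the property key, the 512 MB default, and the function signature come from
the diff above. Normal writes would go through the table write path shown in
_dataframe_to_data_files rather than calling the helper directly.

    import pyarrow as pa

    from pyiceberg.io.pyarrow import bin_pack_arrow_table

    # Any PyArrow table works here; this one is just a stand-in.
    df = pa.table({"id": list(range(1_000_000))})

    # Iceberg table properties are stored as strings; override the
    # 512 MB default with a 128 MB target file size.
    properties = {"write.target-file-size-bytes": str(128 * 1024 * 1024)}

    # Each yielded bin is a list of RecordBatches whose combined nbytes is
    # packed toward the configured target; write_file turns each bin into
    # one data file.
    for batches in bin_pack_arrow_table(df, properties):
        print(sum(batch.nbytes for batch in batches))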