add write target file size config
kevinjqliu committed Feb 23, 2024
1 parent 7b4f3ce commit f632c22
Showing 2 changed files with 21 additions and 7 deletions.
22 changes: 16 additions & 6 deletions pyiceberg/io/pyarrow.py
@@ -1758,14 +1758,24 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
         data_files.append(data_file)
     return iter(data_files)
 
-def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
-    # bin-pack the table into 256 MB chunks
+
+def bin_pack_arrow_table(tbl: pa.Table, table_properties: Properties) -> Iterator[List[pa.RecordBatch]]:
     from pyiceberg.utils.bin_packing import PackingIterator
 
-    splits = tbl.to_batches()
-    target_weight = 2 << 27  # 256 MB
-    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
-    return bin_packed
+    target_file_size = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+    )
+    assert isinstance(target_file_size, int)
+    bin_packed_record_batches = PackingIterator(
+        items=tbl.to_batches(),
+        target_weight=target_file_size,
+        lookback=2,
+        weight_func=lambda x: x.nbytes,
+        largest_bin_first=True,
+    )
+    return bin_packed_record_batches
 
 
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
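Note (not part of the commit): the old code hard-coded a target of 2 << 27 bytes = 268435456 bytes (256 MB), while the new default constant is 512 * 1024 * 1024 = 536870912 bytes (512 MB), now overridable per table. The sketch below is a minimal illustration of how PackingIterator groups Arrow record batches into bins whose combined size stays close to a target weight; each bin later becomes one WriteTask and hence one data file. The input table, chunk size, and the 2 MB target are made-up values for demonstration only.

import pyarrow as pa

from pyiceberg.utils.bin_packing import PackingIterator

# Hypothetical input: ~8 MB of int64 values, split into ten record batches.
arrow_table = pa.table({"id": list(range(1_000_000))})
record_batches = arrow_table.to_batches(max_chunksize=100_000)

# Tiny target so the grouping is visible; real writes use write.target-file-size-bytes.
target_weight = 2 * 1024 * 1024

bins = PackingIterator(
    items=record_batches,
    target_weight=target_weight,
    lookback=2,
    weight_func=lambda batch: batch.nbytes,
    largest_bin_first=True,
)

for batches in bins:
    # Each yielded bin is a list of record batches sized near the target weight.
    print(len(batches), sum(batch.nbytes for batch in batches))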
6 changes: 5 additions & 1 deletion pyiceberg/table/__init__.py
@@ -156,6 +156,9 @@ class TableProperties:
 
     PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
 
+    WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
+    WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB
+
     DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
     DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
 
@@ -2367,9 +2370,10 @@ def _dataframe_to_data_files(
     counter = itertools.count(0)
     write_uuid = write_uuid or uuid.uuid4()
 
+    write_tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, table.properties)]
     # This is an iter, so we don't have to materialize everything every time
     # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)
+    yield from write_file(table, iter(write_tasks), file_schema=file_schema)
 
 
 class _MergingSnapshotProducer:
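Usage note (not part of the commit): table properties are stored as strings in the table metadata, which is why the writer parses the value with PropertyUtil.property_as_int and asserts an int before handing it to PackingIterator. A minimal sketch of how the new property resolves, assuming PropertyUtil and TableProperties are importable from pyiceberg.table and that a user has set the property, for example through the properties argument of catalog.create_table:

from pyiceberg.table import PropertyUtil, TableProperties

# Hypothetical per-table override of the new setting (property values are strings).
table_properties = {"write.target-file-size-bytes": str(128 * 1024 * 1024)}

target_file_size = PropertyUtil.property_as_int(
    properties=table_properties,
    property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
    default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)

print(target_file_size)  # 134217728 (128 MB); an unset property falls back to 536870912 (512 MB)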
