[FEAT] Add ability to specify snapshot_id for iceberg read (#2426)
Closes #2400

---------

Co-authored-by: Jay Chia <jaychia94@gmail.com@users.noreply.github.com>
jaychia and Jay Chia authored Jun 21, 2024
1 parent a2bb7f8 commit 4951842
Showing 4 changed files with 47 additions and 5 deletions.
15 changes: 11 additions & 4 deletions daft/iceberg/iceberg_scan.py
@@ -82,15 +82,22 @@ def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: Iceber


 class IcebergScanOperator(ScanOperator):
-    def __init__(self, iceberg_table: Table, storage_config: StorageConfig) -> None:
+    def __init__(self, iceberg_table: Table, snapshot_id: int | None, storage_config: StorageConfig) -> None:
         super().__init__()
         self._table = iceberg_table
+        self._snapshot_id = snapshot_id
         self._storage_config = storage_config
-        iceberg_schema = iceberg_table.schema()
+
+        iceberg_schema = (
+            iceberg_table.schema()
+            if self._snapshot_id is None
+            else self._table.scan(snapshot_id=self._snapshot_id).projection()
+        )
         arrow_schema = schema_to_pyarrow(iceberg_schema)
         self._field_id_mapping = visit(iceberg_schema, SchemaFieldIdMappingVisitor())
         self._schema = Schema.from_pyarrow_schema(arrow_schema)
-        self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())
+
+        self._partition_keys = iceberg_partition_spec_to_fields(iceberg_schema, self._table.spec())

     def schema(self) -> Schema:
         return self._schema
@@ -129,7 +136,7 @@ def multiline_display(self) -> list[str]:

     def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
         limit = pushdowns.limit
-        iceberg_tasks = self._table.scan(limit=limit).plan_files()
+        iceberg_tasks = self._table.scan(limit=limit, snapshot_id=self._snapshot_id).plan_files()

         limit_files = limit is not None and pushdowns.filters is None and pushdowns.partition_filters is None
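The constructor change above decides which schema the scan operator exposes: the table's current schema when no snapshot is pinned, otherwise the schema projected from a scan of that snapshot. A minimal sketch of the same pyiceberg calls, assuming a configured "default" catalog; the table name is hypothetical:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("default.test_snapshotting")

current_schema = table.schema()  # schema of the latest snapshot
pinned_schema = table.scan(snapshot_id=table.history()[0].snapshot_id).projection()

# These can differ if the schema evolved after the pinned snapshot was written.
print(current_schema == pinned_schema)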
4 changes: 3 additions & 1 deletion daft/io/_iceberg.py
@@ -85,6 +85,7 @@ def _convert_iceberg_file_io_properties_to_io_config(props: Dict[str, Any]) -> O
 @PublicAPI
 def read_iceberg(
     pyiceberg_table: "PyIcebergTable",
+    snapshot_id: Optional[int] = None,
     io_config: Optional["IOConfig"] = None,
 ) -> DataFrame:
     """Create a DataFrame from an Iceberg table
@@ -106,6 +107,7 @@
     Args:
         pyiceberg_table: Iceberg table created using the PyIceberg library
+        snapshot_id: Snapshot ID of the table to query
         io_config: A custom IOConfig to use when accessing Iceberg object storage data. Defaults to None.

     Returns:
@@ -123,7 +125,7 @@
     multithreaded_io = not context.get_context().is_ray_runner
     storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))

-    iceberg_operator = IcebergScanOperator(pyiceberg_table, storage_config=storage_config)
+    iceberg_operator = IcebergScanOperator(pyiceberg_table, snapshot_id=snapshot_id, storage_config=storage_config)

     handle = ScanOperatorHandle.from_python_scan_operator(iceberg_operator)
     builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)
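With this change, daft.read_iceberg can pin a read to a historical snapshot. A minimal usage sketch, assuming a configured "default" catalog and the test_snapshotting table provisioned below:

import daft
from pyiceberg.catalog import load_catalog

# Assumes a configured "default" catalog; the table name matches the
# provisioning script below.
table = load_catalog("default").load_table("default.test_snapshotting")

# Default behavior: read the latest snapshot.
df_latest = daft.read_iceberg(table)

# Pin the read to the oldest snapshot recorded in the table history.
oldest = table.history()[0].snapshot_id
df_pinned = daft.read_iceberg(table, snapshot_id=oldest)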
21 changes: 21 additions & 0 deletions tests/integration/iceberg/docker-compose/provision.py
@@ -406,3 +406,24 @@
 )

 spark.sql("INSERT INTO default.test_evolve_partitioning VALUES (CAST('2021-02-01' AS date))")
+
+
+###
+# Multi-snapshot table
+###
+
+spark.sql("""
+CREATE OR REPLACE TABLE default.test_snapshotting
+USING iceberg
+AS SELECT
+    1 AS idx,
+    float('NaN') AS col_numeric
+UNION ALL SELECT
+    2 AS idx,
+    null AS col_numeric
+UNION ALL SELECT
+    3 AS idx,
+    1 AS col_numeric
+""")
+
+spark.sql("INSERT INTO default.test_snapshotting VALUES (4, 1)")
12 changes: 12 additions & 0 deletions tests/integration/iceberg/test_table_load.py