diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index aa27796081..2b71722c29 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1068,7 +1068,14 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
             return StringType()
         elif pa.types.is_date32(primitive):
             return DateType()
-        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
+        elif isinstance(primitive, pa.Time64Type):
+            if primitive.unit == "ns":
+                if self._downcast_ns_timestamp_to_us:
+                    logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
+                else:
+                    raise TypeError(
+                        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+                    )
             return TimeType()
         elif pa.types.is_timestamp(primitive):
             primitive = cast(pa.TimestampType, primitive)
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index 85e626edf4..be19238bfa 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -601,6 +601,7 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v
                 writer.write_table(arrow_table)
 
     # add the parquet files as data files
+
     with pytest.raises(
         TypeError,
         match=re.escape(
@@ -827,3 +828,60 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None:
+    nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType()))
+
+    nanoseconds_schema = pa.schema([
+        ("quux", pa.timestamp("ns", tz="UTC")),
+    ])
+
+    arrow_table = pa.Table.from_pylist(
+        [
+            {
+                "quux": 1615967687249846175,
+            }
+        ],
+        schema=nanoseconds_schema,
+    )
+    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})
+
+    identifier = f"default.timestamptz_ns_added{format_version}"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier=identifier,
+        schema=nanoseconds_schema_iceberg,
+        properties={"format-version": str(format_version)},
+        partition_spec=PartitionSpec(),
+    )
+
+    file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer:
+                writer.write_table(arrow_table)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    assert tbl.scan().to_arrow() == pa.concat_tables(
+        [
+            arrow_table.cast(
+                pa.schema([
+                    ("quux", pa.timestamp("us", tz="UTC")),
+                ]),
+                safe=False,
+            )
+        ]
+        * 5
+    )
\ No newline at end of file
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index fc2746c614..21be82e41b 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,7 +18,7 @@
 import math
 import os
 import time
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Dict
 from urllib.parse import urlparse
@@ -1448,3 +1448,40 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) ->
             EqualTo("category", "A"),
         ),
     )
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_ns_timestamp_precision(mocker: MockerFixture, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_ns_timestamp_precision"
+    arrow_table_schema_with_ns_timestamp_precisions = pa.schema([
+        ("timestamp_ns", pa.timestamp(unit="ns")),
+        ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+    ])
+    TEST_DATA_WITH_NULL = {
+        "timestamp_ns": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
+        "timestamptz_ns": [
+            datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
+            None,
+            datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
+        ],
+    }
+    input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_ns_timestamp_precisions)
+    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})
+
+    tbl = _create_table(
+        session_catalog,
+        identifier,
+        {"format-version": format_version},
+        data=[input_arrow_table],
+        schema=arrow_table_schema_with_ns_timestamp_precisions,
+    )
+    tbl.overwrite(input_arrow_table)
+    written_arrow_table = tbl.scan().to_arrow()
+
+    expected_schema_in_all_us = pa.schema([
+        ("timestamp_ns", pa.timestamp(unit="us")),
+        ("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")),
+    ])
+    assert written_arrow_table.schema == expected_schema_in_all_us
+    assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us)
diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py
index 9e6df720c6..5ae2eba6aa 100644
--- a/tests/io/test_pyarrow_visitor.py
+++ b/tests/io/test_pyarrow_visitor.py
@@ -170,9 +170,21 @@ def test_pyarrow_time64_us_to_iceberg() -> None:
     assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type
 
 
-def test_pyarrow_time64_ns_to_iceberg() -> None:
+def test_pyarrow_time64_ns_to_iceberg_downcast_set() -> None:
     pyarrow_type = pa.time64("ns")
-    with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
+    converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg(downcast_ns_timestamp_to_us=True))
+    assert converted_iceberg_type == TimeType()
+    assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.time64("us")
+
+
+def test_pyarrow_time64_ns_to_iceberg_downcast_not_set() -> None:
+    pyarrow_type = pa.time64("ns")
+    with pytest.raises(
+        TypeError,
+        match=re.escape(
+            "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+        ),
+    ):
         visit_pyarrow(pyarrow_type, _ConvertToIceberg())
 
 
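For reference, here is a minimal sketch (not part of the patch) of the conversion behavior the pyiceberg/io/pyarrow.py change introduces. It uses only names that appear in the diff (visit_pyarrow, _ConvertToIceberg, the downcast_ns_timestamp_to_us flag, TimeType) and assumes, as in tests/io/test_pyarrow_visitor.py, that these internals are importable from pyiceberg.io.pyarrow and pyiceberg.types:

import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIceberg, visit_pyarrow
from pyiceberg.types import TimeType

# With the downcast flag set, a nanosecond time64 type is accepted and mapped to
# Iceberg's microsecond-precision TimeType (the visitor logs a warning).
assert visit_pyarrow(pa.time64("ns"), _ConvertToIceberg(downcast_ns_timestamp_to_us=True)) == TimeType()

# Without the flag, the same conversion raises TypeError and points the user at
# the 'downcast-ns-timestamp-to-us-on-write' configuration property.
try:
    visit_pyarrow(pa.time64("ns"), _ConvertToIceberg())
except TypeError as exc:
    print(exc)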