Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for ns #1169 #1215

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,14 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
return StringType()
elif pa.types.is_date32(primitive):
return DateType()
elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
elif isinstance(primitive, pa.Time64Type):
if primitive.unit == "ns":
if self._downcast_ns_timestamp_to_us:
logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
)
return TimeType()
elif pa.types.is_timestamp(primitive):
primitive = cast(pa.TimestampType, primitive)
Expand Down
58 changes: 58 additions & 0 deletions tests/integration/test_add_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v
writer.write_table(arrow_table)

# add the parquet files as data files

with pytest.raises(
TypeError,
match=re.escape(
Expand Down Expand Up @@ -827,3 +828,60 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
with pytest.raises(ValueError) as exc_info:
tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None:
    """Verify that Parquet files written with ns-precision timestamptz can be
    added via ``add_files`` and are read back downcast to us precision.

    Requires the PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE env flag;
    without it, ns-precision input is rejected (see the companion *_fails test).
    """
    # Iceberg-side schema: a single timestamptz field (Iceberg has no ns precision).
    nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType()))

    # Arrow-side schema with ns precision — the case under test.
    nanoseconds_schema = pa.schema([
        ("quux", pa.timestamp("ns", tz="UTC")),
    ])

    # One row; the int is interpreted as nanoseconds since epoch per the schema.
    arrow_table = pa.Table.from_pylist(
        [
            {
                "quux": 1615967687249846175,
            }
        ],
        schema=nanoseconds_schema,
    )
    # Opt in to the automatic ns -> us downcast before any write path runs.
    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})

    identifier = f"default.timestamptz_ns_added{format_version}"

    # Start from a clean slate; the table may linger from a previous run.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=nanoseconds_schema_iceberg,
        properties={"format-version": str(format_version)},
        partition_spec=PartitionSpec(),
    )

    file_paths = [f"s3://warehouse/default/test_timestamp_tz/v{format_version}/test-{i}.parquet" for i in range(5)]
    # write parquet files
    for file_path in file_paths:
        fo = tbl.io.new_output(file_path)
        with fo.create(overwrite=True) as fos:
            with pq.ParquetWriter(fos, schema=nanoseconds_schema) as writer:
                writer.write_table(arrow_table)

    # add the parquet files as data files
    tbl.add_files(file_paths=file_paths)

    # Reading back must yield the same rows in us precision (5 files x 1 row).
    # safe=False is needed because the ns -> us cast is lossy.
    assert tbl.scan().to_arrow() == pa.concat_tables(
        [
            arrow_table.cast(
                pa.schema([
                    ("quux", pa.timestamp("us", tz="UTC")),
                ]),
                safe=False,
            )
        ]
        * 5
    )
39 changes: 38 additions & 1 deletion tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import math
import os
import time
from datetime import date, datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict
from urllib.parse import urlparse
Expand Down Expand Up @@ -1448,3 +1448,40 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) ->
EqualTo("category", "A"),
),
)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_ns_timestamp_precision(mocker: MockerFixture, session_catalog: Catalog, format_version: int) -> None:
    """Write ns-precision timestamp/timestamptz columns and verify they are
    stored and read back with us precision when the downcast flag is enabled.
    """
    identifier = "default.table_ns_timestamp_precision"

    # Arrow input schema: both a naive and a UTC timestamp column at ns precision.
    ns_schema = pa.schema([
        ("timestamp_ns", pa.timestamp(unit="ns")),
        ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
    ])
    rows = {
        "timestamp_ns": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
        "timestamptz_ns": [
            datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
            None,
            datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
        ],
    }
    source_table = pa.Table.from_pydict(rows, schema=ns_schema)

    # Opt in to the automatic ns -> us downcast on write.
    mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})

    tbl = _create_table(
        session_catalog,
        identifier,
        {"format-version": format_version},
        data=[source_table],
        schema=ns_schema,
    )
    tbl.overwrite(source_table)
    round_tripped = tbl.scan().to_arrow()

    # Everything read back must be at microsecond precision.
    us_schema = pa.schema([
        ("timestamp_ns", pa.timestamp(unit="us")),
        ("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")),
    ])
    assert round_tripped.schema == us_schema
    assert round_tripped == source_table.cast(us_schema)
16 changes: 14 additions & 2 deletions tests/io/test_pyarrow_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,21 @@ def test_pyarrow_time64_us_to_iceberg() -> None:
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type


def test_pyarrow_time64_ns_to_iceberg() -> None:
def test_pyarrow_time64_ns_to_iceberg_downcast_set() -> None:
    """time64[ns] converts to Iceberg TimeType when ns-downcast is enabled.

    NOTE(review): the rendered source had leftover diff/UI residue inside this
    function (the removed `pytest.raises("Unsupported type: time64[ns]")` line
    and a review-thread marker), which made it invalid Python. This is the
    clean post-change version of the test.
    """
    pyarrow_type = pa.time64("ns")
    converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg(downcast_ns_timestamp_to_us=True))
    assert converted_iceberg_type == TimeType()
    # Round-tripping back to Arrow yields microsecond precision, not ns.
    assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.time64("us")


def test_pyarrow_time64_ns_to_iceberg_downcast_not_set() -> None:
    """Without the downcast flag, converting time64[ns] must raise TypeError."""
    expected_message = re.escape(
        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
    )
    with pytest.raises(TypeError, match=expected_message):
        visit_pyarrow(pa.time64("ns"), _ConvertToIceberg())


Expand Down