Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add spec_id back to data file #63

Merged
merged 5 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class DataFile(Record):
"split_offsets",
"equality_ids",
"sort_order_id",
"spec_id",
)
content: DataFileContent
file_path: str
Expand All @@ -363,6 +364,7 @@ class DataFile(Record):
split_offsets: Optional[List[int]]
equality_ids: Optional[List[int]]
sort_order_id: Optional[int]
spec_id: Optional[int]

def __setattr__(self, name: str, value: Any) -> None:
"""Assign a key/value to a DataFile."""
Expand Down Expand Up @@ -582,7 +584,7 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
) as reader:
return [
_inherit_sequence_number(entry, self)
_inherit_from_manifest(entry, self)
for entry in reader
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]
Expand All @@ -607,18 +609,24 @@ def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
yield from reader


def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry:
"""Inherits the sequence numbers.
def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry:
"""
Inherits properties from manifest file.

The properties that will be inherited are:
- sequence numbers
- partition spec id.

More information in the spec: https://iceberg.apache.org/spec/#sequence-number-inheritance
More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance

Args:
entry: The manifest entry that has null sequence numbers.
manifest: The manifest that has a sequence number.
entry: The manifest entry.
manifest: The manifest file.

Returns:
The manifest entry with the sequence numbers set.
The manifest entry with properties inherited.
"""
# Inherit sequence numbers.
# The snapshot_id is required in V1, inherit with V2 when null
if entry.snapshot_id is None:
entry.snapshot_id = manifest.added_snapshot_id
Expand All @@ -634,6 +642,9 @@ def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> Ma
# Only available in V2, always 0 in V1
entry.file_sequence_number = manifest.sequence_number

# Inherit partition spec id.
entry.data_file.spec_id = manifest.partition_spec_id

return entry


Expand Down
1 change: 1 addition & 0 deletions tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg(format_version: in
split_offsets=[4, 133697593],
equality_ids=[],
sort_order_id=4,
spec_id=3,
)
if format_version == 1:
data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE
Expand Down
13 changes: 7 additions & 6 deletions tests/test_integration_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import (
DataFile,
ManifestEntry,
write_manifest,
)
from pyiceberg.manifest import DataFile, ManifestEntry, write_manifest
from pyiceberg.table import Table
from pyiceberg.utils.lazydict import LazyDict

Expand Down Expand Up @@ -101,9 +97,14 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
split_offsets=entry.data_file.split_offsets,
equality_ids=entry.data_file.equality_ids,
sort_order_id=entry.data_file.sort_order_id,
spec_id=entry.data_file.spec_id,
)
wrapped_entry_v2 = ManifestEntry(*entry.record_fields())
wrapped_entry_v2.data_file = wrapped_data_file_v2_debug
wrapped_entry_v2_dict = todict(wrapped_entry_v2)
# This one should not be written
del wrapped_entry_v2_dict['data_file']['spec_id']

with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
output = PyArrowFileIO().new_output(tmp_avro_file)
Expand All @@ -122,4 +123,4 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
it = iter(r)
fa_entry = next(it)

assert fa_entry == todict(wrapped_entry_v2)
assert fa_entry == wrapped_entry_v2_dict