Pyarrow IO property for configuring large v small types on read #986
Conversation
Once approved/merged, I'd like to bring this up on the discussion thread to add this item to the 0.7.1 patch release as well. It's a small feature, and it would help alleviate the memory issues we are running into (I expect it would for other users as well).
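For anyone landing here later, a rough usage sketch of the property this PR adds. This is my own illustration, not code from the PR: it assumes PYARROW_USE_LARGE_TYPES_ON_READ is exported next to the other FileIO properties shown in the diffs below (pyiceberg.io), and the catalog and table names are placeholders.

from pyiceberg.catalog import load_catalog
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ

catalog = load_catalog("default")             # placeholder catalog name
tbl = catalog.load_table("default.my_table")  # placeholder table name

# Opt out of the large-type default for this table's reads.
tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
result = tbl.scan().to_arrow()  # string/binary columns come back as pa.string()/pa.binary()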
Thanks for working on this, I left a few comments!
@@ -80,6 +80,7 @@
GCS_ENDPOINT = "gcs.endpoint"
GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
GCS_VERSION_AWARE = "gcs.version-aware"
PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
nit: if this is a pyarrow-specific setting, let's move it to the pyarrow file
I thought of that, but decided to leave it here because I liked having the FileIO properties together in one place. WDYT?
Pyarrow is one of the FileIO implementations and this setting is specifically for Pyarrow. In the future, when we add more FileIO implementations, such as the rust one, it'll be good to have a clear separation between the FileIO settings.
I think it also makes more sense to move this inside of the Arrow file.
Thanks @Fokko and @kevinjqliu - I'll keep this in mind the next time I touch these files 🙂
@@ -1146,6 +1152,31 @@ def primitive(self, primitive: pa.DataType) -> pa.DataType:
    return primitive


class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
    def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
nit: looks like this is the same function definition as the one in _ConvertToLargeTypes, as with the other functions here. Perhaps abstract into a common class and extend/override the specific functions.
I thought of that, but I didn't like naming one as _ConvertToLargeTypes, and then having an arg like reverse: bool
I was thinking something with inheritance like:
_ConvertToArrowTypes
_ConvertToLargeTypes(_ConvertToArrowTypes)
_ConvertToSmallTypes(_ConvertToArrowTypes)
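For illustration, a rough sketch of that hierarchy. Only the primitive() mapping is shown; the base-class name and the exact overrides are assumptions, and the real visitors would still implement the full PyArrowSchemaVisitor interface.

import pyarrow as pa


class _ConvertToArrowTypes:
    """Shared traversal logic; subclasses only override the primitive mapping."""

    def primitive(self, primitive: pa.DataType) -> pa.DataType:
        return primitive


class _ConvertToLargeTypes(_ConvertToArrowTypes):
    def primitive(self, primitive: pa.DataType) -> pa.DataType:
        # Widen variable-width types to their 64-bit-offset variants.
        if pa.types.is_string(primitive):
            return pa.large_string()
        if pa.types.is_binary(primitive):
            return pa.large_binary()
        return primitive


class _ConvertToSmallTypes(_ConvertToArrowTypes):
    def primitive(self, primitive: pa.DataType) -> pa.DataType:
        # Narrow large types back to their 32-bit-offset variants.
        if pa.types.is_large_string(primitive):
            return pa.string()
        if pa.types.is_large_binary(primitive):
            return pa.binary()
        return primitive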
tests/io/test_pyarrow_visitor.py (outdated diff)
@@ -596,6 +597,11 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
    assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema


def test_pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    schema_with_large_types = _pyarrow_schema_ensure_small_types(pyarrow_schema_nested_without_ids)
nit: the variable is named schema_with_large_types but the function being called is the small-types one. What is this function testing for?
This was for testing the roundtrip conversion - fixed it to use the correct function
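A minimal sketch of the round-trip check being described here, assuming both helpers are importable from pyiceberg.io.pyarrow and that pyarrow_schema_nested_without_ids is the existing fixture used by the neighboring tests:

import pyarrow as pa

from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types, _pyarrow_schema_ensure_small_types


def test_pyarrow_schema_round_trip_large_to_small(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    # Widen to large types, then narrow back; the result should match the original schema.
    schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
    assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids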
Thank you for the review feedback @kevinjqliu! I adopted most of the feedback and left comments on the rest.
LGTM!
Thanks for the review @kevinjqliu! Just updated it to make use of @ndrluis's cleaned-up function.
LGTM!
@sungwy Thanks for working on this! It seems we also need to update iceberg-python/pyiceberg/io/pyarrow.py, lines 1471 to 1478 at 846713b.
If we have a type promotion from string to binary, schema_to_pyarrow will convert BinaryType() to pa.large_binary().
Example to reproduce:

@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
    identifier = "default.test_table_scan_override_with_small_types"
    arrow_table = pa.Table.from_arrays(
        [pa.array(["a", "b", "c"]), pa.array([b"a", b"b", b"c"]), pa.array([["a", "b"], ["c", "d"], ["e", "f"]])],
        names=["string", "binary", "list"],
    )
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass
    tbl = catalog.create_table(
        identifier,
        schema=arrow_table.schema,
    )
    tbl.append(arrow_table)

    with tbl.update_schema() as update_schema:
        update_schema.update_column("string", BinaryType())

    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
    result_table = tbl.scan().to_arrow()
    expected_schema = pa.schema([
        pa.field("string", pa.large_binary()),  # should be pa.binary()
        pa.field("binary", pa.binary()),
        pa.field("list", pa.list_(pa.string())),
    ])
    assert result_table.schema.equals(expected_schema)

##### result_table.schema #####
string: large_binary
binary: binary
list: list<element: string>
  child 0, element: string
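To make the promotion point concrete, a small sketch of my own (assuming schema_to_pyarrow is importable from pyiceberg.io.pyarrow) showing where the large type sneaks back in:

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import BinaryType, NestedField

# A column whose Iceberg type was promoted from string to binary.
promoted = Schema(NestedField(field_id=1, name="string", field_type=BinaryType(), required=False))

# schema_to_pyarrow does not consult PYARROW_USE_LARGE_TYPES_ON_READ,
# so the promoted column is rendered as large_binary regardless of the flag.
print(schema_to_pyarrow(promoted))  # string: large_binary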
@@ -1303,6 +1345,8 @@ def project_table(
    # When FsSpec is not installed
    raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
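Side note for readers: IO properties are strings, which is why the test above sets "False" rather than False. A simplified illustration of what a property_as_bool-style helper does, written by me for this note and not the library's actual implementation:

from typing import Dict, Optional


def _property_as_bool_sketch(properties: Dict[str, str], name: str, default: bool) -> bool:
    # Properties arrive as strings ("true"/"false"), so parse them rather than truth-testing.
    value: Optional[str] = properties.get(name)
    if value is None:
        return default
    return value.lower() == "true"


assert _property_as_bool_sketch({"pyarrow.use-large-types-on-read": "False"}, "pyarrow.use-large-types-on-read", True) is False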
This is the only part I'm not sure I like: we now force the table to use either large or normal types. When we read record batches, I agree we need to force the schema, but for the table we have to read all the footers anyway.
Once #929 goes in, I think we'll still need to change this, but let's defer that question for now.
Does anyone know when this will be released?
Hi @fusion2222 - this will be released with 0.8.0, which will be a few months away (roughly 1-3 months).
…he#986)

* upyarrow IO property for configuring large v small types on read
* tests
* adopt feedback
* use property_as_bool
* fix
* docs
* nits
* respect flag on promotion
* lint

Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com>
This addresses the issue raised in the formal proposal in the Google Doc.
The current behavior of always casting to large types results in an RSS memory usage explosion, as highlighted in the benchmark discussed in the documentation.
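For intuition on the memory point, a minimal illustration of my own (not from the PR): pa.string()/pa.binary() use 32-bit offsets while the large_ variants use 64-bit offsets, so the offset buffers double in size even when the payload is identical.

import pyarrow as pa

data = ["x" * 10] * 1_000_000

small = pa.array(data, type=pa.string())        # 32-bit offsets
large = pa.array(data, type=pa.large_string())  # 64-bit offsets

# Same payload, but the large variant carries twice the offset overhead.
print(small.nbytes, large.nbytes)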