From 34dd8ebefea638731ba5d6a59f837573f6664bae Mon Sep 17 00:00:00 2001 From: Anton Kukushkin Date: Thu, 20 Jul 2023 17:14:09 +0100 Subject: [PATCH] Pass schema during chunked parquet reads --- awswrangler/s3/_read_parquet.py | 2 +- tests/unit/test_s3_parquet.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py index 5b3a85e4d..79b99f95a 100644 --- a/awswrangler/s3/_read_parquet.py +++ b/awswrangler/s3/_read_parquet.py @@ -240,7 +240,7 @@ def _read_parquet_chunked( batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False ) table = _add_table_partitions( - table=pa.Table.from_batches(chunks), + table=pa.Table.from_batches(chunks, schema=pq_file.schema.to_arrow_schema()), path=path, path_root=path_root, ) diff --git a/tests/unit/test_s3_parquet.py b/tests/unit/test_s3_parquet.py index fc3efba04..d198674bb 100644 --- a/tests/unit/test_s3_parquet.py +++ b/tests/unit/test_s3_parquet.py @@ -674,12 +674,17 @@ def test_ignore_files(path: str, use_threads: Union[bool, int]) -> None: "(ExecutionPlan)[https://github.com/ray-project/ray/blob/ray-2.0.1/python/ray/data/_internal/plan.py#L253]" ), ) -def test_empty_parquet(path): +@pytest.mark.parametrize("chunked", [True, False]) +def test_empty_parquet(path, chunked): path = f"{path}file.parquet" s = pa.schema([pa.field("a", pa.int64())]) pq.write_table(s.empty_table(), path) - df = wr.s3.read_parquet(path) + df = wr.s3.read_parquet(path, chunked=chunked) + + if chunked: + df = pd.concat(list(df)) + assert len(df) == 0 assert len(df.columns) > 0