ARROW-16719: [Python] Add path/URI + filesystem handling to parquet.read_metadata (#13629)

Add `filesystem` support to `pq.read_metadata` and `pq.read_schema`.
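For illustration, a minimal usage sketch of the new keyword; the file names and URI below are placeholders, not part of the change:

import pyarrow.parquet as pq
from pyarrow.fs import LocalFileSystem

# Read only the Parquet footer, passing an explicit filesystem
# ("data.parquet" is a placeholder path for this sketch).
metadata = pq.read_metadata("data.parquet", filesystem=LocalFileSystem())
schema = pq.read_schema("data.parquet", filesystem=LocalFileSystem())

# Or let the filesystem be inferred from a URI.
metadata = pq.read_metadata("file:///tmp/data.parquet")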

Lead-authored-by: kshitij12345 <kshitijkalambarkar@gmail.com>
Co-authored-by: Kshiteej K <kshitijkalambarkar@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
kshitij12345 and jorisvandenbossche authored Aug 17, 2022
1 parent f6127fc commit 42ed37e
Showing 2 changed files with 69 additions and 7 deletions.
39 changes: 32 additions & 7 deletions python/pyarrow/parquet/core.py
@@ -18,6 +18,7 @@

 from collections import defaultdict
 from concurrent import futures
+from contextlib import nullcontext
 from functools import partial, reduce

 import sys
@@ -3389,7 +3390,8 @@ def write_metadata(schema, where, metadata_collector=None, **kwargs):
     metadata.write_metadata_file(where)


-def read_metadata(where, memory_map=False, decryption_properties=None):
+def read_metadata(where, memory_map=False, decryption_properties=None,
+                  filesystem=None):
     """
     Read FileMetaData from footer of a single Parquet file.
@@ -3400,6 +3402,10 @@ def read_metadata(where, memory_map=False, decryption_properties=None):
         Create memory map when the source is a file path.
     decryption_properties : FileDecryptionProperties, default None
         Decryption properties for reading encrypted Parquet files.
+    filesystem : FileSystem, default None
+        If nothing passed, will be inferred based on path.
+        Path will try to be found in the local on-disk filesystem otherwise
+        it will be parsed as an URI to determine the filesystem.

     Returns
     -------
@@ -3422,11 +3428,19 @@ def read_metadata(where, memory_map=False, decryption_properties=None):
       format_version: 2.6
       serialized_size: ...
     """
-    return ParquetFile(where, memory_map=memory_map,
-                       decryption_properties=decryption_properties).metadata
+    filesystem, where = _resolve_filesystem_and_path(where, filesystem)
+    file_ctx = nullcontext()
+    if filesystem is not None:
+        file_ctx = where = filesystem.open_input_file(where)
+
+    with file_ctx:
+        file = ParquetFile(where, memory_map=memory_map,
+                           decryption_properties=decryption_properties)
+        return file.metadata


-def read_schema(where, memory_map=False, decryption_properties=None):
+def read_schema(where, memory_map=False, decryption_properties=None,
+                filesystem=None):
     """
     Read effective Arrow schema from Parquet file metadata.
@@ -3437,6 +3451,10 @@ def read_schema(where, memory_map=False, decryption_properties=None):
         Create memory map when the source is a file path.
     decryption_properties : FileDecryptionProperties, default None
         Decryption properties for reading encrypted Parquet files.
+    filesystem : FileSystem, default None
+        If nothing passed, will be inferred based on path.
+        Path will try to be found in the local on-disk filesystem otherwise
+        it will be parsed as an URI to determine the filesystem.

     Returns
     -------
@@ -3454,9 +3472,16 @@ def read_schema(where, memory_map=False, decryption_properties=None):
     n_legs: int64
     animal: string
     """
-    return ParquetFile(
-        where, memory_map=memory_map,
-        decryption_properties=decryption_properties).schema.to_arrow_schema()
+    filesystem, where = _resolve_filesystem_and_path(where, filesystem)
+    file_ctx = nullcontext()
+    if filesystem is not None:
+        file_ctx = where = filesystem.open_input_file(where)
+
+    with file_ctx:
+        file = ParquetFile(
+            where, memory_map=memory_map,
+            decryption_properties=decryption_properties)
+        return file.schema.to_arrow_schema()


 # re-export everything
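Both functions now follow the same pattern: resolve the path/filesystem pair, open an input file only when a filesystem was resolved, and otherwise fall back to contextlib.nullcontext() as a no-op context manager. A standalone sketch of that pattern, using a hypothetical helper name read_with_optional_open:

from contextlib import nullcontext


def read_with_optional_open(where, filesystem, read):
    # Hypothetical helper mirroring the change above: open (and later close)
    # an input file only when a filesystem is in play; otherwise pass
    # `where` through unchanged under a no-op context manager.
    file_ctx = nullcontext()
    if filesystem is not None:
        file_ctx = where = filesystem.open_input_file(where)
    with file_ctx:
        return read(where)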
37 changes: 37 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
@@ -24,6 +24,8 @@

 import pyarrow as pa
 from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
+from pyarrow.fs import LocalFileSystem
+from pyarrow.tests import util

 try:
     import pyarrow.parquet as pq
@@ -533,6 +535,41 @@ def test_metadata_exceeds_message_size():
     metadata = pq.read_metadata(pa.BufferReader(buf))


+def test_metadata_schema_filesystem(tempdir):
+    table = pa.table({"a": [1, 2, 3]})
+
+    # URI writing to local file.
+    fname = "data.parquet"
+    file_path = str(tempdir / fname)
+    file_uri = 'file:///' + file_path
+
+    pq.write_table(table, file_path)
+
+    # Get expected `metadata` from path.
+    metadata = pq.read_metadata(tempdir / fname)
+    schema = table.schema
+
+    assert pq.read_metadata(file_uri).equals(metadata)
+    assert pq.read_metadata(
+        file_path, filesystem=LocalFileSystem()).equals(metadata)
+    assert pq.read_metadata(
+        fname, filesystem=f'file:///{tempdir}').equals(metadata)
+
+    assert pq.read_schema(file_uri).equals(schema)
+    assert pq.read_schema(
+        file_path, filesystem=LocalFileSystem()).equals(schema)
+    assert pq.read_schema(
+        fname, filesystem=f'file:///{tempdir}').equals(schema)
+
+    with util.change_cwd(tempdir):
+        # Pass `filesystem` arg
+        assert pq.read_metadata(
+            fname, filesystem=LocalFileSystem()).equals(metadata)
+
+        assert pq.read_schema(
+            fname, filesystem=LocalFileSystem()).equals(schema)
+
+
 def test_metadata_equals():
     table = pa.table({"a": [1, 2, 3]})
     with pa.BufferOutputStream() as out:
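For context on the URI cases exercised above: pyarrow.fs.FileSystem.from_uri is the public helper that splits a URI into a filesystem instance and a path within it; a small sketch with a placeholder path (the internal _resolve_filesystem_and_path dispatch used by the new code may differ in details):

from pyarrow.fs import FileSystem, LocalFileSystem

# Split a URI into a filesystem and the path inside that filesystem
# ("/tmp/data.parquet" is a placeholder for this sketch).
fs, path = FileSystem.from_uri("file:///tmp/data.parquet")
assert isinstance(fs, LocalFileSystem)
assert path == "/tmp/data.parquet"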
