Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-16719: [Python] Add path/URI + filesystem handling to parquet.read_metadata #13629

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions python/pyarrow/parquet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import defaultdict
from concurrent import futures
from contextlib import nullcontext
from functools import partial, reduce

import json
Expand Down Expand Up @@ -3388,7 +3389,8 @@ def write_metadata(schema, where, metadata_collector=None, **kwargs):
metadata.write_metadata_file(where)


def read_metadata(where, memory_map=False, decryption_properties=None):
def read_metadata(where, memory_map=False, decryption_properties=None,
filesystem=None):
"""
Read FileMetaData from footer of a single Parquet file.

Expand All @@ -3399,6 +3401,10 @@ def read_metadata(where, memory_map=False, decryption_properties=None):
Create memory map when the source is a file path.
decryption_properties : FileDecryptionProperties, default None
Decryption properties for reading encrypted Parquet files.
filesystem : FileSystem, default None
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.

Returns
-------
Expand All @@ -3421,11 +3427,19 @@ def read_metadata(where, memory_map=False, decryption_properties=None):
format_version: 2.6
serialized_size: 561
"""
return ParquetFile(where, memory_map=memory_map,
decryption_properties=decryption_properties).metadata
filesystem, where = _resolve_filesystem_and_path(where, filesystem)
file_ctx = nullcontext()
if filesystem is not None:
file_ctx = where = filesystem.open_input_file(where)

with file_ctx:
file = ParquetFile(where, memory_map=memory_map,
decryption_properties=decryption_properties)
return file.metadata


def read_schema(where, memory_map=False, decryption_properties=None):
def read_schema(where, memory_map=False, decryption_properties=None,
filesystem=None):
"""
Read effective Arrow schema from Parquet file metadata.

Expand All @@ -3436,6 +3450,10 @@ def read_schema(where, memory_map=False, decryption_properties=None):
Create memory map when the source is a file path.
decryption_properties : FileDecryptionProperties, default None
Decryption properties for reading encrypted Parquet files.
filesystem : FileSystem, default None
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.

Returns
-------
Expand All @@ -3453,6 +3471,13 @@ def read_schema(where, memory_map=False, decryption_properties=None):
n_legs: int64
animal: string
"""
return ParquetFile(
where, memory_map=memory_map,
decryption_properties=decryption_properties).schema.to_arrow_schema()
filesystem, where = _resolve_filesystem_and_path(where, filesystem)
file_ctx = nullcontext()
if filesystem is not None:
file_ctx = where = filesystem.open_input_file(where)

with file_ctx:
file = ParquetFile(
where, memory_map=memory_map,
decryption_properties=decryption_properties)
return file.schema.to_arrow_schema()
41 changes: 41 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import datetime
import decimal
from collections import OrderedDict
import os

import numpy as np
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
from pyarrow.fs import LocalFileSystem, FileSystem
from pyarrow.tests import util

try:
import pyarrow.parquet as pq
Expand Down Expand Up @@ -533,6 +536,44 @@ def test_metadata_exceeds_message_size():
metadata = pq.read_metadata(pa.BufferReader(buf))


def test_metadata_schema_filesystem(tmpdir):
table = pa.table({"a": [1, 2, 3]})

# URI writing to local file.
fname = "data.parquet"
file_path = 'file:///' + os.path.join(str(tmpdir), fname)

pq.write_table(table, file_path)

# Get expected `metadata` from path.
metadata = pq.read_metadata(tmpdir / fname)
schema = table.schema

assert pq.read_metadata(file_path).equals(metadata)
assert pq.read_metadata(
fname, filesystem=f'file:///{tmpdir}').equals(metadata)

assert pq.read_schema(file_path).equals(schema)
assert pq.read_schema(fname, filesystem=f'file:///{tmpdir}').equals(schema)

with util.change_cwd(tmpdir):
# Pass `filesystem` arg
assert pq.read_metadata(
fname, filesystem=LocalFileSystem()).equals(metadata)
assert pq.read_metadata(
fname, filesystem=LocalFileSystem.get_instance()).equals(metadata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new LocalFileSystem has no get_instance() method, so I think you can just remove this case (since the line above already includes the case of a plain LocalFileSystem())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I was away from keyboard for the week. Will fix it!


assert pq.read_schema(
fname, filesystem=LocalFileSystem()).equals(schema)
assert pq.read_schema(
fname, filesystem=LocalFileSystem.get_instance()).equals(schema)

err_msg = ('`filesystem` argument must be a FileSystem'
' instance or a valid file system URI')
with pytest.raises(TypeError, match=err_msg):
pq.read_metadata(fname, filesystem=FileSystem())


def test_metadata_equals():
table = pa.table({"a": [1, 2, 3]})
with pa.BufferOutputStream() as out:
Expand Down