Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Arrow Table/RecordBatchReader to GDAL #346

Merged
merged 58 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
c8addb5
Scratch work for writing Arrow to GDAL
kylebarron Jan 25, 2024
33ce924
wip
kylebarron Jan 25, 2024
3315998
update ogr header definitions
kylebarron Jan 25, 2024
0da2e45
include pycapsule function import
kylebarron Jan 25, 2024
54fd141
Update pyogrio/_io.pyx
kylebarron Jan 25, 2024
09ad7ee
check for null stream
kylebarron Jan 25, 2024
81fb974
ogr definitions
kylebarron Jan 25, 2024
eab20ba
compiles!
kylebarron Jan 25, 2024
8b8bdb7
switch to bint
kylebarron Jan 26, 2024
5a3aa5c
fix schema and array pointers
jorisvandenbossche Jan 26, 2024
2a6f0be
fix error checks
jorisvandenbossche Jan 26, 2024
838abf6
Merge branch 'main' into kyle/write-arrow
kylebarron Feb 8, 2024
98a527c
Simplify
kylebarron Feb 8, 2024
8236787
Add write_arrow
kylebarron Feb 8, 2024
6b22bb0
conditionally compile
kylebarron Feb 16, 2024
040b54f
fix indent: weird vscode bug
kylebarron Feb 16, 2024
dbf8f9d
wip tests
kylebarron Feb 16, 2024
76b64e9
require geometry_name arg
kylebarron Feb 16, 2024
77337b1
infer geometry field from metadata
jorisvandenbossche Feb 17, 2024
9bcec45
close dataset at the end
jorisvandenbossche Feb 19, 2024
d8fe405
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Feb 23, 2024
afc6807
clean-up unused imports in tests
jorisvandenbossche Feb 23, 2024
758d55e
don't require pyarrow dependency
kylebarron Feb 26, 2024
8d1a8ab
move code down
jorisvandenbossche Feb 27, 2024
083f90e
factor out get_extension_metadata helper
jorisvandenbossche Feb 27, 2024
a306ff3
hide warnings in tests
jorisvandenbossche Feb 27, 2024
05db5d6
Raise error for non-WKB extension types + actually test the inference…
jorisvandenbossche Feb 27, 2024
da26aec
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Mar 12, 2024
9d8aee4
add docstring + test geometry_type
jorisvandenbossche Mar 12, 2024
b9bd26c
the encoding keyword is optional -> properly document + remove in mos…
jorisvandenbossche Mar 12, 2024
9f2aa7b
fixup geometry_type checking
jorisvandenbossche Mar 12, 2024
3b3b853
Apply suggestions from code review
jorisvandenbossche Mar 14, 2024
e4a6aef
move common parts in write/write_arrow into helper functions
jorisvandenbossche Mar 14, 2024
e4562c3
add link to arrow pycapsule docs
jorisvandenbossche Mar 14, 2024
e13cec4
fixup
jorisvandenbossche Mar 14, 2024
6121418
add changelog
jorisvandenbossche Mar 14, 2024
0bd0236
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Apr 10, 2024
e4898aa
Apply suggestions from code review
jorisvandenbossche Apr 10, 2024
713e93a
address feedback
jorisvandenbossche Apr 10, 2024
1569a9c
re-raise writing exception with proper error message
jorisvandenbossche Apr 10, 2024
5e92571
wrap writing in try/finally + catch GDAL error message in creating fi…
jorisvandenbossche Apr 11, 2024
66b2a56
cleanup encoding
jorisvandenbossche Apr 11, 2024
e95f109
metadata parsing break early
jorisvandenbossche Apr 11, 2024
cd77695
raise DataLayerError instead of low-level GDAL error when writing a b…
jorisvandenbossche Apr 11, 2024
ed8fb9b
verify the input data have the protocol method
jorisvandenbossche Apr 12, 2024
45031d0
inline write_arrow_stream_capsule function, dedent helpers
jorisvandenbossche Apr 12, 2024
20e0676
use different invalid batch for error testing
jorisvandenbossche Apr 12, 2024
061d72d
move tests to test_arrow.py
jorisvandenbossche Apr 12, 2024
0588ea3
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Apr 18, 2024
dd8585a
add docstring and comments to get_arrow_extension_metadata
jorisvandenbossche Apr 18, 2024
2903f9a
validate child not NULL
jorisvandenbossche Apr 18, 2024
35e98f2
smll docstring updates
jorisvandenbossche Apr 18, 2024
31ddaa2
fix and test append + test unsupported format
jorisvandenbossche Apr 18, 2024
950f0ca
add test for error on GDALClose
jorisvandenbossche Apr 18, 2024
b605968
add missing @requires_arrow_write_api
jorisvandenbossche Apr 18, 2024
0994ddf
add test for no crs and for arrow stream that is not tabular (not a s…
jorisvandenbossche Apr 18, 2024
06ab570
centralize all release callbacks in finally block
jorisvandenbossche Apr 18, 2024
70e6ded
Apply suggestions from code review
jorisvandenbossche Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Improvements

- Support for writing based on Arrow as the transfer mechanism of the data
from Python to GDAL (requires GDAL >= 3.8). This is provided through the
new `pyogrio.raw.write_arrow` function (#314, #346).
- Add support for `fids` filter to `read_arrow` and `open_arrow`, and to
`read_dataframe` with `use_arrow=True` (#304).
- Add some missing properties to `read_info`, including layer name, geometry name
Expand Down
5 changes: 5 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ Reading as Arrow data

.. autofunction:: pyogrio.raw.read_arrow
.. autofunction:: pyogrio.raw.open_arrow

Writing from Arrow data
---------------------

.. autofunction:: pyogrio.raw.write_arrow
1 change: 1 addition & 0 deletions pyogrio/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


HAS_ARROW_API = __gdal_version__ >= (3, 6, 0)
HAS_ARROW_WRITE_API = __gdal_version__ >= (3, 8, 0)
HAS_PYARROW = pyarrow is not None

HAS_GEOPANDAS = geopandas is not None
Expand Down
198 changes: 198 additions & 0 deletions pyogrio/_io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import locale
import logging
import math
import os
import sys
import warnings

from libc.stdint cimport uint8_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport strlen
from libc.math cimport isnan
from cpython.pycapsule cimport PyCapsule_GetPointer

cimport cython
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer
Expand Down Expand Up @@ -2291,3 +2293,199 @@ def ogr_write(
exc = exc_check()
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")


def ogr_write_arrow(
str path,
str layer,
str driver,
object arrow_obj,
str crs,
str geometry_type,
str geometry_name,
str encoding,
object dataset_kwargs,
object layer_kwargs,
bint append=False,
dataset_metadata=None,
layer_metadata=None,
):
IF CTE_GDAL_VERSION < (3, 8, 0):
raise RuntimeError("Need GDAL>=3.8 for Arrow write support")

cdef OGRDataSourceH ogr_dataset = NULL
cdef OGRLayerH ogr_layer = NULL
cdef char **options = NULL
cdef ArrowArrayStream* stream = NULL
cdef ArrowSchema schema
cdef ArrowArray array

schema.release = NULL
array.release = NULL

layer_created = create_ogr_dataset_layer(
path, layer, driver, crs, geometry_type, encoding,
dataset_kwargs, layer_kwargs, append,
dataset_metadata, layer_metadata,
&ogr_dataset, &ogr_layer,
)

if geometry_name:
brendan-ward marked this conversation as resolved.
Show resolved Hide resolved
brendan-ward marked this conversation as resolved.
Show resolved Hide resolved
opts = {"GEOMETRY_NAME": geometry_name}
else:
opts = {}
options = dict_to_options(opts)

try:
stream_capsule = arrow_obj.__arrow_c_stream__()
stream = <ArrowArrayStream*>PyCapsule_GetPointer(
stream_capsule, "arrow_array_stream"
)

if stream == NULL:
raise RuntimeError("Could not extract valid Arrow array stream.")

if stream.release == NULL:
raise RuntimeError("Arrow array stream was already released.")

if stream.get_schema(stream, &schema) != 0:
raise RuntimeError("Could not get Arrow schema from stream.")

if layer_created:
create_fields_from_arrow_schema(ogr_layer, &schema, options, geometry_name)

while True:
if stream.get_next(stream, &array) != 0:
raise RuntimeError("Error while accessing batch from stream.")
brendan-ward marked this conversation as resolved.
Show resolved Hide resolved

# We've reached the end of the stream
if array.release == NULL:
break

if not OGR_L_WriteArrowBatch(ogr_layer, &schema, &array, options):
exc = exc_check()
gdal_msg = f": {str(exc)}" if exc else "."
raise DataLayerError(
f"Error while writing batch to OGR layer{gdal_msg}"
)

if array.release != NULL:
array.release(&array)

finally:
brendan-ward marked this conversation as resolved.
Show resolved Hide resolved
if stream != NULL and stream.release != NULL:
stream.release(stream)
if schema.release != NULL:
schema.release(&schema)
if array.release != NULL:
array.release(&array)

if options != NULL:
CSLDestroy(options)
options = NULL

### Final cleanup
if ogr_dataset != NULL:
GDALClose(ogr_dataset)

# GDAL will set an error if there was an error writing the data source
# on close
exc = exc_check()
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")


cdef get_arrow_extension_metadata(const ArrowSchema* schema):
"""
Parse the metadata of the ArrowSchema and extract extension type
metadata (extension name and metadata).

For the exact layout of the bytes, see
https://arrow.apache.org/docs/dev/format/CDataInterface.html#c.ArrowSchema.metadata
"""
cdef const char *metadata = schema.metadata

extension_name = None
extension_metadata = None

if metadata == NULL:
return extension_name, extension_metadata

# the number of metadata key/value pairs is stored
# as an int32 value in the first 4 bytes
n = int.from_bytes(metadata[:4], byteorder=sys.byteorder)
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved
pos = 4

for i in range(n):
# for each metadata key/value pair, the first 4 bytes is the byte length
# of the key as an int32, then follows the key (not null-terminated),
# and then the same for the value length and bytes
key_length = int.from_bytes(
metadata[pos:pos+4], byteorder=sys.byteorder, signed=True
)
pos += 4
key = metadata[pos:pos+key_length]
pos += key_length
value_length = int.from_bytes(
metadata[pos:pos+4], byteorder=sys.byteorder, signed=True
)
pos += 4
value = metadata[pos:pos+value_length]
pos += value_length

if key == b"ARROW:extension:name":
extension_name = value
elif key == b"ARROW:extension:metadata":
extension_metadata = value

if extension_name is not None and extension_metadata is not None:
break

return extension_name, extension_metadata


cdef is_arrow_geometry_field(const ArrowSchema* schema):
name, _ = get_arrow_extension_metadata(schema)
brendan-ward marked this conversation as resolved.
Show resolved Hide resolved
if name is not None:
if name == b"geoarrow.wkb" or name == b"ogc.wkb":
return True

# raise an error for other geoarrow types
if name.startswith(b"geoarrow."):
raise NotImplementedError(
f"Writing a geometry column of type {name.decode()} is not yet "
"supported. Only WKB is currently supported ('geoarrow.wkb' or "
"'ogc.wkb' types)."
)

return False


cdef create_fields_from_arrow_schema(
OGRLayerH destLayer, const ArrowSchema* schema, char** options, str geometry_name
):
"""Create output fields using CreateFieldFromArrowSchema()"""

IF CTE_GDAL_VERSION < (3, 8, 0):
raise RuntimeError("Need GDAL>=3.8 for Arrow write support")

# The schema object is a struct type where each child is a column.
cdef ArrowSchema* child
for i in range(schema.n_children):
child = schema.children[i]
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved

if child == NULL:
raise RuntimeError("Received invalid Arrow schema (null child)")

# Don't create property for geometry column
if get_string(child.name) == geometry_name or is_arrow_geometry_field(child):
continue

if not OGR_L_CreateFieldFromArrowSchema(destLayer, child, options):
exc = exc_check()
gdal_msg = f" ({str(exc)})" if exc else ""
raise FieldError(
f"Error while creating field from Arrow for field {i} with name "
f"'{get_string(child.name)}' and type {get_string(child.format)}"
f"{gdal_msg}."
)
34 changes: 31 additions & 3 deletions pyogrio/_ogr.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,35 @@ cdef extern from "ogr_srs_api.h":


cdef extern from "arrow_bridge.h" nogil:
struct ArrowArray:
int64_t length
int64_t null_count
int64_t offset
int64_t n_buffers
int64_t n_children
const void** buffers
ArrowArray** children
ArrowArray* dictionary
void (*release)(ArrowArray*) noexcept nogil
void* private_data

struct ArrowSchema:
const char* format
const char* name
const char* metadata
int64_t flags
int64_t n_children
ArrowSchema** children
ArrowSchema* dictionary
void (*release)(ArrowSchema*) noexcept nogil
void* private_data

struct ArrowArrayStream:
int (*get_schema)(ArrowArrayStream* stream, ArrowSchema* out) noexcept
int (*get_schema)(ArrowArrayStream*, ArrowSchema* out) noexcept
int (*get_next)(ArrowArrayStream*, ArrowArray* out)
const char* (*get_last_error)(ArrowArrayStream*)
void (*release)(ArrowArrayStream*) noexcept
void* private_data


cdef extern from "ogr_api.h":
Expand Down Expand Up @@ -315,8 +338,13 @@ cdef extern from "ogr_api.h":
IF CTE_GDAL_VERSION >= (3, 6, 0):

cdef extern from "ogr_api.h":
int8_t OGR_L_GetArrowStream(OGRLayerH hLayer, ArrowArrayStream *out_stream, char** papszOptions)
bint OGR_L_GetArrowStream(OGRLayerH hLayer, ArrowArrayStream *out_stream, char** papszOptions)

IF CTE_GDAL_VERSION >= (3, 8, 0):

cdef extern from "ogr_api.h":
bint OGR_L_CreateFieldFromArrowSchema(OGRLayerH hLayer, ArrowSchema *schema, char **papszOptions)
bint OGR_L_WriteArrowBatch(OGRLayerH hLayer, ArrowSchema *schema, ArrowArray *array, char **papszOptions)

cdef extern from "gdal.h":
ctypedef enum GDALDataType:
Expand Down Expand Up @@ -388,4 +416,4 @@ cdef extern from "gdal.h":
const char* GDALVersionInfo(const char *pszRequest)


cdef get_string(const char *c_str, str encoding=*)
cdef get_string(const char *c_str, str encoding=*)
4 changes: 2 additions & 2 deletions pyogrio/geopandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ def write_dataframe(
metadata : dict, optional (default: None)
alias of layer_metadata
dataset_options : dict, optional
Dataset creation option (format specific) passed to OGR. Specify as
Dataset creation options (format specific) passed to OGR. Specify as
a key-value dictionary.
layer_options : dict, optional
Layer creation option (format specific) passed to OGR. Specify as
Layer creation options (format specific) passed to OGR. Specify as
a key-value dictionary.
**kwargs
Additional driver-specific dataset or layer creation options passed
Expand Down
Loading