Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Arrow Table/RecordBatchReader to GDAL #346

Merged
merged 58 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
c8addb5
Scratch work for writing Arrow to GDAL
kylebarron Jan 25, 2024
33ce924
wip
kylebarron Jan 25, 2024
3315998
update ogr header definitions
kylebarron Jan 25, 2024
0da2e45
include pycapsule function import
kylebarron Jan 25, 2024
54fd141
Update pyogrio/_io.pyx
kylebarron Jan 25, 2024
09ad7ee
check for null stream
kylebarron Jan 25, 2024
81fb974
ogr definitions
kylebarron Jan 25, 2024
eab20ba
compiles!
kylebarron Jan 25, 2024
8b8bdb7
switch to bint
kylebarron Jan 26, 2024
5a3aa5c
fix schema and array pointers
jorisvandenbossche Jan 26, 2024
2a6f0be
fix error checks
jorisvandenbossche Jan 26, 2024
838abf6
Merge branch 'main' into kyle/write-arrow
kylebarron Feb 8, 2024
98a527c
Simplify
kylebarron Feb 8, 2024
8236787
Add write_arrow
kylebarron Feb 8, 2024
6b22bb0
conditionally compile
kylebarron Feb 16, 2024
040b54f
fix indent: weird vscode bug
kylebarron Feb 16, 2024
dbf8f9d
wip tests
kylebarron Feb 16, 2024
76b64e9
require geometry_name arg
kylebarron Feb 16, 2024
77337b1
infer geometry field from metadata
jorisvandenbossche Feb 17, 2024
9bcec45
close dataset at the end
jorisvandenbossche Feb 19, 2024
d8fe405
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Feb 23, 2024
afc6807
clean-up unused imports in tests
jorisvandenbossche Feb 23, 2024
758d55e
don't require pyarrow dependency
kylebarron Feb 26, 2024
8d1a8ab
move code down
jorisvandenbossche Feb 27, 2024
083f90e
factor out get_extension_metadata helper
jorisvandenbossche Feb 27, 2024
a306ff3
hide warnings in tests
jorisvandenbossche Feb 27, 2024
05db5d6
Raise error for non-WKB extension types + actually test the inference…
jorisvandenbossche Feb 27, 2024
da26aec
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Mar 12, 2024
9d8aee4
add docstring + test geometry_type
jorisvandenbossche Mar 12, 2024
b9bd26c
the encoding keyword is optional -> properly document + remove in mos…
jorisvandenbossche Mar 12, 2024
9f2aa7b
fixup geometry_type checking
jorisvandenbossche Mar 12, 2024
3b3b853
Apply suggestions from code review
jorisvandenbossche Mar 14, 2024
e4a6aef
move common parts in write/write_arrow into helper functions
jorisvandenbossche Mar 14, 2024
e4562c3
add link to arrow pycapsule docs
jorisvandenbossche Mar 14, 2024
e13cec4
fixup
jorisvandenbossche Mar 14, 2024
6121418
add changelog
jorisvandenbossche Mar 14, 2024
0bd0236
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Apr 10, 2024
e4898aa
Apply suggestions from code review
jorisvandenbossche Apr 10, 2024
713e93a
address feedback
jorisvandenbossche Apr 10, 2024
1569a9c
re-raise writing exception with proper error message
jorisvandenbossche Apr 10, 2024
5e92571
wrap writing in try/finally + catch GDAL error message in creating fi…
jorisvandenbossche Apr 11, 2024
66b2a56
cleanup encoding
jorisvandenbossche Apr 11, 2024
e95f109
metadata parsing break early
jorisvandenbossche Apr 11, 2024
cd77695
raise DataLayerError instead of low-level GDAL error when writing a b…
jorisvandenbossche Apr 11, 2024
ed8fb9b
verify the input data have the protocol method
jorisvandenbossche Apr 12, 2024
45031d0
inline write_arrow_stream_capsule function, dedent helpers
jorisvandenbossche Apr 12, 2024
20e0676
use different invalid batch for error testing
jorisvandenbossche Apr 12, 2024
061d72d
move tests to test_arrow.py
jorisvandenbossche Apr 12, 2024
0588ea3
Merge remote-tracking branch 'upstream/main' into kyle/write-arrow
jorisvandenbossche Apr 18, 2024
dd8585a
add docstring and comments to get_arrow_extension_metadata
jorisvandenbossche Apr 18, 2024
2903f9a
validate child not NULL
jorisvandenbossche Apr 18, 2024
35e98f2
smll docstring updates
jorisvandenbossche Apr 18, 2024
31ddaa2
fix and test append + test unsupported format
jorisvandenbossche Apr 18, 2024
950f0ca
add test for error on GDALClose
jorisvandenbossche Apr 18, 2024
b605968
add missing @requires_arrow_write_api
jorisvandenbossche Apr 18, 2024
0994ddf
add test for no crs and for arrow stream that is not tabular (not a s…
jorisvandenbossche Apr 18, 2024
06ab570
centralize all release callbacks in finally block
jorisvandenbossche Apr 18, 2024
70e6ded
Apply suggestions from code review
jorisvandenbossche Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 116 additions & 5 deletions pyogrio/_io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ cdef process_fields(
data[i] = bin_value[:ret_length]

elif field_type == OFTDateTime or field_type == OFTDate:

if datetime_as_string:
# defer datetime parsing to user/ pandas layer
# Update to OGR_F_GetFieldAsISO8601DateTime when GDAL 3.7+ only
Expand Down Expand Up @@ -851,7 +851,7 @@ cdef get_features(

field_data = [
np.empty(shape=(num_features, ),
dtype = ("object" if datetime_as_string and
dtype = ("object" if datetime_as_string and
fields[field_index,3].startswith("datetime") else fields[field_index,3])
) for field_index in range(n_fields)
]
Expand Down Expand Up @@ -950,8 +950,8 @@ cdef get_features_by_fid(
field_ogr_types = fields[:,1]
field_data = [
np.empty(shape=(count, ),
dtype=("object" if datetime_as_string and fields[field_index,3].startswith("datetime")
else fields[field_index,3]))
dtype=("object" if datetime_as_string and fields[field_index,3].startswith("datetime")
else fields[field_index,3]))
for field_index in range(n_fields)
]

Expand Down Expand Up @@ -1678,6 +1678,117 @@ cdef infer_field_types(list dtypes):

return field_types

def ogr_write_arrow(
obj,
str path
):
IF CTE_GDAL_VERSION < (3, 8, 0):
raise RuntimeError("Need GDAL>=3.8 for Arrow write support")

stream_capsule = obj.__arrow_c_stream__()
return write_arrow_stream_capsule(stream_capsule)

cdef OGRErr write_arrow_stream_capsule(OGRLayerH destLayer, object capsule):
cdef ArrowArrayStream stream
cdef ArrowSchema schema
cdef ArrowArray array
kylebarron marked this conversation as resolved.
Show resolved Hide resolved

stream = PyCapsule_GetPointer(capsule, "arrow_array_stream")
if not stream:
raise RuntimeError("Accessing PyCapsule pointer named 'arrow_array_stream' failed.")
kylebarron marked this conversation as resolved.
Show resolved Hide resolved

if stream.release == NULL:
raise RuntimeError("Arrow Array Stream was already released.")

errcode = stream.get_schema(stream, &schema)
if errcode != 0:
stream.release(stream)
raise RuntimeError("Error while accessing schema from stream.")

errcode = create_fields_from_arrow_schema(destLayer, &schema)
if errcode != 0:
schema.release(&schema)
stream.release(stream)
raise RuntimeError("Error creating Arrow Schema in OGR layer.")

while True:
errcode = stream.get_next(stream, &array)
if errcode != 1:
schema.release(&schema)
stream.release(stream)
raise RuntimeError("Error while accessing batch from stream.")

# We've reached the end of the stream
if array.release == NULL:
break

errcode = OGR_L_WriteArrowBatch(destLayer, &schema, &array, options)
if errcode:
if array.release != NULL:
array.release(&array)

schema.release(&schema)
stream.release(stream)
raise RuntimeError("Error while writing batch to OGR layer.")

if array.release != NULL:
array.release(&array)


schema.release(&schema)
stream.release(stream)

# Create output fields using CreateFieldFromArrowSchema()
static bool create_fields_from_arrow_schema(
OGRLayerH destLayer,
const struct ArrowSchema* schema,
char** options
):
# The schema object is a struct type where each child is a column.
for child in schema.n_children:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for child in schema.n_children:
for child in range(schema.n_children):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to change from

- for i in range(schema.n_children):
+ for child in schema.children:

I'm not sure if cython lets you iterate through a list of pointers as if it's a python list?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if cython lets you iterate through a list of pointers as if it's a python list?

No idea either, you will have to try and see ;)

# Access the metadata for this column
const char *metadata = child.metadata

# TODO: I don't know how to parse this metadata in C... I guess I can just use Python APIs for this in Cython?
# https://github.com/OSGeo/gdal/pull/9133/files#diff-37bedc92ae1d5e04706c7b9f8ea9e9fcccf984ca0c9997e2020ff85f1b958433R1159-R1185
# if metadata:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it's a pity that GDAL doesn't provide a helper to create a full layer definition from a schema, instead of only field by field (that would us to access the individual children of the ArrowSchema).

We might want to vendor some helpers from nanoarrow-c to extract the children, and the metadata etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accessing the child is something we can probably do quite easily, as it's something like schema.children[i]

For a first prototype, I think we could also ignore the metadata for a moment, and manually specify which are the geometry columns.


field_name = child.name


# {
# for (int i = 0; i < schemaSrc->n_children; ++i)
# {
# const char *metadata =
# schemaSrc->children[i]->metadata;
# if( metadata )
# {
# char** keyValues = ParseArrowMetadata(metadata);
# const char *ARROW_EXTENSION_NAME_KEY = "ARROW:extension:name";
# const char *EXTENSION_NAME_OGC_WKB = "ogc.wkb";
# const char *EXTENSION_NAME_GEOARROW_WKB = "geoarrow.wkb";
# const char* value = CSLFetchNameValue(keyValues, ARROW_EXTENSION_NAME_KEY);
# const bool bSkip = ( value && (EQUAL(value, EXTENSION_NAME_OGC_WKB) || EQUAL(value, EXTENSION_NAME_GEOARROW_WKB)) );
# CSLDestroy(keyValues);
# if( bSkip )
# continue;
# }

# const char *pszFieldName =
# schemaSrc->children[i]->name;
# if (!EQUAL(pszFieldName, "OGC_FID") &&
# !EQUAL(pszFieldName, "wkb_geometry") &&
# !OGR_L_CreateFieldFromArrowSchema(
# hDstLayer, schemaSrc->children[i], options))
# {
# CPLError(CE_Failure, CPLE_AppDefined,
# "Cannot create field %s",
# pszFieldName);
# return false;
# }
# }
# return true;
# }

# TODO: set geometry and field data as memory views?
def ogr_write(
Expand Down Expand Up @@ -2077,4 +2188,4 @@ def ogr_write(
# on close
exc = exc_check()
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")
11 changes: 10 additions & 1 deletion pyogrio/_ogr.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,20 @@ cdef extern from "ogr_srs_api.h":


cdef extern from "arrow_bridge.h":
struct ArrowArray:
void (*release)(struct ArrowArray*)
kylebarron marked this conversation as resolved.
Show resolved Hide resolved

struct ArrowSchema:
const char* name
const char* metadata
struct ArrowSchema** children
int64_t n_children

struct ArrowArrayStream:
int (*get_schema)(ArrowArrayStream* stream, ArrowSchema* out)
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
void (*release)(struct ArrowArrayStream*);


cdef extern from "ogr_api.h":
Expand Down Expand Up @@ -385,4 +394,4 @@ cdef extern from "gdal.h":
const char* GDALVersionInfo(const char *pszRequest)


cdef get_string(const char *c_str, str encoding=*)
cdef get_string(const char *c_str, str encoding=*)
Loading