Skip to content

Commit

Permalink
Update usage and schema documentation (#70)
Browse files Browse the repository at this point in the history
* Allow pystac.Item input

* Add doc with schema considerations

* Add page with drawbacks

* drawbacks toc

* Add usage file

* Add pgstac page

* Update docs/usage.md

Co-authored-by: Tom Augspurger <taugspurger@microsoft.com>

* Update stac_geoparquet/arrow/_batch.py

Co-authored-by: Pete Gadomski <pete.gadomski@gmail.com>

* Update docs/usage.md

Co-authored-by: Pete Gadomski <pete.gadomski@gmail.com>

* fix typo

* wording

* add example

---------

Co-authored-by: Tom Augspurger <taugspurger@microsoft.com>
Co-authored-by: Pete Gadomski <pete.gadomski@gmail.com>
  • Loading branch information
3 people authored Jun 25, 2024
1 parent 7df15b3 commit cf07d13
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 44 deletions.
33 changes: 2 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,6 @@ For analytic questions like "find the items in the Sentinel-2 collection in June

See the [STAC-GeoParquet specification](./spec/stac-geoparquet-spec.md) for details on the exact schema of the written Parquet files.

## Usage
## Documentation

Use `stac_geoparquet.to_arrow.stac_items_to_arrow` and
`stac_geoparquet.from_arrow.stac_table_to_items` to convert between STAC items
and Arrow tables. Arrow Tables of STAC items can be written to parquet with
`stac_geoparquet.to_parquet.to_parquet`.

Note that `stac_geoparquet` lifts the keys in the item `properties` up to the top level of the DataFrame, similar to `geopandas.GeoDataFrame.from_features`.

```python
>>> import requests
>>> import stac_geoparquet.arrow
>>> import pyarrow.parquet
>>> import pyarrow as pa

>>> items = requests.get(
... "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items"
... ).json()["features"]
>>> table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items))
>>> stac_geoparquet.arrow.to_parquet(table, "items.parquet")
>>> table2 = pyarrow.parquet.read_table("items.parquet")
>>> items2 = list(stac_geoparquet.arrow.stac_table_to_items(table2))
```


## pgstac integration

`stac_geoparquet.pgstac_reader` has some helpers for working with items coming from a `pgstac.items` table. It takes care of

- Rehydrating the dehydrated items
- Partitioning by time
- Injecting dynamic links and assets from a STAC API
[Documentation website](https://stac-utils.github.io/stac-geoparquet/)
17 changes: 17 additions & 0 deletions docs/api/legacy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@

The API listed here was the initial non-Arrow-based STAC-GeoParquet implementation, converting between JSON and GeoPandas directly. For large collections of STAC items, using the new Arrow-based functionality (under the `stac_geoparquet.arrow` namespace) will be more performant.

Note that `stac_geoparquet` lifts the keys in the item `properties` up to the top level of the DataFrame, similar to `geopandas.GeoDataFrame.from_features`.

```python
>>> import requests
>>> import stac_geoparquet.arrow
>>> import pyarrow.parquet
>>> import pyarrow as pa

>>> items = requests.get(
... "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items"
... ).json()["features"]
>>> table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items))
>>> stac_geoparquet.arrow.to_parquet(table, "items.parquet")
>>> table2 = pyarrow.parquet.read_table("items.parquet")
>>> items2 = list(stac_geoparquet.arrow.stac_table_to_items(table2))
```

::: stac_geoparquet.to_geodataframe
::: stac_geoparquet.to_item_collection
::: stac_geoparquet.to_dict
11 changes: 11 additions & 0 deletions docs/api/pgstac.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# pgstac integration

`stac_geoparquet.pgstac_reader` has some helpers for working with items coming from a `pgstac.items` table. It takes care of

- Rehydrating the dehydrated items
- Partitioning by time
- Injecting dynamic links and assets from a STAC API

::: stac_geoparquet.pgstac_reader.CollectionConfig
options:
show_if_no_docstring: true
58 changes: 58 additions & 0 deletions docs/drawbacks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Drawbacks

Trying to represent STAC data in GeoParquet has some drawbacks.

## Unable to represent undefined values

Parquet is unable to represent the difference between _undefined_ and _null_, and so is unable to perfectly round-trip STAC data with _undefined_ values.

In JSON a value can have one of three states: defined, undefined, or null. The `"b"` key in the next three examples illustrates this:

Defined:

```json
{
"a": 1,
"b": "foo"
}
```

Undefined:

```json
{
"a": 2
}
```

Null:

```json
{
"a": 3,
"b": null
}
```

Because Parquet is a columnar format, it is only able to represent undefined at the _column_ level. So if those three JSON items above were converted to Parquet, the column `"b"` would exist because it exists in the first and third item, and the second item would have `"b"` inferred as `null`:

| a | b |
| --- | ----- |
| 1 | "foo" |
| 2 | null |
| 3 | null |

Then when the second item is converted back to JSON, it will be returned as

```json
{
"a": 2
"b": null
}
```

which is not strictly equal to the input.

## Schema difficulties

JSON is schemaless while Parquet requires a strict schema, and it can be very difficult to unite these two systems. This is such an important consideration that we have a [documentation page](./schema.md) just to discuss this point.
42 changes: 42 additions & 0 deletions docs/schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Schema considerations

A STAC Item is a JSON object to describe an external geospatial dataset. The STAC specification defines a common core, plus a variety of extensions. Additionally, STAC Items may include custom extensions outside the common ones. Crucially, the majority of the specified fields in the core spec and extensions define optional keys. Those keys often differ across STAC collections and may even differ within a single collection across items.

STAC's flexibility is a blessing and a curse. The flexibility of schemaless JSON allows for very easy writing as each object can be dumped separately to JSON. Every item is allowed to have a different schema. And newer items are free to have a different schema than older items in the same collection. But this write-time flexibility makes it harder to read as there are no guarantees (outside STAC's few required fields) about what fields exist.

Parquet is the complete opposite of JSON. Parquet has a strict schema that must be known before writing can start. This puts the burden of work onto the writer instead of the reader. Reading Parquet is very efficient because the file's metadata defines the exact schema of every record. This also enables use cases like reading specific columns that would not be possible without a strict schema.

This conversion from schemaless to strict-schema is the difficult part of converting STAC from JSON to GeoParquet, especially for large input datasets like STAC that are often larger than memory.

## Full scan over input data

The most foolproof way to convert STAC JSON to GeoParquet is to perform a full scan over input data. This is done automatically by [`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow] when a schema is not provided.

This is time consuming as it requires two full passes over the input data: once to infer a common schema and again to actually write to Parquet (though items are never fully held in memory, allowing this process to scale).

## User-provided schema

Alternatively, the user can pass in an Arrow schema themselves using the `schema` parameter of [`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow]. This `schema` must match the on-disk schema of the the STAC JSON data.

## Multiple schemas per collection

It is also possible to write multiple Parquet files with STAC data where each Parquet file may have a different schema. This simplifies the conversion and writing process but makes reading and using the Parquet data harder.

### Merging data with schema mismatch

If you've created STAC GeoParquet data where the schema has updated, you can use [`pyarrow.concat_tables`][pyarrow.concat_tables] with `promote_options="permissive"` to combine multiple STAC GeoParquet files.

```py
import pyarrow as pa
import pyarrow.parquet as pq

table_1 = pq.read_table("stac1.parquet")
table_2 = pq.read_table("stac2.parquet")
combined_table = pa.concat_tables([table1, table2], promote_options="permissive")
```

## Future work

Schema operations is an area where future work can improve reliability and ease of use of STAC GeoParquet.

It's possible that in the future we could automatically infer an Arrow schema from the STAC specification's published JSON Schema files. If you're interested in this, open an issue and discuss.
66 changes: 66 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
@@ -1 +1,67 @@
# Usage

[Apache Arrow](https://arrow.apache.org/) is used as the in-memory interchange format between all formats. While some end-to-end helper functions are provided, the user can go through Arrow objects for maximal flexibility in the conversion process.

All functionality that goes through Arrow is currently exported via the `stac_geoparquet.arrow` namespace.

## `dict`/JSON - Arrow conversion

### Convert `dict`s to Arrow

Use [`parse_stac_items_to_arrow`][stac_geoparquet.arrow.parse_stac_items_to_arrow] to convert STAC items either in memory or on disk to a stream of Arrow record batches. This accepts either an iterable of Python `dict`s or an iterable of [`pystac.Item`][pystac.Item] objects.

For example:

```py
import pyarrow as pa
import pystac

import stac_geoparquet

item = pystac.read_file(
"https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items/S2A_MSIL2A_20230112T104411_R008_T29NPE_20230113T053333"
)
assert isinstance(item, pystac.Item)

record_batch_reader = stac_geoparquet.arrow.parse_stac_items_to_arrow([item])
table = record_batch_reader.read_all()
```

### Convert JSON to Arrow

[`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow] is a helper function to take one or more JSON or newline-delimited JSON files on disk, infer the schema from all of them, and convert the data to a stream of Arrow record batches.

### Convert Arrow to `dict`s

Use [`stac_table_to_items`][stac_geoparquet.arrow.stac_table_to_items] to convert a table or stream of Arrow record batches of STAC data to a generator of Python `dict`s. This accepts either a `pyarrow.Table` or a `pyarrow.RecordBatchReader`, which allows conversions of larger-than-memory files in a streaming manner.

### Convert Arrow to JSON

Use [`stac_table_to_ndjson`][stac_geoparquet.arrow.stac_table_to_ndjson] to convert a table or stream of Arrow record batches of STAC data to a newline-delimited JSON file. This accepts either a `pyarrow.Table` or a `pyarrow.RecordBatchReader`, which allows conversions of larger-than-memory files in a streaming manner.

## Parquet

Use [`to_parquet`][stac_geoparquet.arrow.to_parquet] to write STAC Arrow data from memory to a path or file-like object. This is a special function to ensure that [GeoParquet](https://geoparquet.org/) 1.0 or 1.1 metadata is written to the Parquet file.

[`parse_stac_ndjson_to_parquet`][stac_geoparquet.arrow.parse_stac_ndjson_to_parquet] is a helper that connects reading (newline-delimited) JSON on disk to writing out to a Parquet file.

No special API is required for reading a STAC GeoParquet file back into Arrow. You can use [`pyarrow.parquet.read_table`][pyarrow.parquet.read_table] or [`pyarrow.parquet.ParquetFile`][pyarrow.parquet.ParquetFile] directly to read the STAC GeoParquet data back into Arrow.

## Delta Lake


Use [`parse_stac_ndjson_to_delta_lake`][stac_geoparquet.arrow.parse_stac_ndjson_to_delta_lake] to read (newline-delimited) JSON on disk and write out to a Delta Lake table.

No special API is required for reading a STAC Delta Lake table back into Arrow. You can use the [`DeltaTable`][deltalake.DeltaTable] class directly to read the data back into Arrow.

!!! important
Arrow has a null data type, where every value in the column is always null, but Delta Lake does not. This means that for any column inferred to have a `null` data type, writing to Delta Lake will error with
```
_internal.SchemaMismatchError: Invalid data type for Delta Lake: Null
```

This is a problem because if all items in a STAC Collection have a `null` JSON key, it gets inferred as an Arrow `null` type. For example, in the `3dep-lidar-copc` collection in the tests, it has `start_datetime` and `end_datetime` fields, and so according to the spec, `datetime` is always `null`. This column would need to be casted to a timestamp type before being written to Delta Lake.

This means we cannot write this collection to Delta Lake **solely with automatic schema inference**.

In such cases, users may need to manually update the inferred schema to cast any `null` type to another Delta Lake-compatible type.
4 changes: 3 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ extra:
nav:
- index.md
- usage.md
- schema.md
- Specification: spec/stac-geoparquet-spec.md
- API Reference:
- api/arrow.md
- Legacy: api/legacy.md
# - api/pgstac.md
- api/pgstac.md
- drawbacks.md

watch:
- stac_geoparquet
Expand Down
5 changes: 3 additions & 2 deletions stac_geoparquet/arrow/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Iterable

import pyarrow as pa
import pystac

from stac_geoparquet.arrow._batch import StacArrowBatch, StacJsonBatch
from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
Expand All @@ -16,7 +17,7 @@


def parse_stac_items_to_arrow(
items: Iterable[dict[str, Any]],
items: Iterable[pystac.Item | dict[str, Any]],
*,
chunk_size: int = 8192,
schema: pa.Schema | InferredSchema | None = None,
Expand Down Expand Up @@ -163,7 +164,7 @@ def stac_table_to_ndjson(


def stac_items_to_arrow(
items: Iterable[dict[str, Any]], *, schema: pa.Schema | None = None
items: Iterable[pystac.Item | dict[str, Any]], *, schema: pa.Schema | None = None
) -> pa.RecordBatch:
"""Convert dicts representing STAC Items to Arrow
Expand Down
9 changes: 8 additions & 1 deletion stac_geoparquet/arrow/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import orjson
import pyarrow as pa
import pyarrow.compute as pc
import pystac
import shapely
import shapely.geometry
from numpy.typing import NDArray
Expand Down Expand Up @@ -59,7 +60,10 @@ def __init__(self, batch: pa.RecordBatch) -> None:

@classmethod
def from_dicts(
cls, items: Iterable[dict[str, Any]], *, schema: pa.Schema | None = None
cls,
items: Iterable[pystac.Item | dict[str, Any]],
*,
schema: pa.Schema | None = None,
) -> Self:
"""Construct a StacJsonBatch from an iterable of dicts representing STAC items.
Expand All @@ -83,6 +87,9 @@ def from_dicts(
# `ArrowInvalid: cannot mix list and non-list, non-null values`
wkb_items = []
for item in items:
if isinstance(item, pystac.Item):
item = item.to_dict(transform_hrefs=False)

wkb_item = deepcopy(item)
wkb_item["geometry"] = shapely.to_wkb(
shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
Expand Down
7 changes: 5 additions & 2 deletions stac_geoparquet/arrow/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
List,
Optional,
Sequence,
TypeVar,
Union,
)

import pyarrow as pa

T = TypeVar("T")


def update_batch_schema(
batch: pa.RecordBatch,
Expand All @@ -23,8 +26,8 @@ def update_batch_schema(


def batched_iter(
lst: Iterable[Dict[str, Any]], n: int, *, limit: Optional[int] = None
) -> Iterable[Sequence[Dict[str, Any]]]:
lst: Iterable[T], n: int, *, limit: Optional[int] = None
) -> Iterable[Sequence[T]]:
"""Yield successive n-sized chunks from iterable."""
if n < 1:
raise ValueError("n must be at least one")
Expand Down
13 changes: 6 additions & 7 deletions stac_geoparquet/pgstac_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,12 @@ def make_pgstac_items(
"""
Make STAC items out of pgstac records.
Parameters
----------
records: list[tuple]
The dehydrated records from pgstac.items table.
base_item: dict[str, Any]
The base item from the ``collection_base_item`` pgstac function for this
collection. Used for rehydration
Args:
records: list[tuple]
The dehydrated records from pgstac.items table.
base_item: dict[str, Any]
The base item from the ``collection_base_item`` pgstac function for this
collection. Used for rehydration
"""
columns = [
"id",
Expand Down

0 comments on commit cf07d13

Please sign in to comment.