From 6dd81b6dd2314157eb2761970958d4338fec3417 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 29 Feb 2024 19:38:14 -0500 Subject: [PATCH] Pass in bbox to read_flatgeobuf (#544) --- js/src/io/flatgeobuf.rs | 8 +++- .../core/python/geoarrow/rust/core/_rust.pyi | 12 ++++- python/core/src/io/flatgeobuf.rs | 39 ++++++++++++---- src/io/flatgeobuf/mod.rs | 2 +- src/io/flatgeobuf/reader/async.rs | 46 +++++++++++-------- src/io/flatgeobuf/reader/common.rs | 27 +++++++++++ src/io/flatgeobuf/reader/mod.rs | 1 + src/io/flatgeobuf/reader/sync.rs | 42 +++++++++-------- src/io/flatgeobuf/writer.rs | 2 +- 9 files changed, 128 insertions(+), 51 deletions(-) diff --git a/js/src/io/flatgeobuf.rs b/js/src/io/flatgeobuf.rs index 9f9ee8249..1a5e75488 100644 --- a/js/src/io/flatgeobuf.rs +++ b/js/src/io/flatgeobuf.rs @@ -1,6 +1,6 @@ use std::io::Cursor; -use geoarrow::io::flatgeobuf::read_flatgeobuf as _read_flatgeobuf; +use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions}; // use parquet_wasm::utils::assert_parquet_file_not_empty; use wasm_bindgen::prelude::*; @@ -28,6 +28,10 @@ use crate::table::GeoTable; pub fn read_flatgeobuf(file: &[u8], batch_size: Option) -> WasmResult { // assert_parquet_file_not_empty(parquet_file)?; let mut cursor = Cursor::new(file); - let geo_table = _read_flatgeobuf(&mut cursor, Default::default(), batch_size)?; + let options = FlatGeobufReaderOptions { + batch_size, + ..Default::default() + }; + let geo_table = _read_flatgeobuf(&mut cursor, options)?; Ok(GeoTable(geo_table)) } diff --git a/python/core/python/geoarrow/rust/core/_rust.pyi b/python/core/python/geoarrow/rust/core/_rust.pyi index a02bd0c14..7b2698e0a 100644 --- a/python/core/python/geoarrow/rust/core/_rust.pyi +++ b/python/core/python/geoarrow/rust/core/_rust.pyi @@ -31,6 +31,7 @@ from .types import ( ArrowArrayExportable, ArrowStreamExportable, AreaMethodT, + IntFloat, LengthMethodT, NativeChunkedGeometryArrayT, NativeGeometryArrayT, @@ -1320,10 +1321,17 @@ def read_csv( batch_size: int = 65536, ) -> GeoTable: ... def read_flatgeobuf( - file: Union[str, Path, BinaryIO], *, batch_size: int = 65536 + file: Union[str, Path, BinaryIO], + *, + batch_size: int = 65536, + bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None, ) -> GeoTable: ... async def read_flatgeobuf_async( - url: str, *, batch_size: int = 65536, options: Dict[str, str] | None = None + url: str, + *, + batch_size: int = 65536, + options: Dict[str, str] | None = None, + bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None, ) -> GeoTable: ... def read_geojson( file: Union[str, Path, BinaryIO], *, batch_size: int = 65536 diff --git a/python/core/src/io/flatgeobuf.rs b/python/core/src/io/flatgeobuf.rs index 5c7312010..e16888b2c 100644 --- a/python/core/src/io/flatgeobuf.rs +++ b/python/core/src/io/flatgeobuf.rs @@ -4,9 +4,9 @@ use crate::error::{PyGeoArrowError, PyGeoArrowResult}; use crate::io::file::{BinaryFileReader, BinaryFileWriter}; use crate::table::GeoTable; use flatgeobuf::FgbWriterOptions; -use geoarrow::io::flatgeobuf::read_flatgeobuf as _read_flatgeobuf; use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async; use geoarrow::io::flatgeobuf::write_flatgeobuf_with_options as _write_flatgeobuf; +use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions}; use object_store::{parse_url, parse_url_opts}; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; @@ -16,33 +16,52 @@ use url::Url; /// /// Args: /// file: the path to the file or a Python file object in binary read mode. +/// +/// Other args: /// batch_size: the number of rows to include in each internal batch of the table. +/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to +/// `None`, no spatial filtering will be performed. /// /// Returns: /// Table from FlatGeobuf file. #[pyfunction] -#[pyo3(signature = (file, *, batch_size=65536))] +#[pyo3(signature = (file, *, batch_size=65536, bbox=None))] pub fn read_flatgeobuf( py: Python, file: PyObject, batch_size: usize, + bbox: Option<(f64, f64, f64, f64)>, ) -> PyGeoArrowResult { let mut reader = file.extract::(py)?; - let table = _read_flatgeobuf(&mut reader, Default::default(), Some(batch_size))?; + let options = FlatGeobufReaderOptions { + batch_size: Some(batch_size), + bbox, + ..Default::default() + }; + let table = _read_flatgeobuf(&mut reader, options)?; Ok(GeoTable(table)) } /// Read a FlatGeobuf file from a url into a GeoTable. /// +/// Args: +/// url: the url to a remote FlatGeobuf file +/// +/// Other args: +/// batch_size: the number of rows to include in each internal batch of the table. +/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to +/// `None`, no spatial filtering will be performed. +/// /// Returns: /// Table from FlatGeobuf file. #[pyfunction] -#[pyo3(signature = (url, *, batch_size=65536, options=None))] +#[pyo3(signature = (url, *, batch_size=65536, options=None, bbox=None))] pub fn read_flatgeobuf_async( py: Python, url: String, batch_size: usize, options: Option>, + bbox: Option<(f64, f64, f64, f64)>, ) -> PyGeoArrowResult { let fut = pyo3_asyncio::tokio::future_into_py(py, async move { let url = Url::parse(&url).map_err(|err| PyValueError::new_err(err.to_string()))?; @@ -55,10 +74,14 @@ pub fn read_flatgeobuf_async( // dbg!(&reader); // dbg!(&location); - let table = - _read_flatgeobuf_async(reader, location, Default::default(), Some(batch_size), None) - .await - .map_err(PyGeoArrowError::GeoArrowError)?; + let options = FlatGeobufReaderOptions { + batch_size: Some(batch_size), + bbox, + ..Default::default() + }; + let table = _read_flatgeobuf_async(reader, location, options) + .await + .map_err(PyGeoArrowError::GeoArrowError)?; Ok(GeoTable(table)) })?; diff --git a/src/io/flatgeobuf/mod.rs b/src/io/flatgeobuf/mod.rs index 1765aebf4..ee2cbb141 100644 --- a/src/io/flatgeobuf/mod.rs +++ b/src/io/flatgeobuf/mod.rs @@ -3,7 +3,7 @@ mod reader; mod writer; -pub use reader::read_flatgeobuf; #[cfg(feature = "flatgeobuf_async")] pub use reader::read_flatgeobuf_async; +pub use reader::{read_flatgeobuf, FlatGeobufReaderOptions}; pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options}; diff --git a/src/io/flatgeobuf/reader/async.rs b/src/io/flatgeobuf/reader/async.rs index 50d0701d7..6af638bc3 100644 --- a/src/io/flatgeobuf/reader/async.rs +++ b/src/io/flatgeobuf/reader/async.rs @@ -8,7 +8,7 @@ use object_store::ObjectStore; use crate::algorithm::native::Downcast; use crate::array::*; use crate::error::{GeoArrowError, Result}; -use crate::io::flatgeobuf::reader::common::infer_schema; +use crate::io::flatgeobuf::reader::common::{infer_schema, FlatGeobufReaderOptions}; use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper; use crate::io::geozero::array::MixedGeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; @@ -17,9 +17,7 @@ use crate::table::GeoTable; pub async fn read_flatgeobuf_async( reader: T, location: Path, - coord_type: CoordType, - batch_size: Option, - bbox: Option<(f64, f64, f64, f64)>, + options: FlatGeobufReaderOptions, ) -> Result { let head = reader.head(&location).await?; @@ -42,7 +40,7 @@ pub async fn read_flatgeobuf_async( let schema = infer_schema(header); let geometry_type = header.geometry_type(); - let mut selection = if let Some((min_x, min_y, max_x, max_y)) = bbox { + let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox { reader.select_bbox(min_x, min_y, max_x, max_y).await? } else { reader.select_all().await? @@ -52,9 +50,9 @@ pub async fn read_flatgeobuf_async( // TODO: propagate CRS let options = GeoTableBuilderOptions::new( - coord_type, + options.coord_type, true, - batch_size, + options.batch_size, Some(Arc::new(schema.finish())), features_count, Default::default(), @@ -118,26 +116,36 @@ mod test { #[tokio::test] async fn test_countries() { let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap(); - let _table = read_flatgeobuf_async( - fs, - Path::from("fixtures/flatgeobuf/countries.fgb"), - Default::default(), - None, - None, - ) - .await - .unwrap(); + let options = FlatGeobufReaderOptions::default(); + let table = + read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options) + .await + .unwrap(); + assert_eq!(table.len(), 179); + } + + #[tokio::test] + async fn test_countries_bbox() { + let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap(); + let options = FlatGeobufReaderOptions { + bbox: Some((0., -90., 180., 90.)), + ..Default::default() + }; + let table = + read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options) + .await + .unwrap(); + assert_eq!(table.len(), 133); } #[tokio::test] async fn test_nz_buildings() { let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap(); + let options = FlatGeobufReaderOptions::default(); let _table = read_flatgeobuf_async( fs, Path::from("fixtures/flatgeobuf/nz-building-outlines-small.fgb"), - Default::default(), - None, - None, + options, ) .await .unwrap(); diff --git a/src/io/flatgeobuf/reader/common.rs b/src/io/flatgeobuf/reader/common.rs index 6eb703090..4d2acbc97 100644 --- a/src/io/flatgeobuf/reader/common.rs +++ b/src/io/flatgeobuf/reader/common.rs @@ -1,6 +1,33 @@ use arrow_schema::{DataType, Field, SchemaBuilder, TimeUnit}; use flatgeobuf::{ColumnType, Header}; +use crate::array::CoordType; + +/// Options for the FlatGeobuf reader +#[derive(Debug, Clone, Copy)] +pub struct FlatGeobufReaderOptions { + /// The GeoArrow coordinate type to use in the geometry arrays. + pub coord_type: CoordType, + + /// The number of rows in each batch. + pub batch_size: Option, + + /// A spatial filter for reading rows. + /// + /// If set to `None`, no spatial filtering will be performed. + pub bbox: Option<(f64, f64, f64, f64)>, +} + +impl Default for FlatGeobufReaderOptions { + fn default() -> Self { + Self { + coord_type: Default::default(), + batch_size: Some(65_536), + bbox: None, + } + } +} + pub(super) fn infer_schema(header: Header<'_>) -> SchemaBuilder { let columns = header.columns().unwrap(); let mut schema = SchemaBuilder::with_capacity(columns.len()); diff --git a/src/io/flatgeobuf/reader/mod.rs b/src/io/flatgeobuf/reader/mod.rs index 908d19150..701250407 100644 --- a/src/io/flatgeobuf/reader/mod.rs +++ b/src/io/flatgeobuf/reader/mod.rs @@ -5,6 +5,7 @@ mod common; mod object_store_reader; mod sync; +pub use common::FlatGeobufReaderOptions; #[cfg(feature = "flatgeobuf_async")] pub use r#async::read_flatgeobuf_async; pub use sync::read_flatgeobuf; diff --git a/src/io/flatgeobuf/reader/sync.rs b/src/io/flatgeobuf/reader/sync.rs index 241c8e1a7..bb3bb2a08 100644 --- a/src/io/flatgeobuf/reader/sync.rs +++ b/src/io/flatgeobuf/reader/sync.rs @@ -22,7 +22,7 @@ use crate::algorithm::native::Downcast; use crate::array::*; use crate::error::{GeoArrowError, Result}; -use crate::io::flatgeobuf::reader::common::infer_schema; +use crate::io::flatgeobuf::reader::common::{infer_schema, FlatGeobufReaderOptions}; use crate::io::geozero::array::MixedGeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; use crate::table::GeoTable; @@ -33,10 +33,9 @@ use std::sync::Arc; /// Read a FlatGeobuf file to a GeoTable pub fn read_flatgeobuf( file: &mut R, - coord_type: CoordType, - batch_size: Option, + options: FlatGeobufReaderOptions, ) -> Result { - let mut reader = FgbReader::open(file)?.select_all()?; + let reader = FgbReader::open(file)?; let header = reader.header(); if header.has_m() | header.has_t() | header.has_tm() | header.has_z() { @@ -45,57 +44,64 @@ pub fn read_flatgeobuf( )); } - let features_count = reader.features_count(); - let schema = infer_schema(header); + let geometry_type = header.geometry_type(); + + let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox { + reader.select_bbox(min_x, min_y, max_x, max_y)? + } else { + reader.select_all()? + }; + + let features_count = selection.features_count(); // TODO: propagate CRS let options = GeoTableBuilderOptions::new( - coord_type, + options.coord_type, true, - batch_size, + options.batch_size, Some(Arc::new(schema.finish())), features_count, Default::default(), ); - match header.geometry_type() { + match geometry_type { GeometryType::Point => { let mut builder = GeoTableBuilder::::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::LineString => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::Polygon => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::MultiPoint => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::MultiLineString => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::MultiPolygon => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; builder.finish() } GeometryType::Unknown => { let mut builder = GeoTableBuilder::>::new_with_options(options); - reader.process_features(&mut builder)?; + selection.process_features(&mut builder)?; let table = builder.finish()?; table.downcast(true) } @@ -117,7 +123,7 @@ mod test { #[test] fn test_countries() { let mut filein = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap()); - let _table = read_flatgeobuf(&mut filein, Default::default(), None).unwrap(); + let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); } #[test] @@ -125,6 +131,6 @@ mod test { let mut filein = BufReader::new( File::open("fixtures/flatgeobuf/nz-building-outlines-small.fgb").unwrap(), ); - let _table = read_flatgeobuf(&mut filein, Default::default(), None).unwrap(); + let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); } } diff --git a/src/io/flatgeobuf/writer.rs b/src/io/flatgeobuf/writer.rs index 4e2ee76ed..d0881a4eb 100644 --- a/src/io/flatgeobuf/writer.rs +++ b/src/io/flatgeobuf/writer.rs @@ -69,7 +69,7 @@ mod test { write_flatgeobuf(&mut table, writer, "name").unwrap(); let mut reader = Cursor::new(output_buffer); - let new_table = read_flatgeobuf(&mut reader, Default::default(), None).unwrap(); + let new_table = read_flatgeobuf(&mut reader, Default::default()).unwrap(); // TODO: it looks like it's getting read back in backwards row order! let batch = &new_table.batches()[0];