Pass in bbox to read_flatgeobuf (#544)
kylebarron committed Mar 1, 2024
1 parent 589c636 commit 6dd81b6
Showing 9 changed files with 128 additions and 51 deletions.
8 changes: 6 additions & 2 deletions js/src/io/flatgeobuf.rs
@@ -1,6 +1,6 @@
use std::io::Cursor;

use geoarrow::io::flatgeobuf::read_flatgeobuf as _read_flatgeobuf;
use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions};
// use parquet_wasm::utils::assert_parquet_file_not_empty;
use wasm_bindgen::prelude::*;

@@ -28,6 +28,10 @@ use crate::table::GeoTable;
pub fn read_flatgeobuf(file: &[u8], batch_size: Option<usize>) -> WasmResult<GeoTable> {
// assert_parquet_file_not_empty(parquet_file)?;
let mut cursor = Cursor::new(file);
let geo_table = _read_flatgeobuf(&mut cursor, Default::default(), batch_size)?;
let options = FlatGeobufReaderOptions {
batch_size,
..Default::default()
};
let geo_table = _read_flatgeobuf(&mut cursor, options)?;
Ok(GeoTable(geo_table))
}
12 changes: 10 additions & 2 deletions python/core/python/geoarrow/rust/core/_rust.pyi
@@ -31,6 +31,7 @@ from .types import (
ArrowArrayExportable,
ArrowStreamExportable,
AreaMethodT,
IntFloat,
LengthMethodT,
NativeChunkedGeometryArrayT,
NativeGeometryArrayT,
@@ -1320,10 +1321,17 @@ def read_csv(
batch_size: int = 65536,
) -> GeoTable: ...
def read_flatgeobuf(
file: Union[str, Path, BinaryIO], *, batch_size: int = 65536
file: Union[str, Path, BinaryIO],
*,
batch_size: int = 65536,
bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None,
) -> GeoTable: ...
async def read_flatgeobuf_async(
url: str, *, batch_size: int = 65536, options: Dict[str, str] | None = None
url: str,
*,
batch_size: int = 65536,
options: Dict[str, str] | None = None,
bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None,
) -> GeoTable: ...
def read_geojson(
file: Union[str, Path, BinaryIO], *, batch_size: int = 65536
39 changes: 31 additions & 8 deletions python/core/src/io/flatgeobuf.rs
@@ -4,9 +4,9 @@ use crate::error::{PyGeoArrowError, PyGeoArrowResult};
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
use crate::table::GeoTable;
use flatgeobuf::FgbWriterOptions;
use geoarrow::io::flatgeobuf::read_flatgeobuf as _read_flatgeobuf;
use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async;
use geoarrow::io::flatgeobuf::write_flatgeobuf_with_options as _write_flatgeobuf;
use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions};
use object_store::{parse_url, parse_url_opts};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
@@ -16,33 +16,52 @@ use url::Url;
///
/// Args:
/// file: the path to the file or a Python file object in binary read mode.
///
/// Other args:
/// batch_size: the number of rows to include in each internal batch of the table.
/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
/// `None`, no spatial filtering will be performed.
///
/// Returns:
/// Table from FlatGeobuf file.
#[pyfunction]
#[pyo3(signature = (file, *, batch_size=65536))]
#[pyo3(signature = (file, *, batch_size=65536, bbox=None))]
pub fn read_flatgeobuf(
py: Python,
file: PyObject,
batch_size: usize,
bbox: Option<(f64, f64, f64, f64)>,
) -> PyGeoArrowResult<GeoTable> {
let mut reader = file.extract::<BinaryFileReader>(py)?;
let table = _read_flatgeobuf(&mut reader, Default::default(), Some(batch_size))?;
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf(&mut reader, options)?;
Ok(GeoTable(table))
}

/// Read a FlatGeobuf file from a url into a GeoTable.
///
/// Args:
/// url: the url to a remote FlatGeobuf file
///
/// Other args:
/// batch_size: the number of rows to include in each internal batch of the table.
/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
/// `None`, no spatial filtering will be performed.
///
/// Returns:
/// Table from FlatGeobuf file.
#[pyfunction]
#[pyo3(signature = (url, *, batch_size=65536, options=None))]
#[pyo3(signature = (url, *, batch_size=65536, options=None, bbox=None))]
pub fn read_flatgeobuf_async(
py: Python,
url: String,
batch_size: usize,
options: Option<HashMap<String, String>>,
bbox: Option<(f64, f64, f64, f64)>,
) -> PyGeoArrowResult<PyObject> {
let fut = pyo3_asyncio::tokio::future_into_py(py, async move {
let url = Url::parse(&url).map_err(|err| PyValueError::new_err(err.to_string()))?;
@@ -55,10 +74,14 @@ pub fn read_flatgeobuf_async(
// dbg!(&reader);
// dbg!(&location);

let table =
_read_flatgeobuf_async(reader, location, Default::default(), Some(batch_size), None)
.await
.map_err(PyGeoArrowError::GeoArrowError)?;
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf_async(reader, location, options)
.await
.map_err(PyGeoArrowError::GeoArrowError)?;

Ok(GeoTable(table))
})?;
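
For reference, the core call that this async binding wraps can be exercised directly from Rust. A minimal sketch, assuming the crate's flatgeobuf_async feature is enabled and object_store is built with HTTP support; the URL is a placeholder:

use geoarrow::io::flatgeobuf::{read_flatgeobuf_async, FlatGeobufReaderOptions};
use object_store::parse_url;
use url::Url;

#[tokio::main]
async fn main() {
    // Placeholder URL for a remote FlatGeobuf file.
    let url = Url::parse("https://example.com/countries.fgb").unwrap();
    // parse_url picks an object store implementation and splits off the path,
    // exactly as the Python binding above does.
    let (store, location) = parse_url(&url).unwrap();

    // Forward the same options the binding builds: a batch size plus a spatial filter.
    let options = FlatGeobufReaderOptions {
        batch_size: Some(65_536),
        bbox: Some((0., -90., 180., 90.)),
        ..Default::default()
    };
    let table = read_flatgeobuf_async(store, location, options).await.unwrap();
    println!("read {} rows", table.len());
}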
2 changes: 1 addition & 1 deletion src/io/flatgeobuf/mod.rs
@@ -3,7 +3,7 @@
mod reader;
mod writer;

pub use reader::read_flatgeobuf;
#[cfg(feature = "flatgeobuf_async")]
pub use reader::read_flatgeobuf_async;
pub use reader::{read_flatgeobuf, FlatGeobufReaderOptions};
pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options};
46 changes: 27 additions & 19 deletions src/io/flatgeobuf/reader/async.rs
@@ -8,7 +8,7 @@ use object_store::ObjectStore;
use crate::algorithm::native::Downcast;
use crate::array::*;
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::infer_schema;
use crate::io::flatgeobuf::reader::common::{infer_schema, FlatGeobufReaderOptions};
use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper;
use crate::io::geozero::array::MixedGeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
@@ -17,9 +17,7 @@ use crate::table::GeoTable;
pub async fn read_flatgeobuf_async<T: ObjectStore>(
reader: T,
location: Path,
coord_type: CoordType,
batch_size: Option<usize>,
bbox: Option<(f64, f64, f64, f64)>,
options: FlatGeobufReaderOptions,
) -> Result<GeoTable> {
let head = reader.head(&location).await?;

@@ -42,7 +40,7 @@
let schema = infer_schema(header);
let geometry_type = header.geometry_type();

let mut selection = if let Some((min_x, min_y, max_x, max_y)) = bbox {
let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
reader.select_bbox(min_x, min_y, max_x, max_y).await?
} else {
reader.select_all().await?
@@ -52,9 +50,9 @@

// TODO: propagate CRS
let options = GeoTableBuilderOptions::new(
coord_type,
options.coord_type,
true,
batch_size,
options.batch_size,
Some(Arc::new(schema.finish())),
features_count,
Default::default(),
@@ -118,26 +116,36 @@ mod test {
#[tokio::test]
async fn test_countries() {
let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap();
let _table = read_flatgeobuf_async(
fs,
Path::from("fixtures/flatgeobuf/countries.fgb"),
Default::default(),
None,
None,
)
.await
.unwrap();
let options = FlatGeobufReaderOptions::default();
let table =
read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options)
.await
.unwrap();
assert_eq!(table.len(), 179);
}

#[tokio::test]
async fn test_countries_bbox() {
let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap();
let options = FlatGeobufReaderOptions {
bbox: Some((0., -90., 180., 90.)),
..Default::default()
};
let table =
read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options)
.await
.unwrap();
assert_eq!(table.len(), 133);
}

#[tokio::test]
async fn test_nz_buildings() {
let fs = LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap();
let options = FlatGeobufReaderOptions::default();
let _table = read_flatgeobuf_async(
fs,
Path::from("fixtures/flatgeobuf/nz-building-outlines-small.fgb"),
Default::default(),
None,
None,
options,
)
.await
.unwrap();
27 changes: 27 additions & 0 deletions src/io/flatgeobuf/reader/common.rs
@@ -1,6 +1,33 @@
use arrow_schema::{DataType, Field, SchemaBuilder, TimeUnit};
use flatgeobuf::{ColumnType, Header};

use crate::array::CoordType;

/// Options for the FlatGeobuf reader
#[derive(Debug, Clone, Copy)]
pub struct FlatGeobufReaderOptions {
/// The GeoArrow coordinate type to use in the geometry arrays.
pub coord_type: CoordType,

/// The number of rows in each batch.
pub batch_size: Option<usize>,

/// A spatial filter for reading rows.
///
/// If set to `None`, no spatial filtering will be performed.
pub bbox: Option<(f64, f64, f64, f64)>,
}

impl Default for FlatGeobufReaderOptions {
fn default() -> Self {
Self {
coord_type: Default::default(),
batch_size: Some(65_536),
bbox: None,
}
}
}

pub(super) fn infer_schema(header: Header<'_>) -> SchemaBuilder {
let columns = header.columns().unwrap();
let mut schema = SchemaBuilder::with_capacity(columns.len());
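
Putting the new struct to work: a minimal sketch of a synchronous, bbox-filtered read using struct-update syntax over the defaults defined above. The fixture path and bbox match the ones used in this commit's tests; everything else is kept at its default.

use std::fs::File;
use std::io::BufReader;

use geoarrow::io::flatgeobuf::{read_flatgeobuf, FlatGeobufReaderOptions};

fn main() {
    let mut file = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap());

    // Keep the default coord_type and batch_size; only add a spatial filter.
    let options = FlatGeobufReaderOptions {
        bbox: Some((0., -90., 180., 90.)),
        ..Default::default()
    };
    let table = read_flatgeobuf(&mut file, options).unwrap();

    // For this bbox the async test above expects 133 of the 179 countries.
    println!("read {} rows", table.len());
}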
1 change: 1 addition & 0 deletions src/io/flatgeobuf/reader/mod.rs
@@ -5,6 +5,7 @@ mod common;
mod object_store_reader;
mod sync;

pub use common::FlatGeobufReaderOptions;
#[cfg(feature = "flatgeobuf_async")]
pub use r#async::read_flatgeobuf_async;
pub use sync::read_flatgeobuf;
42 changes: 24 additions & 18 deletions src/io/flatgeobuf/reader/sync.rs
@@ -22,7 +22,7 @@
use crate::algorithm::native::Downcast;
use crate::array::*;
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::infer_schema;
use crate::io::flatgeobuf::reader::common::{infer_schema, FlatGeobufReaderOptions};
use crate::io::geozero::array::MixedGeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
use crate::table::GeoTable;
@@ -33,10 +33,9 @@ use std::sync::Arc;
/// Read a FlatGeobuf file to a GeoTable
pub fn read_flatgeobuf<R: Read + Seek>(
file: &mut R,
coord_type: CoordType,
batch_size: Option<usize>,
options: FlatGeobufReaderOptions,
) -> Result<GeoTable> {
let mut reader = FgbReader::open(file)?.select_all()?;
let reader = FgbReader::open(file)?;

let header = reader.header();
if header.has_m() | header.has_t() | header.has_tm() | header.has_z() {
@@ -45,57 +44,64 @@ use std::sync::Arc;
));
}

let features_count = reader.features_count();

let schema = infer_schema(header);
let geometry_type = header.geometry_type();

let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
reader.select_bbox(min_x, min_y, max_x, max_y)?
} else {
reader.select_all()?
};

let features_count = selection.features_count();

// TODO: propagate CRS
let options = GeoTableBuilderOptions::new(
coord_type,
options.coord_type,
true,
batch_size,
options.batch_size,
Some(Arc::new(schema.finish())),
features_count,
Default::default(),
);

match header.geometry_type() {
match geometry_type {
GeometryType::Point => {
let mut builder = GeoTableBuilder::<PointBuilder>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::LineString => {
let mut builder = GeoTableBuilder::<LineStringBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::Polygon => {
let mut builder = GeoTableBuilder::<PolygonBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::MultiPoint => {
let mut builder = GeoTableBuilder::<MultiPointBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::MultiLineString => {
let mut builder =
GeoTableBuilder::<MultiLineStringBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::MultiPolygon => {
let mut builder =
GeoTableBuilder::<MultiPolygonBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
builder.finish()
}
GeometryType::Unknown => {
let mut builder =
GeoTableBuilder::<MixedGeometryStreamBuilder<i32>>::new_with_options(options);
reader.process_features(&mut builder)?;
selection.process_features(&mut builder)?;
let table = builder.finish()?;
table.downcast(true)
}
@@ -117,14 +123,14 @@ mod test {
#[test]
fn test_countries() {
let mut filein = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap());
let _table = read_flatgeobuf(&mut filein, Default::default(), None).unwrap();
let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap();
}

#[test]
fn test_nz_buildings() {
let mut filein = BufReader::new(
File::open("fixtures/flatgeobuf/nz-building-outlines-small.fgb").unwrap(),
);
let _table = read_flatgeobuf(&mut filein, Default::default(), None).unwrap();
let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap();
}
}
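
For callers of the synchronous reader, the change is purely in the signature: coord_type and batch_size no longer travel as separate arguments. A minimal migration sketch; the geoarrow::table::GeoTable and geoarrow::error::Result paths are assumptions based on the crate-internal imports shown in this diff.

use std::io::{Read, Seek};

use geoarrow::error::Result;
use geoarrow::io::flatgeobuf::{read_flatgeobuf, FlatGeobufReaderOptions};
use geoarrow::table::GeoTable;

// Before #544: read_flatgeobuf(&mut file, CoordType::default(), Some(65_536))
// After #544: both settings (and an optional bbox) travel in FlatGeobufReaderOptions.
fn read_with_batch_size<R: Read + Seek>(file: &mut R, batch_size: usize) -> Result<GeoTable> {
    let options = FlatGeobufReaderOptions {
        batch_size: Some(batch_size),
        ..Default::default()
    };
    read_flatgeobuf(file, options)
}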
2 changes: 1 addition & 1 deletion src/io/flatgeobuf/writer.rs
@@ -69,7 +69,7 @@ mod test {
write_flatgeobuf(&mut table, writer, "name").unwrap();

let mut reader = Cursor::new(output_buffer);
let new_table = read_flatgeobuf(&mut reader, Default::default(), None).unwrap();
let new_table = read_flatgeobuf(&mut reader, Default::default()).unwrap();

// TODO: it looks like it's getting read back in backwards row order!
let batch = &new_table.batches()[0];