Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi geometry table builder #417

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/io/geozero/table/builder/anyvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use arrow_schema::DataType;
use chrono::Utc;
use geozero::ColumnValue;

use crate::array::*;
use crate::error::Result;
use crate::trait_::GeometryArrayBuilder;

// Types implemented by FlatGeobuf/Geozero
#[derive(Debug)]
Expand All @@ -36,6 +38,15 @@ pub enum AnyBuilder {
Json(StringBuilder),
DateTime(TimestampMicrosecondBuilder),
Binary(BinaryBuilder),
// geometry builders
Point(PointBuilder),
LineString(LineStringBuilder<i32>),
Polygon(PolygonBuilder<i32>),
MultiPoint(MultiPointBuilder<i32>),
MultiLineString(MultiLineStringBuilder<i32>),
MultiPolygon(MultiPolygonBuilder<i32>),
MixedGeometry(MixedGeometryBuilder<i32>),
GeometryCollection(GeometryCollectionBuilder<i32>),
}

// TODO: I think unused; remove
Expand Down Expand Up @@ -268,6 +279,31 @@ impl AnyBuilder {
Json(arr) => arr.append_null(),
DateTime(arr) => arr.append_null(),
Binary(arr) => arr.append_null(),
Point(arr) => arr.push_null(),
LineString(arr) => arr.push_null(),
Polygon(arr) => arr.push_null(),
MultiPoint(arr) => arr.push_null(),
MultiLineString(arr) => arr.push_null(),
MultiPolygon(arr) => arr.push_null(),
MixedGeometry(arr) => arr.push_null(),
GeometryCollection(arr) => arr.push_null(),
}
}

pub fn is_geometry(&self) -> bool {
use AnyBuilder::*;
match self {
Bool(_) | Int8(_) | Uint8(_) | Int16(_) | Uint16(_) | Int32(_) | Uint32(_)
| Int64(_) | Uint64(_) | Float32(_) | Float64(_) | String(_) | Json(_)
| DateTime(_) | Binary(_) => false,
Point(_)
| LineString(_)
| Polygon(_)
| MultiPoint(_)
| MultiLineString(_)
| MultiPolygon(_)
| MixedGeometry(_)
| GeometryCollection(_) => true,
}
}

Expand All @@ -289,6 +325,8 @@ impl AnyBuilder {
Json(_) => DataType::Utf8,
DateTime(_) => DataType::Utf8,
Binary(_) => DataType::Binary,
// Don't want to accidentally store geometry as attribute
_ => todo!()
}
}

Expand All @@ -310,6 +348,14 @@ impl AnyBuilder {
Json(arr) => arr.len(),
DateTime(arr) => arr.len(),
Binary(arr) => arr.len(),
Point(arr) => arr.len(),
LineString(arr) => arr.len(),
Polygon(arr) => arr.len(),
MultiPoint(arr) => arr.len(),
MultiLineString(arr) => arr.len(),
MultiPolygon(arr) => arr.len(),
MixedGeometry(arr) => arr.len(),
GeometryCollection(arr) => arr.len(),
}
}

Expand Down
218 changes: 66 additions & 152 deletions src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{SchemaBuilder, SchemaRef};
use arrow_array::{Array, RecordBatch};
use arrow_schema::{Field, SchemaBuilder, SchemaRef};

use crate::algorithm::native::Downcast;
use crate::array::*;
Expand All @@ -31,40 +31,87 @@ static GEOARROW_EXTENSION_NAMES: Set<&'static str> = phf_set! {
};

#[derive(Debug, PartialEq, Clone)]
pub struct GeoTable {
pub struct Table {
/// The schema of each batch within the table. This schema must include extension metadata for
/// any geometry columns.
schema: SchemaRef,

/// A list of [RecordBatch] objects, which store the actual table data.
batches: Vec<RecordBatch>,
geometry_column_index: usize,

/// The positional index of the primary geometry column.
///
/// - This can be `None` if there is no geometry column in the table.
/// - If a single geometry column exists in the table, this **MUST** be set.
/// - If more than one geometry column exists in the table, this **MAY** not be set. In that
/// case, the user must pass in the index of the geometry column they wish to access.
primary_geometry_idx: Option<usize>,
}

impl GeoTable {
impl Table {
pub fn try_new(
schema: SchemaRef,
batches: Vec<RecordBatch>,
geometry_column_index: usize,
primary_geometry_idx: Option<usize>,
) -> Result<Self> {
// TODO: validate
Ok(Self {
schema,
batches,
geometry_column_index,
primary_geometry_idx,
})
}

/// Remove a column from the table
pub fn remove_column(&mut self, idx: usize) {}

/// Append a column to this table.
pub fn append_column(&mut self, chunks: Vec<Arc<dyn Array>>, field: Arc<Field>) {}

/// Replace the column at index `idx` with new data
pub fn replace_column(
&mut self,
idx: usize,
chunks: Vec<Arc<dyn Array>>,
field: Arc<Field>,
) {
let schema = self.schema.borrow_mut();
schema.fields
todo!()
}

/// Construct a Table from an external Arrow source
///
/// This schema is a **descriptive** schema, not a **prescriptive** one. Casts will not be
/// taken into account.
///
/// `geometry_columns` can be used to prescribe
///
/// The geometry columns are the exception to this.
///
// Note: This function is relatively complex because we want to parse any WKB columns to
// geoarrow-native arrays
pub fn from_arrow(
batches: Vec<RecordBatch>,
schema: SchemaRef,
geometry_column_index: Option<usize>,
target_geo_data_type: Option<GeoDataType>,
geometry_columns: Option<&[(usize, Option<GeoDataType>)]>,
) -> Result<Self> {
if batches.is_empty() {
return Err(GeoArrowError::General("empty input".to_string()));
}

let num_batches = batches.len();

// - If `geometry_columns` is passed:
// - For each index, if GeoDataType exists, cast to that. Otherwise cast to LargeMixed.
// - If `geometry_columns` is `None` but fields exist with geoarrow metadata, parse

// Requirements:
// - enable Cast to handle from wkb to a type
// - Clean up this function by using `replace_column`

// -

let original_geometry_column_index = geometry_column_index.unwrap_or_else(|| {
schema
.fields
Expand Down Expand Up @@ -103,12 +150,8 @@ impl GeoTable {
new_batches.push(new_batch);
}

let orig_geom_slices = orig_geom_chunks
.iter()
.map(|c| c.as_ref())
.collect::<Vec<_>>();
let mut chunked_geometry_array =
from_arrow_chunks(orig_geom_slices.as_slice(), original_geometry_field)?;
from_arrow_chunks(orig_geom_chunks.as_slice(), original_geometry_field)?;

let target_geo_data_type =
target_geo_data_type.unwrap_or(GeoDataType::LargeMixed(Default::default()));
Expand Down Expand Up @@ -172,8 +215,8 @@ impl GeoTable {
self.len() == 0
}

pub fn into_inner(self) -> (SchemaRef, Vec<RecordBatch>, usize) {
(self.schema, self.batches, self.geometry_column_index)
pub fn into_inner(self) -> (SchemaRef, Vec<RecordBatch>, Option<usize>) {
(self.schema, self.batches, self.primary_geometry_idx)
}

pub fn schema(&self) -> &SchemaRef {
Expand All @@ -184,12 +227,13 @@ impl GeoTable {
&self.batches
}

pub fn geometry_column_index(&self) -> usize {
self.geometry_column_index
/// The index of the primary geometry column, if set.
pub fn primary_geometry_idx(&self) -> Option<usize> {
self.primary_geometry_idx
}

pub fn geometry_data_type(&self) -> Result<GeoDataType> {
Ok(*self.geometry()?.data_type())
pub fn geometry_data_type(&self, index: Option<usize>) -> Result<GeoDataType> {
Ok(*self.geometry(index)?.data_type())
}

/// The number of columns in this table.
Expand All @@ -198,143 +242,13 @@ impl GeoTable {
}

/// Access the geometry column of the table
pub fn geometry(&self) -> Result<Arc<dyn ChunkedGeometryArrayTrait>> {
pub fn geometry(&self, index: Option<usize>) -> Result<Arc<dyn ChunkedGeometryArrayTrait>> {
let field = self.schema.field(self.geometry_column_index);
let array_refs = self
.batches
.iter()
.map(|batch| batch.column(self.geometry_column_index))
.collect::<Vec<_>>();
let geo_data_type = GeoDataType::try_from(field)?;
match geo_data_type {
GeoDataType::Point(_) => {
let chunks: Result<Vec<PointArray>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LineString(_) => {
let chunks: Result<Vec<LineStringArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeLineString(_) => {
let chunks: Result<Vec<LineStringArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::Polygon(_) => {
let chunks: Result<Vec<PolygonArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargePolygon(_) => {
let chunks: Result<Vec<PolygonArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::MultiPoint(_) => {
let chunks: Result<Vec<MultiPointArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeMultiPoint(_) => {
let chunks: Result<Vec<MultiPointArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::MultiLineString(_) => {
let chunks: Result<Vec<MultiLineStringArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeMultiLineString(_) => {
let chunks: Result<Vec<MultiLineStringArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::MultiPolygon(_) => {
let chunks: Result<Vec<MultiPolygonArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeMultiPolygon(_) => {
let chunks: Result<Vec<MultiPolygonArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::Mixed(_) => {
let chunks: Result<Vec<MixedGeometryArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeMixed(_) => {
let chunks: Result<Vec<MixedGeometryArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::GeometryCollection(_) => {
let chunks: Result<Vec<GeometryCollectionArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeGeometryCollection(_) => {
let chunks: Result<Vec<GeometryCollectionArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::WKB => {
let chunks: Result<Vec<WKBArray<i32>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::LargeWKB => {
let chunks: Result<Vec<WKBArray<i64>>> = array_refs
.into_iter()
.map(|arr| arr.as_ref().try_into())
.collect();
Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
GeoDataType::Rect => {
// tryfrom not implemented for RectArray
todo!()
// let chunks: Result<Vec<RectArray>> = array_refs
// .into_iter()
// .map(|arr| arr.as_ref().try_into())
// .collect();
// Ok(Arc::new(ChunkedGeometryArray::new(chunks?)))
}
}
from_arrow_chunks(array_refs.as_slice(), field)
}
}
Loading