Skip to content

Commit

Permalink
Merge pull request #98 from databrickslabs/feature/keep-code-geom
Browse files Browse the repository at this point in the history
Return all chip geometries
  • Loading branch information
sllynn committed Apr 28, 2022
2 parents 342fd0c + c902ea8 commit 5b53b7b
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 89 deletions.
18 changes: 11 additions & 7 deletions docs/source/api/spatial-functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ polyfill
mosaicfill
**********

.. function:: mosaicfill(geometry, resolution)
.. function:: mosaicfill(geometry, resolution, keep_core_geometries)

Generates:
- a set of core indices that are fully contained by `geometry`; and
Expand All @@ -1048,6 +1048,8 @@ mosaicfill
:type geometry: Column
:param resolution: Index resolution
:type resolution: Column: Integer
:param keep_core_geometries: Whether to keep the core geometries or set them to null
:type keep_core_geometries: Column: Boolean
:rtype: Column: ArrayType[MosaicType]

:example:
Expand All @@ -1058,7 +1060,7 @@ mosaicfill
>>> df = spark.createDataFrame([{'wkt': 'MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))'}])
>>> df.select(mosaicfill('wkt', lit(0))).printSchema()
root
|-- h3_mosaicfill(wkt, 0): mosaic (nullable = true)
|-- mosaicfill(wkt, 0): mosaic (nullable = true)
| |-- chips: array (nullable = true)
| | |-- element: mosaic_chip (containsNull = true)
| | | |-- is_core: boolean (nullable = true)
Expand All @@ -1068,7 +1070,7 @@ mosaicfill

>>> df.select(mosaicfill('wkt', lit(0))).show()
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand All @@ -1078,7 +1080,7 @@ mosaicfill
>>> val df = List(("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))")).toDF("wkt")
>>> df.select(mosaicfill($"wkt", lit(0))).printSchema
root
|-- h3_mosaicfill(wkt, 0): mosaic (nullable = true)
|-- mosaicfill(wkt, 0): mosaic (nullable = true)
| |-- chips: array (nullable = true)
| | |-- element: mosaic_chip (containsNull = true)
| | | |-- is_core: boolean (nullable = true)
Expand All @@ -1087,7 +1089,7 @@ mosaicfill

>>> df.select(mosaicfill($"wkt", lit(0))).show()
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand All @@ -1096,7 +1098,7 @@ mosaicfill

>>> SELECT mosaicfill("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))", 0)
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand All @@ -1105,7 +1107,7 @@ mosaicfill
mosaic_explode
**************

.. function:: mosaic_explode(geometry, resolution)
.. function:: mosaic_explode(geometry, resolution, keep_core_geometries)

Returns the set of Mosaic chips covering the input `geometry` at `resolution`.

Expand All @@ -1115,6 +1117,8 @@ mosaic_explode
:type geometry: Column
:param resolution: Index resolution
:type resolution: Column: Integer
:param keep_core_geometries: Whether to keep the core geometries or set them to null
:type keep_core_geometries: Column: Boolean
:rtype: Column: MosaicType

:example:
Expand Down
26 changes: 20 additions & 6 deletions python/mosaic/api/functions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from pyspark.sql import Column
from pyspark.sql.functions import _to_java_column as pyspark_to_java_column
from pyspark.sql.functions import col

from pyspark.sql.functions import lit
from typing import Any
from mosaic.config import config
from mosaic.utils.types import ColumnOrName, as_typed_col


#####################
# Spatial functions #
#####################
Expand Down Expand Up @@ -592,7 +593,7 @@ def polyfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column:
)


def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column:
def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: Any = True) -> Column:
"""
Generates:
- a set of core indices that are fully contained by `geom`; and
Expand All @@ -604,21 +605,27 @@ def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column:
----------
geom : Column
resolution : Column (IntegerType)
keep_core_geometries : Column (BooleanType) | bool
Returns
-------
Column (StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType])
`wkb` in this struct represents a border chip geometry and is null for all 'core' chips.
`wkb` in this struct represents a border chip geometry and is null for all 'core' chips
if keep_core_geometries is set to False.
"""
if(type(keep_core_geometries) == bool):
keep_core_geometries = lit(keep_core_geometries)

return config.mosaic_context.invoke_function(
"mosaic_explode",
pyspark_to_java_column(geom),
pyspark_to_java_column(resolution),
pyspark_to_java_column(keep_core_geometries)
)


def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column:
def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: Any = True) -> Column:
"""
Generates:
- a set of core indices that are fully contained by `geom`; and
Expand All @@ -630,15 +637,22 @@ def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column:
----------
geom : Column
resolution : Column (IntegerType)
keep_core_geometries : Column (BooleanType) | bool
Returns
-------
Column (ArrayType[StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType]])
`wkb` in this struct represents a border chip geometry and is null for all 'core' chips.
`wkb` in this struct represents a border chip geometry and is null for all 'core' chips
if keep_core_geometries is set to False.
"""

if(type(keep_core_geometries) == bool):
keep_core_geometries = lit(keep_core_geometries)

return config.mosaic_context.invoke_function(
"mosaicfill",
pyspark_to_java_column(geom),
pyspark_to_java_column(resolution),
pyspark_to_java_column(keep_core_geometries)
)
4 changes: 4 additions & 0 deletions python/test/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def test_st_bindings_happy_flow(self):
.withColumn("index_geometry", api.index_geometry(lit(1)))
.withColumn("polyfill", api.polyfill("wkt", lit(1)))
.withColumn("mosaic_explode", api.mosaic_explode("wkt", lit(1)))
.withColumn("mosaic_explode_no_core_chips", api.mosaic_explode("wkt", lit(1), lit(False)))
.withColumn("mosaic_explode_no_core_chips_bool", api.mosaic_explode("wkt", lit(1), False))
.withColumn("mosaicfill", api.mosaicfill("wkt", lit(1)))
.withColumn("mosaicfill_no_core_chips", api.mosaicfill("wkt", lit(1), False))
.withColumn("mosaicfill_no_core_chips_bool", api.mosaicfill("wkt", lit(1), lit(False)))
.withColumn(
"geom_with_srid", api.st_setsrid(api.st_geomfromwkt("wkt"), lit(4326))
)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.databricks.labs.mosaic.core.types.model.GeometryTypeEnum.{LINESTRING,
*/
object Mosaic {

def mosaicFill(geometry: MosaicGeometry, resolution: Int, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = {
def mosaicFill(geometry: MosaicGeometry, resolution: Int, keepCoreGeom: Boolean, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = {

val radius = indexSystem.getBufferRadius(geometry, resolution, geometryAPI)

Expand All @@ -34,8 +34,8 @@ object Mosaic {
val coreIndices = indexSystem.polyfill(carvedGeometry, resolution)
val borderIndices = indexSystem.polyfill(borderGeometry, resolution)

val coreChips = indexSystem.getCoreChips(coreIndices)
val borderChips = indexSystem.getBorderChips(geometry, borderIndices, geometryAPI)
val coreChips = indexSystem.getCoreChips(coreIndices, keepCoreGeom, geometryAPI)
val borderChips = indexSystem.getBorderChips(geometry, borderIndices, keepCoreGeom, geometryAPI)

coreChips ++ borderChips
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,17 @@ object H3IndexSystem extends IndexSystem with Serializable {
override def getBorderChips(
geometry: MosaicGeometry,
borderIndices: util.List[java.lang.Long],
keepCoreGeom: Boolean,
geometryAPI: GeometryAPI
): Seq[MosaicChip] = {
val intersections = for (index <- borderIndices.asScala) yield {
val indexGeom = indexToGeometry(index, geometryAPI)
val chip = MosaicChip(isCore = false, index, indexGeom)
chip.intersection(geometry)
val intersect = geometry.intersection(indexGeom)
val isCore = intersect.equals(indexGeom)

val chipGeom = if (!isCore || keepCoreGeom) intersect else null

MosaicChip(isCore = isCore, index, chipGeom)
}
intersections.filterNot(_.isEmpty)
}
Expand All @@ -146,8 +151,11 @@ object H3IndexSystem extends IndexSystem with Serializable {
* @return
* A core area representation via [[MosaicChip]] set.
*/
override def getCoreChips(coreIndices: util.List[lang.Long]): Seq[MosaicChip] = {
coreIndices.asScala.map(MosaicChip(true, _, null))
override def getCoreChips(coreIndices: util.List[lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip] = {
coreIndices.asScala.map(index => {
val indexGeom = if (keepCoreGeom) indexToGeometry(index, geometryAPI) else null
MosaicChip(isCore = true, index, indexGeom)
})
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ trait IndexSystem extends Serializable {
* @return
* A border area representation via [[MosaicChip]] set.
*/
def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], geometryAPI: GeometryAPI): Seq[MosaicChip]
def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip]

/**
* Return a set of [[MosaicChip]] instances computed based on the core
Expand All @@ -113,7 +113,7 @@ trait IndexSystem extends Serializable {
* @return
* A core area representation via [[MosaicChip]] set.
*/
def getCoreChips(coreIndices: util.List[java.lang.Long]): Seq[MosaicChip]
def getCoreChips(coreIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip]

/**
* Get the geometry corresponding to the index with the input id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,6 @@ import org.apache.spark.sql.catalyst.InternalRow
*/
case class MosaicChip(isCore: Boolean, index: Long, geom: MosaicGeometry) {

/**
* Perform an intersection with a geometry, and if intersection is non
* empty and the chip is not a core set chip then extract the chip
* geometry.
*
* @param other
* Geometry instance.
* @return
* A Mosaic Chip instance.
*/
def intersection(other: MosaicGeometry): MosaicChip = {
val intersect = other.intersection(geom)
val isCore = intersect.equals(geom)
if (isCore) {
MosaicChip(isCore, index, null)
} else {
MosaicChip(isCore, index, intersect)
}
}

/**
* Indicates whether the chip is outside of the representation of the
* geometry it was generated to represent (ie false positive index).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ object MosaicExplode {
val geometry = geometryAPI.geometry(inputData, geomType)

val resolution = inputData.getInt(1)
val keepCoreGeom = inputData.getBoolean(2)

val chips = GeometryTypeEnum.fromString(geometry.getGeometryType) match {
case LINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case MULTILINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, keepCoreGeom, indexSystem, geometryAPI)
}

chips.map(row => InternalRow.fromSeq(Seq(row.serialize)))
Expand All @@ -92,14 +93,15 @@ object MosaicExplode {
val fields = child.dataType.asInstanceOf[StructType].fields
val geomType = fields.head
val resolutionType = fields(1)
val keepCoreGeom = fields(2)

(geomType.dataType, resolutionType.dataType) match {
case (BinaryType, IntegerType) => TypeCheckResult.TypeCheckSuccess
case (StringType, IntegerType) => TypeCheckResult.TypeCheckSuccess
case (HexType, IntegerType) => TypeCheckResult.TypeCheckSuccess
case (InternalGeometryType, IntegerType) => TypeCheckResult.TypeCheckSuccess
(geomType.dataType, resolutionType.dataType, keepCoreGeom.dataType) match {
case (BinaryType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess
case (StringType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess
case (HexType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess
case (InternalGeometryType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess
case _ => TypeCheckResult.TypeCheckFailure(
s"Input to h3 mosaic explode should be (geometry, resolution) pair. " +
s"Input to h3 mosaic explode should be (geometry, resolution, keepCoreGeom) pair. " +
s"Geometry type can be WKB, WKT, Hex or Coords. Provided type was: ${child.dataType.catalogString}"
)
}
Expand All @@ -122,14 +124,15 @@ object MosaicExplode {
val fields = child.dataType.asInstanceOf[StructType].fields
val geomType = fields.head
val resolutionType = fields(1)
val keepCoreGeom = fields(2)

(geomType.dataType, resolutionType.dataType) match {
case (BinaryType, IntegerType) => StructType(Array(StructField("index", ChipType)))
case (StringType, IntegerType) => StructType(Array(StructField("index", ChipType)))
case (HexType, IntegerType) => StructType(Array(StructField("index", ChipType)))
case (InternalGeometryType, IntegerType) => StructType(Array(StructField("index", ChipType)))
(geomType.dataType, resolutionType.dataType, keepCoreGeom.dataType) match {
case (BinaryType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType)))
case (StringType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType)))
case (HexType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType)))
case (InternalGeometryType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType)))
case _ => throw new Error(
s"Input to h3 mosaic explode should be (geometry, resolution) pair. " +
s"Input to h3 mosaic explode should be (geometry, resolution, keepCoreGeom) pair. " +
s"Geometry type can be WKB, WKT, Hex or Coords. Provided type was: ${child.dataType.catalogString}"
)
}
Expand All @@ -149,14 +152,14 @@ object MosaicExplode {
db.orNull,
"mosaic_explode",
"""
| _FUNC_(struct(geometry, resolution)) - Generates the h3 mosaic chips for the input geometry
| at a given resolution. Geometry and resolution are provided via struct wrapper to ensure
| _FUNC_(struct(geometry, resolution, keepCoreGeom)) - Generates the h3 mosaic chips for the input
| geometry at a given resolution. Geometry and resolution are provided via struct wrapper to ensure
| UnaryExpression API is respected.
""".stripMargin,
"",
"""
| Examples:
| > SELECT _FUNC_(a, b);
| > SELECT _FUNC_(a, b, c);
| {index_id, is_border, chip_geom}
| {index_id, is_border, chip_geom}
| ...
Expand Down
Loading

0 comments on commit 5b53b7b

Please sign in to comment.