Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return all chip geometries #98

Merged
merged 15 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/source/api/spatial-functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ mosaicfill
>>> df = spark.createDataFrame([{'wkt': 'MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))'}])
>>> df.select(mosaicfill('wkt', lit(0))).printSchema()
root
|-- h3_mosaicfill(wkt, 0): mosaic (nullable = true)
|-- mosaicfill(wkt, 0): mosaic (nullable = true)
| |-- chips: array (nullable = true)
| | |-- element: mosaic_chip (containsNull = true)
| | | |-- is_core: boolean (nullable = true)
Expand All @@ -911,7 +911,7 @@ mosaicfill

>>> df.select(mosaicfill('wkt', lit(0))).show()
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand All @@ -921,7 +921,7 @@ mosaicfill
>>> val df = List(("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))")).toDF("wkt")
>>> df.select(mosaicfill($"wkt", lit(0))).printSchema
root
|-- h3_mosaicfill(wkt, 0): mosaic (nullable = true)
|-- mosaicfill(wkt, 0): mosaic (nullable = true)
| |-- chips: array (nullable = true)
| | |-- element: mosaic_chip (containsNull = true)
| | | |-- is_core: boolean (nullable = true)
Expand All @@ -930,7 +930,7 @@ mosaicfill

>>> df.select(mosaicfill($"wkt", lit(0))).show()
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand All @@ -939,7 +939,7 @@ mosaicfill

>>> SELECT mosaicfill("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))", 0)
+---------------------+
|h3_mosaicfill(wkt, 0)|
|mosaicfill(wkt, 0) |
+---------------------+
| {[{false, 5774810...|
+---------------------+
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.databricks.labs.mosaic.core.types.model.GeometryTypeEnum.{LINESTRING,
*/
object Mosaic {

def mosaicFill(geometry: MosaicGeometry, resolution: Int, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = {
def mosaicFill(geometry: MosaicGeometry, resolution: Int, keepCoreGeom: Boolean, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = {

val radius = indexSystem.getBufferRadius(geometry, resolution, geometryAPI)

Expand All @@ -34,8 +34,8 @@ object Mosaic {
val coreIndices = indexSystem.polyfill(carvedGeometry, resolution)
val borderIndices = indexSystem.polyfill(borderGeometry, resolution)

val coreChips = indexSystem.getCoreChips(coreIndices)
val borderChips = indexSystem.getBorderChips(geometry, borderIndices, geometryAPI)
val coreChips = indexSystem.getCoreChips(coreIndices, keepCoreGeom, geometryAPI)
val borderChips = indexSystem.getBorderChips(geometry, borderIndices, keepCoreGeom, geometryAPI)

coreChips ++ borderChips
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,17 @@ object H3IndexSystem extends IndexSystem with Serializable {
override def getBorderChips(
geometry: MosaicGeometry,
borderIndices: util.List[java.lang.Long],
keepCoreGeom: Boolean,
geometryAPI: GeometryAPI
): Seq[MosaicChip] = {
val intersections = for (index <- borderIndices.asScala) yield {
val indexGeom = indexToGeometry(index, geometryAPI)
val chip = MosaicChip(isCore = false, index, indexGeom)
chip.intersection(geometry)
val intersect = geometry.intersection(indexGeom)
val isCore = intersect.equals(indexGeom)

val chipGeom = if (!isCore || keepCoreGeom) intersect else null

MosaicChip(isCore = isCore, index, chipGeom)
}
intersections.filterNot(_.isEmpty)
}
Expand All @@ -146,8 +151,11 @@ object H3IndexSystem extends IndexSystem with Serializable {
* @return
* A core area representation via [[MosaicChip]] set.
*/
override def getCoreChips(coreIndices: util.List[lang.Long]): Seq[MosaicChip] = {
coreIndices.asScala.map(MosaicChip(true, _, null))
override def getCoreChips(coreIndices: util.List[lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip] = {
    for (cellId <- coreIndices.asScala) yield {
        // A core chip only carries a geometry when the caller asked to keep it;
        // otherwise its geometry slot is left null.
        val cellGeom = if (keepCoreGeom) indexToGeometry(cellId, geometryAPI) else null
        MosaicChip(isCore = true, cellId, cellGeom)
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ trait IndexSystem extends Serializable {
* @return
* A border area representation via [[MosaicChip]] set.
*/
def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], geometryAPI: GeometryAPI): Seq[MosaicChip]
def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip]

/**
* Return a set of [[MosaicChip]] instances computed based on the core
Expand All @@ -113,7 +113,7 @@ trait IndexSystem extends Serializable {
* @return
* A core area representation via [[MosaicChip]] set.
*/
def getCoreChips(coreIndices: util.List[java.lang.Long]): Seq[MosaicChip]
def getCoreChips(coreIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip]

/**
* Get the geometry corresponding to the index with the input id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,6 @@ import org.apache.spark.sql.catalyst.InternalRow
*/
case class MosaicChip(isCore: Boolean, index: Long, geom: MosaicGeometry) {

/**
* Perform an intersection with a geometry, and if intersection is non
* empty and the chip is not a core set chip then extract the chip
* geometry.
*
* @param other
* Geometry instance.
* @return
* A Mosaic Chip instance.
*/
def intersection(other: MosaicGeometry): MosaicChip = {
val intersect = other.intersection(geom)
val isCore = intersect.equals(geom)
if (isCore) {
MosaicChip(isCore, index, null)
} else {
MosaicChip(isCore, index, intersect)
}
}
edurdevic marked this conversation as resolved.
Show resolved Hide resolved

/**
* Indicates whether the chip is outside of the representation of the
* geometry it was generated to represent (ie false positive index).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ object MosaicExplode {
val geometry = geometryAPI.geometry(inputData, geomType)

val resolution = inputData.getInt(1)
val keepCoreGeom = inputData.getBoolean(2)

val chips = GeometryTypeEnum.fromString(geometry.getGeometryType) match {
case LINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case MULTILINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, keepCoreGeom, indexSystem, geometryAPI)
}
edurdevic marked this conversation as resolved.
Show resolved Hide resolved

chips.map(row => InternalRow.fromSeq(Seq(row.serialize)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.locationtech.jts.geom.Geometry

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{
BinaryExpression,
TernaryExpression,
ExpectsInputTypes,
Expression,
ExpressionDescription,
Expand All @@ -30,33 +30,36 @@ import org.apache.spark.sql.types._
""",
since = "1.0"
)
case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: String, geometryAPIName: String)
extends BinaryExpression
case class MosaicFill(geom: Expression, resolution: Expression, keepCoreGeom: Expression, indexSystemName: String, geometryAPIName: String)
extends TernaryExpression
with ExpectsInputTypes
with NullIntolerant
with CodegenFallback {

// noinspection DuplicatedCode
override def inputTypes: Seq[DataType] =
(left.dataType, right.dataType) match {
case (BinaryType, IntegerType) => Seq(BinaryType, IntegerType)
case (StringType, IntegerType) => Seq(StringType, IntegerType)
case (HexType, IntegerType) => Seq(HexType, IntegerType)
case (InternalGeometryType, IntegerType) => Seq(InternalGeometryType, IntegerType)
case _ => throw new IllegalArgumentException(s"Not supported data type: (${left.dataType}, ${right.dataType}).")
(first.dataType, second.dataType, third.dataType) match {
case (BinaryType, IntegerType, BooleanType) => Seq(BinaryType, IntegerType, BooleanType)
case (StringType, IntegerType, BooleanType) => Seq(StringType, IntegerType, BooleanType)
case (HexType, IntegerType, BooleanType) => Seq(HexType, IntegerType, BooleanType)
case (InternalGeometryType, IntegerType, BooleanType) => Seq(InternalGeometryType, IntegerType, BooleanType)
case _ =>
throw new IllegalArgumentException(s"Not supported data type: (${first.dataType}, ${second.dataType}, ${third.dataType}).")
}

override def right: Expression = resolution
override def first: Expression = geom

override def left: Expression = geom
override def second: Expression = resolution

override def third: Expression = keepCoreGeom

/** Expression output DataType. */
override def dataType: DataType = MosaicType

override def toString: String = s"h3_mosaicfill($geom, $resolution)"
override def toString: String = s"mosaicfill($geom, $resolution, $keepCoreGeom)"

/** Overridden to ensure [[Expression.sql]] is properly formatted. */
override def prettyName: String = "h3_mosaicfill"
override def prettyName: String = "mosaicfill"
edurdevic marked this conversation as resolved.
Show resolved Hide resolved

/**
* Type-wise differences in evaluation are only present on the input data
Expand All @@ -69,22 +72,25 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName:
* Any instance containing the geometry.
* @param input2
* Any instance containing the resolution
* @param input3
* Any instance defining if core chips should be geometries or nulls
* @return
* A set of serialized
* [[com.databricks.labs.mosaic.core.types.model.MosaicChip]].
*/
// noinspection DuplicatedCode
override def nullSafeEval(input1: Any, input2: Any): Any = {
override def nullSafeEval(input1: Any, input2: Any, input3: Any): Any = {
val resolution: Int = H3IndexSystem.getResolution(input2)
val keepCoreGeom: Boolean = input3.asInstanceOf[Boolean]

val indexSystem = IndexSystemID.getIndexSystem(IndexSystemID(indexSystemName))
val geometryAPI = GeometryAPI(geometryAPIName)
val geometry = geometryAPI.geometry(input1, left.dataType)
val geometry = geometryAPI.geometry(input1, first.dataType)

val chips = GeometryTypeEnum.fromString(geometry.getGeometryType) match {
case LINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case MULTILINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, indexSystem, geometryAPI)
case _ => Mosaic.mosaicFill(geometry, resolution, keepCoreGeom, indexSystem, geometryAPI)
}

val serialized = InternalRow.fromSeq(
Expand All @@ -97,14 +103,14 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName:
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = {
val asArray = newArgs.take(2).map(_.asInstanceOf[Expression])
val res = MosaicFill(asArray(0), asArray(1), indexSystemName, geometryAPIName)
val asArray = newArgs.take(3).map(_.asInstanceOf[Expression])
val res = MosaicFill(asArray(0), asArray(1), asArray(2), indexSystemName, geometryAPIName)
res.copyTagsFrom(this)
res
}

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
copy(geom = newLeft, resolution = newRight)
override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression =
copy(geom = newFirst, resolution = newSecond, keepCoreGeom = newThird)

}

Expand All @@ -115,14 +121,14 @@ object MosaicFill {
new ExpressionInfo(
classOf[IndexGeometry].getCanonicalName,
db.orNull,
"mosaic_fill",
"mosaicfill",
"""
| _FUNC_(geometry, resolution) - Returns the 2 set representation of geometry at resolution.
| _FUNC_(geometry, resolution, keepCoreGeom) - Returns the 2 set representation of geometry at resolution.
""".stripMargin,
"",
"""
| Examples:
| > SELECT _FUNC_(a, b);
| > SELECT _FUNC_(a, b, c);
| [{index_id, is_border, chip_geom}, {index_id, is_border, chip_geom}, ..., {index_id, is_border, chip_geom}]
| """.stripMargin,
"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,25 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends
FunctionIdentifier("mosaic_explode", database),
MosaicExplode.registryExpressionInfo(database),
(exprs: Seq[Expression]) =>
MosaicExplode(struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1))).expr, indexSystem.name, geometryAPI.name)
exprs match {
    // Two-argument SQL form: no flag supplied, so core chip geometries are kept (lit(true)).
    case e if e.length == 2 =>
        MosaicExplode(struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), lit(true)).expr, indexSystem.name, geometryAPI.name)
    // Three-argument SQL form: the third struct field must be the caller's keepCoreGeom
    // argument (e(2)). MosaicExplode reads field 2 as a Boolean, so passing the
    // resolution (e(1)) there again would drop the caller's flag.
    case e if e.length == 3 =>
        MosaicExplode(
            struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), ColumnAdapter(e(2))).expr,
            indexSystem.name,
            geometryAPI.name
        )
}
)
registry.registerFunction(
FunctionIdentifier("mosaicfill", database),
MosaicFill.registryExpressionInfo(database),
(exprs: Seq[Expression]) => MosaicFill(exprs(0), exprs(1), indexSystem.name, geometryAPI.name)
(exprs: Seq[Expression]) =>
exprs match {
case e if e.length == 2 => MosaicFill(e(0), e(1), lit(true).expr, indexSystem.name, geometryAPI.name)
case e if e.length == 3 => MosaicFill(e(0), e(1), e(2), indexSystem.name, geometryAPI.name)
}
)
registry.registerFunction(
FunctionIdentifier("point_index_lonlat", database),
Expand Down Expand Up @@ -379,13 +392,21 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends

/** IndexSystem and GeometryAPI Specific methods */
def mosaic_explode(geom: Column, resolution: Column): Column =
ColumnAdapter(MosaicExplode(struct(geom, resolution).expr, indexSystem.name, geometryAPI.name))
ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(true)).expr, indexSystem.name, geometryAPI.name))
def mosaic_explode(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column =
ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name))
def mosaic_explode(geom: Column, resolution: Int): Column =
ColumnAdapter(MosaicExplode(struct(geom, lit(resolution)).expr, indexSystem.name, geometryAPI.name))
ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(true)).expr, indexSystem.name, geometryAPI.name))
def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column =
ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name))
def mosaicfill(geom: Column, resolution: Column): Column =
ColumnAdapter(MosaicFill(geom.expr, resolution.expr, indexSystem.name, geometryAPI.name))
ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(true).expr, indexSystem.name, geometryAPI.name))
def mosaicfill(geom: Column, resolution: Int): Column =
ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, indexSystem.name, geometryAPI.name))
ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(true).expr, indexSystem.name, geometryAPI.name))
def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column =
ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name))
def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column =
ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name))
def point_index_geom(point: Column, resolution: Column): Column =
ColumnAdapter(PointIndexGeom(point.expr, resolution.expr, indexSystem.name, geometryAPI.name))
def point_index_geom(point: Column, resolution: Int): Column =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import com.databricks.labs.mosaic.functions.MosaicContext
import com.databricks.labs.mosaic.test.mocks.{getBoroughs, getWKTRowsDf}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers._

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}


trait MosaicExplodeBehaviors {
this: AnyFlatSpec =>
Expand Down Expand Up @@ -37,6 +38,31 @@ trait MosaicExplodeBehaviors {
boroughs.collect().length should be < mosaics2.length
}

/**
 * Explodes a single unit-square WKT polygon at resolution 4 with the
 * two-argument `mosaic_explode(Column, Int)` overload and asserts that no
 * resulting row has a null `index.wkb`.
 *
 * @param mosaicContext
 *   Context supplying the `mosaic_explode` function and SQL registration.
 * @param spark
 *   Session used to register the functions and build the input frame.
 */
def wktDecomposeNoNulls(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = {
    val mc = mosaicContext
    import mc.functions._
    mosaicContext.register(spark)

    // One-row input frame with a single string column named "wkt".
    val singleRow = Seq(Row("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"))
    val wktRdd = spark.sparkContext.makeRDD(singleRow)
    val wktSchema = StructType(List(StructField("wkt", StringType)))
    val input = spark.createDataFrame(wktRdd, wktSchema)

    // Count the exploded chips whose geometry payload is missing.
    val exploded = input.select(mosaic_explode(col("wkt"), 4))
    val chipsWithoutGeometry = exploded.filter(col("index.wkb").isNull).count()

    chipsWithoutGeometry should equal(0)
}

def lineDecompose(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = {
val mc = mosaicContext
import mc.functions._
Expand All @@ -61,6 +87,7 @@ trait MosaicExplodeBehaviors {
.collect()

wktRows.collect().length should be < mosaics2.length

}

def wkbDecompose(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = {
Expand Down
Loading