From 4ee0ad5839a497864365cc3341043c626ea0c1db Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Thu, 7 Apr 2022 16:14:16 +0200 Subject: [PATCH 01/12] Added geometries for all chips --- .../databricks/labs/mosaic/core/Mosaic.scala | 6 ++-- .../mosaic/core/index/H3IndexSystem.scala | 16 +++++++--- .../labs/mosaic/core/index/IndexSystem.scala | 4 +-- .../mosaic/core/types/model/MosaicChip.scala | 20 ------------- .../expressions/index/MosaicExplode.scala | 3 +- .../mosaic/expressions/index/MosaicFill.scala | 5 ++-- .../labs/mosaic/functions/MosaicContext.scala | 8 +++-- .../index/MosaicExplodeBehaviors.scala | 30 +++++++++++++++++-- .../expressions/index/TestMosaicExplode.scala | 5 ++++ 9 files changed, 61 insertions(+), 36 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala b/src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala index 1ceb1bae6..05bb872fe 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/Mosaic.scala @@ -11,7 +11,7 @@ import com.databricks.labs.mosaic.core.types.model.MosaicChip */ object Mosaic { - def mosaicFill(geometry: MosaicGeometry, resolution: Int, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = { + def mosaicFill(geometry: MosaicGeometry, resolution: Int, keepCoreGeom: Boolean, indexSystem: IndexSystem, geometryAPI: GeometryAPI): Seq[MosaicChip] = { val radius = indexSystem.getBufferRadius(geometry, resolution, geometryAPI) @@ -28,8 +28,8 @@ object Mosaic { val coreIndices = indexSystem.polyfill(carvedGeometry, resolution) val borderIndices = indexSystem.polyfill(borderGeometry, resolution) - val coreChips = indexSystem.getCoreChips(coreIndices) - val borderChips = indexSystem.getBorderChips(geometry, borderIndices, geometryAPI) + val coreChips = indexSystem.getCoreChips(coreIndices, keepCoreGeom, geometryAPI) + val borderChips = indexSystem.getBorderChips(geometry, borderIndices, keepCoreGeom, geometryAPI) coreChips ++ borderChips } diff --git a/src/main/scala/com/databricks/labs/mosaic/core/index/H3IndexSystem.scala b/src/main/scala/com/databricks/labs/mosaic/core/index/H3IndexSystem.scala index 1f032b3de..c953f45f9 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/index/H3IndexSystem.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/index/H3IndexSystem.scala @@ -107,12 +107,17 @@ object H3IndexSystem extends IndexSystem with Serializable { override def getBorderChips( geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], + keepCoreGeom: Boolean, geometryAPI: GeometryAPI ): Seq[MosaicChip] = { val intersections = for (index <- borderIndices.asScala) yield { val indexGeom = indexToGeometry(index, geometryAPI) - val chip = MosaicChip(isCore = false, index, indexGeom) - chip.intersection(geometry) + val intersect = geometry.intersection(indexGeom) + val isCore = intersect.equals(indexGeom) + + val chipGeom = if (!isCore || keepCoreGeom) intersect else null + + MosaicChip(isCore = isCore, index, chipGeom) } intersections.filterNot(_.isEmpty) } @@ -142,8 +147,11 @@ object H3IndexSystem extends IndexSystem with Serializable { * @return * A core area representation via [[MosaicChip]] set. */ - override def getCoreChips(coreIndices: util.List[lang.Long]): Seq[MosaicChip] = { - coreIndices.asScala.map(MosaicChip(true, _, null)) + override def getCoreChips(coreIndices: util.List[lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip] = { + coreIndices.asScala.map(index => { + val indexGeom = if (keepCoreGeom) indexToGeometry(index, geometryAPI) else null + MosaicChip(isCore = true, index, indexGeom) + }) } /** diff --git a/src/main/scala/com/databricks/labs/mosaic/core/index/IndexSystem.scala b/src/main/scala/com/databricks/labs/mosaic/core/index/IndexSystem.scala index 7144596e9..91ee269bf 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/index/IndexSystem.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/index/IndexSystem.scala @@ -88,7 +88,7 @@ trait IndexSystem extends Serializable { * @return * A border area representation via [[MosaicChip]] set. */ - def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], geometryAPI: GeometryAPI): Seq[MosaicChip] + def getBorderChips(geometry: MosaicGeometry, borderIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip] /** * Return a set of [[MosaicChip]] instances computed based on the core @@ -101,7 +101,7 @@ trait IndexSystem extends Serializable { * @return * A core area representation via [[MosaicChip]] set. */ - def getCoreChips(coreIndices: util.List[java.lang.Long]): Seq[MosaicChip] + def getCoreChips(coreIndices: util.List[java.lang.Long], keepCoreGeom: Boolean, geometryAPI: GeometryAPI): Seq[MosaicChip] /** * Get the geometry corresponding to the index with the input id. diff --git a/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicChip.scala b/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicChip.scala index 804d5b67a..d64d993af 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicChip.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicChip.scala @@ -17,26 +17,6 @@ import org.apache.spark.sql.catalyst.InternalRow */ case class MosaicChip(isCore: Boolean, index: Long, geom: MosaicGeometry) { - /** - * Perform an intersection with a geometry, and if intersection is non - * empty and the chip is not a core set chip then extract the chip - * geometry. - * - * @param other - * Geometry instance. - * @return - * A Mosaic Chip instance. - */ - def intersection(other: MosaicGeometry): MosaicChip = { - val intersect = other.intersection(geom) - val isCore = intersect.equals(geom) - if (isCore) { - MosaicChip(isCore, index, null) - } else { - MosaicChip(isCore, index, intersect) - } - } - /** * Indicates whether the chip is outside of the representation of the * geometry it was generated to represent (ie false positive index). diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala index 01bf1c379..360ce5b64 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala @@ -66,8 +66,9 @@ object MosaicExplode { val geometry = geometryAPI.geometry(inputData, geomType) val resolution = inputData.getInt(1) + val keepCoreGeom = inputData.getBoolean(2) - val chips = Mosaic.mosaicFill(geometry, resolution, indexSystem, geometryAPI) + val chips = Mosaic.mosaicFill(geometry, resolution, keepCoreGeom, indexSystem, geometryAPI) chips.map(row => InternalRow.fromSeq(Seq(row.serialize))) } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala index cbc081f92..11f27a692 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala @@ -70,13 +70,14 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: * [[com.databricks.labs.mosaic.core.types.model.MosaicChip]]. */ // noinspection DuplicatedCode - override def nullSafeEval(input1: Any, input2: Any): Any = { + override def nullSafeEval(input1: Any, input2: Any, input3: Any): Any = { val resolution: Int = H3IndexSystem.getResolution(input2) + val keepCoreGeom: Int = H3IndexSystem.getResolution(keepCoreGeom) val indexSystem = IndexSystemID.getIndexSystem(IndexSystemID(indexSystemName)) val geometryAPI = GeometryAPI(geometryAPIName) val geometry = geometryAPI.geometry(input1, left.dataType) - val chips = Mosaic.mosaicFill(geometry, resolution, indexSystem, geometryAPI) + val chips = Mosaic.mosaicFill(geometry, resolution, keepCoreGeom = true, indexSystem, geometryAPI) val serialized = InternalRow.fromSeq( Seq( diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 3240c3c77..57df6e877 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -379,9 +379,13 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends /** IndexSystem and GeometryAPI Specific methods */ def mosaic_explode(geom: Column, resolution: Column): Column = - ColumnAdapter(MosaicExplode(struct(geom, resolution).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(true)).expr, indexSystem.name, geometryAPI.name)) + def mosaic_explode(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = + ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Int): Column = - ColumnAdapter(MosaicExplode(struct(geom, lit(resolution)).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(true)).expr, indexSystem.name, geometryAPI.name)) + def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = + ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column): Column = ColumnAdapter(MosaicFill(geom.expr, resolution.expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int): Column = diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala index 25a7c092c..6e49417ee 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala @@ -4,9 +4,10 @@ import com.databricks.labs.mosaic.functions.MosaicContext import com.databricks.labs.mosaic.test.mocks.getBoroughs import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers._ - -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} + trait MosaicExplodeBehaviors { this: AnyFlatSpec => @@ -37,6 +38,31 @@ trait MosaicExplodeBehaviors { boroughs.collect().length should be < mosaics2.length } + def wktDecomposeNoNulls(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = { + val mc = mosaicContext + import mc.functions._ + mosaicContext.register(spark) + + val rdd = spark.sparkContext.makeRDD(Seq( + Row("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))") + )) + val schema = StructType( + List( + StructField("wkt", StringType) + ) + ) + val df = spark.createDataFrame(rdd, schema) + + val emptyChips = df + .select( + mosaic_explode(col("wkt"), 4) + ) + .filter(col("index.wkb").isNull) + .count() + + emptyChips should equal(0) + } + def wkbDecompose(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = { val mc = mosaicContext import mc.functions._ diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala index 8d77f05c4..dccd3a563 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala @@ -13,6 +13,11 @@ class TestMosaicExplode extends AnyFlatSpec with MosaicExplodeBehaviors with Spa it should behave like wktDecompose(MosaicContext.build(H3IndexSystem, JTS), spark) } + "Mosaic_Explode" should "decompose wkt geometries with no null for any index system and any geometry API" in { + it should behave like wktDecomposeNoNulls(MosaicContext.build(H3IndexSystem, ESRI), spark) + it should behave like wktDecomposeNoNulls(MosaicContext.build(H3IndexSystem, JTS), spark) + } + "Mosaic_Explode" should "decompose wkb geometries for any index system and any geometry API" in { it should behave like wkbDecompose(MosaicContext.build(H3IndexSystem, ESRI), spark) it should behave like wkbDecompose(MosaicContext.build(H3IndexSystem, JTS), spark) From b9423bdb123eed5ff59f490a21378179d8856d57 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Thu, 7 Apr 2022 17:20:02 +0200 Subject: [PATCH 02/12] Binary to Ternary expression for MosaicFill --- .../mosaic/expressions/index/MosaicFill.scala | 52 ++++++++++--------- .../labs/mosaic/functions/MosaicContext.scala | 12 +++-- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala index 34d036e34..58b554c8c 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala @@ -10,7 +10,7 @@ import org.locationtech.jts.geom.Geometry import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ - BinaryExpression, + TernaryExpression, ExpectsInputTypes, Expression, ExpressionDescription, @@ -30,33 +30,35 @@ import org.apache.spark.sql.types._ """, since = "1.0" ) -case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: String, geometryAPIName: String) - extends BinaryExpression +case class MosaicFill(geom: Expression, resolution: Expression, keepCoreGeom: Expression, indexSystemName: String, geometryAPIName: String) + extends TernaryExpression with ExpectsInputTypes with NullIntolerant with CodegenFallback { // noinspection DuplicatedCode override def inputTypes: Seq[DataType] = - (left.dataType, right.dataType) match { - case (BinaryType, IntegerType) => Seq(BinaryType, IntegerType) - case (StringType, IntegerType) => Seq(StringType, IntegerType) - case (HexType, IntegerType) => Seq(HexType, IntegerType) - case (InternalGeometryType, IntegerType) => Seq(InternalGeometryType, IntegerType) - case _ => throw new IllegalArgumentException(s"Not supported data type: (${left.dataType}, ${right.dataType}).") + (first.dataType, second.dataType, third.dataType) match { + case (BinaryType, IntegerType, BooleanType) => Seq(BinaryType, IntegerType, BooleanType) + case (StringType, IntegerType, BooleanType) => Seq(StringType, IntegerType, BooleanType) + case (HexType, IntegerType, BooleanType) => Seq(HexType, IntegerType, BooleanType) + case (InternalGeometryType, IntegerType, BooleanType) => Seq(InternalGeometryType, IntegerType, BooleanType) + case _ => throw new IllegalArgumentException(s"Not supported data type: (${first.dataType}, ${second.dataType}, ${third.dataType}).") } - override def right: Expression = resolution + override def first: Expression = resolution - override def left: Expression = geom + override def second: Expression = geom + + override def third: Expression = keepCoreGeom /** Expression output DataType. */ override def dataType: DataType = MosaicType - override def toString: String = s"h3_mosaicfill($geom, $resolution)" + override def toString: String = s"mosaicfill($geom, $resolution, $keepCoreGeom)" /** Overridden to ensure [[Expression.sql]] is properly formatted. */ - override def prettyName: String = "h3_mosaicfill" + override def prettyName: String = "mosaicfill" /** * Type-wise differences in evaluation are only present on the input data @@ -68,7 +70,9 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: * @param input1 * Any instance containing the geometry. * @param input2 - * Any instance containing the resolution + * Any instance containing the resolution + * @param input3 + * Any instance defining if core chips should be geometries or nulls * @return * A set of serialized * [[com.databricks.labs.mosaic.core.types.model.MosaicChip]]. @@ -76,16 +80,16 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: // noinspection DuplicatedCode override def nullSafeEval(input1: Any, input2: Any, input3: Any): Any = { val resolution: Int = H3IndexSystem.getResolution(input2) - val keepCoreGeom: Int = H3IndexSystem.getResolution(keepCoreGeom) + val keepCoreGeom: Boolean = input3.asInstanceOf[Boolean] val indexSystem = IndexSystemID.getIndexSystem(IndexSystemID(indexSystemName)) val geometryAPI = GeometryAPI(geometryAPIName) - val geometry = geometryAPI.geometry(input1, left.dataType) + val geometry = geometryAPI.geometry(input1, first.dataType) val chips = GeometryTypeEnum.fromString(geometry.getGeometryType) match { case LINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI) case MULTILINESTRING => Mosaic.lineFill(geometry, resolution, indexSystem, geometryAPI) - case _ => Mosaic.mosaicFill(geometry, resolution, keepCoreGeom = true, indexSystem, geometryAPI) + case _ => Mosaic.mosaicFill(geometry, resolution, keepCoreGeom, indexSystem, geometryAPI) } val serialized = InternalRow.fromSeq( @@ -98,14 +102,14 @@ case class MosaicFill(geom: Expression, resolution: Expression, indexSystemName: } override def makeCopy(newArgs: Array[AnyRef]): Expression = { - val asArray = newArgs.take(2).map(_.asInstanceOf[Expression]) - val res = MosaicFill(asArray(0), asArray(1), indexSystemName, geometryAPIName) + val asArray = newArgs.take(3).map(_.asInstanceOf[Expression]) + val res = MosaicFill(asArray(0), asArray(1), asArray(2), indexSystemName, geometryAPIName) res.copyTagsFrom(this) res } - override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = - copy(geom = newLeft, resolution = newRight) + override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = + copy(geom = newFirst, resolution = newSecond) } @@ -116,14 +120,14 @@ object MosaicFill { new ExpressionInfo( classOf[IndexGeometry].getCanonicalName, db.orNull, - "mosaic_fill", + "mosaicfill", """ - | _FUNC_(geometry, resolution) - Returns the 2 set representation of geometry at resolution. + | _FUNC_(geometry, resolution, keepCoreGeom) - Returns the 2 set representation of geometry at resolution. """.stripMargin, "", """ | Examples: - | > SELECT _FUNC_(a, b); + | > SELECT _FUNC_(a, b, c); | [{index_id, is_border, chip_geom}, {index_id, is_border, chip_geom}, ..., {index_id, is_border, chip_geom}] | """.stripMargin, "", diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 81bf6c9d6..33ffffe49 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -269,12 +269,12 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends FunctionIdentifier("mosaic_explode", database), MosaicExplode.registryExpressionInfo(database), (exprs: Seq[Expression]) => - MosaicExplode(struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1))).expr, indexSystem.name, geometryAPI.name) + MosaicExplode(struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1)), ColumnAdapter(exprs(2))).expr, indexSystem.name, geometryAPI.name) ) registry.registerFunction( FunctionIdentifier("mosaicfill", database), MosaicFill.registryExpressionInfo(database), - (exprs: Seq[Expression]) => MosaicFill(exprs(0), exprs(1), indexSystem.name, geometryAPI.name) + (exprs: Seq[Expression]) => MosaicFill(exprs(0), exprs(1), exprs(2), indexSystem.name, geometryAPI.name) ) registry.registerFunction( FunctionIdentifier("point_index_lonlat", database), @@ -387,9 +387,13 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column): Column = - ColumnAdapter(MosaicFill(geom.expr, resolution.expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(true).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int): Column = - ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(true).expr, indexSystem.name, geometryAPI.name)) + def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Column): Column = ColumnAdapter(PointIndexGeom(point.expr, resolution.expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Int): Column = From 5845eaa75f3ecac6036159c75c1b0d4b1bc2039f Mon Sep 17 00:00:00 2001 From: sllynn Date: Thu, 7 Apr 2022 17:18:25 +0100 Subject: [PATCH 03/12] fixed argument order in expression, added matching to sql registration --- .../mosaic/expressions/index/MosaicFill.scala | 9 +++++---- .../labs/mosaic/functions/MosaicContext.scala | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala index 58b554c8c..ab18e25c3 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala @@ -43,12 +43,13 @@ case class MosaicFill(geom: Expression, resolution: Expression, keepCoreGeom: Ex case (StringType, IntegerType, BooleanType) => Seq(StringType, IntegerType, BooleanType) case (HexType, IntegerType, BooleanType) => Seq(HexType, IntegerType, BooleanType) case (InternalGeometryType, IntegerType, BooleanType) => Seq(InternalGeometryType, IntegerType, BooleanType) - case _ => throw new IllegalArgumentException(s"Not supported data type: (${first.dataType}, ${second.dataType}, ${third.dataType}).") + case _ => + throw new IllegalArgumentException(s"Not supported data type: (${first.dataType}, ${second.dataType}, ${third.dataType}).") } - override def first: Expression = resolution + override def first: Expression = geom - override def second: Expression = geom + override def second: Expression = resolution override def third: Expression = keepCoreGeom @@ -70,7 +71,7 @@ case class MosaicFill(geom: Expression, resolution: Expression, keepCoreGeom: Ex * @param input1 * Any instance containing the geometry. * @param input2 - * Any instance containing the resolution + * Any instance containing the resolution * @param input3 * Any instance defining if core chips should be geometries or nulls * @return diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 33ffffe49..553fdba78 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -269,12 +269,20 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends FunctionIdentifier("mosaic_explode", database), MosaicExplode.registryExpressionInfo(database), (exprs: Seq[Expression]) => - MosaicExplode(struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1)), ColumnAdapter(exprs(2))).expr, indexSystem.name, geometryAPI.name) + MosaicExplode( + struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1)), ColumnAdapter(exprs(2))).expr, + indexSystem.name, + geometryAPI.name + ) ) registry.registerFunction( FunctionIdentifier("mosaicfill", database), MosaicFill.registryExpressionInfo(database), - (exprs: Seq[Expression]) => MosaicFill(exprs(0), exprs(1), exprs(2), indexSystem.name, geometryAPI.name) + (exprs: Seq[Expression]) => + exprs match { + case e if e.length == 2 => MosaicFill(e(0), e(1), lit(true).expr, indexSystem.name, geometryAPI.name) + case e if e.length == 3 => MosaicFill(e(0), e(1), e(2), indexSystem.name, geometryAPI.name) + } ) registry.registerFunction( FunctionIdentifier("point_index_lonlat", database), @@ -379,13 +387,13 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends /** IndexSystem and GeometryAPI Specific methods */ def mosaic_explode(geom: Column, resolution: Column): Column = - ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(true)).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(true)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Int): Column = ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(true)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = - ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column): Column = ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(true).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int): Column = From bcc1379f9066cc7f52fa7f245c5316958d1038a3 Mon Sep 17 00:00:00 2001 From: sllynn Date: Thu, 7 Apr 2022 17:24:54 +0100 Subject: [PATCH 04/12] added matcher for mosaic_explode --- .../labs/mosaic/functions/MosaicContext.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 553fdba78..42b7d878f 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -269,11 +269,16 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends FunctionIdentifier("mosaic_explode", database), MosaicExplode.registryExpressionInfo(database), (exprs: Seq[Expression]) => - MosaicExplode( - struct(ColumnAdapter(exprs(0)), ColumnAdapter(exprs(1)), ColumnAdapter(exprs(2))).expr, - indexSystem.name, - geometryAPI.name - ) + exprs match { + case e if e.length == 2 => + MosaicExplode(struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), lit(true)).expr, indexSystem.name, geometryAPI.name) + case e if e.length == 3 => + MosaicExplode( + struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), ColumnAdapter(e(1))).expr, + indexSystem.name, + geometryAPI.name + ) + } ) registry.registerFunction( FunctionIdentifier("mosaicfill", database), From 7c0d0ba656bf73d2df4d674580f9efdd8d310eb7 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 12 Apr 2022 12:42:01 +0200 Subject: [PATCH 05/12] Fixed mosaic copy --- .../databricks/labs/mosaic/expressions/index/MosaicFill.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala index ab18e25c3..25070658e 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicFill.scala @@ -110,7 +110,7 @@ case class MosaicFill(geom: Expression, resolution: Expression, keepCoreGeom: Ex } override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = - copy(geom = newFirst, resolution = newSecond) + copy(geom = newFirst, resolution = newSecond, keepCoreGeom = newThird) } From 99fb2d4b6cebf2f3231cf9ed2c2d026f846d44e5 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 12 Apr 2022 17:20:04 +0200 Subject: [PATCH 06/12] Updated docs with refactored mosaicfill --- docs/source/api/spatial-functions.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/api/spatial-functions.rst b/docs/source/api/spatial-functions.rst index d6afde8d1..bd1de8c2c 100644 --- a/docs/source/api/spatial-functions.rst +++ b/docs/source/api/spatial-functions.rst @@ -901,7 +901,7 @@ mosaicfill >>> df = spark.createDataFrame([{'wkt': 'MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))'}]) >>> df.select(mosaicfill('wkt', lit(0))).printSchema() root - |-- h3_mosaicfill(wkt, 0): mosaic (nullable = true) + |-- mosaicfill(wkt, 0): mosaic (nullable = true) | |-- chips: array (nullable = true) | | |-- element: mosaic_chip (containsNull = true) | | | |-- is_core: boolean (nullable = true) @@ -911,7 +911,7 @@ mosaicfill >>> df.select(mosaicfill('wkt', lit(0))).show() +---------------------+ - |h3_mosaicfill(wkt, 0)| + |mosaicfill(wkt, 0) | +---------------------+ | {[{false, 5774810...| +---------------------+ @@ -921,7 +921,7 @@ mosaicfill >>> val df = List(("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))")).toDF("wkt") >>> df.select(mosaicfill($"wkt", lit(0))).printSchema root - |-- h3_mosaicfill(wkt, 0): mosaic (nullable = true) + |-- mosaicfill(wkt, 0): mosaic (nullable = true) | |-- chips: array (nullable = true) | | |-- element: mosaic_chip (containsNull = true) | | | |-- is_core: boolean (nullable = true) @@ -930,7 +930,7 @@ mosaicfill >>> df.select(mosaicfill($"wkt", lit(0))).show() +---------------------+ - |h3_mosaicfill(wkt, 0)| + |mosaicfill(wkt, 0) | +---------------------+ | {[{false, 5774810...| +---------------------+ @@ -939,7 +939,7 @@ mosaicfill >>> SELECT mosaicfill("MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))", 0) +---------------------+ - |h3_mosaicfill(wkt, 0)| + |mosaicfill(wkt, 0) | +---------------------+ | {[{false, 5774810...| +---------------------+ From d2f5f103b267a7178191b9f0120b51b314fc6335 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Thu, 14 Apr 2022 16:21:31 +0200 Subject: [PATCH 07/12] Added python bindings for keep_core_geometries --- python/mosaic/api/functions.py | 15 ++++++++++----- python/test/test_functions.py | 2 ++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/python/mosaic/api/functions.py b/python/mosaic/api/functions.py index 71acc422e..511e8ccce 100644 --- a/python/mosaic/api/functions.py +++ b/python/mosaic/api/functions.py @@ -519,7 +519,7 @@ def polyfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ) -def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column: +def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: bool = True) -> Column: """ Generates: - a set of core indices that are fully contained by `geom`; and @@ -531,21 +531,23 @@ def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ---------- geom : Column resolution : Column (IntegerType) + keep_core_geometries : bool Returns ------- Column (StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType]) - `wkb` in this struct represents a border chip geometry and is null for all 'core' chips. + `wkb` in this struct represents a border chip geometry and is null for all 'core' chips + if keep_core_geometries is set to False. """ return config.mosaic_context.invoke_function( "mosaic_explode", pyspark_to_java_column(geom), pyspark_to_java_column(resolution), + keep_core_geometries ) - -def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: +def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: bool = True) -> Column: """ Generates: - a set of core indices that are fully contained by `geom`; and @@ -557,15 +559,18 @@ def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ---------- geom : Column resolution : Column (IntegerType) + keep_core_geometries : bool Returns ------- Column (ArrayType[StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType]]) - `wkb` in this struct represents a border chip geometry and is null for all 'core' chips. + `wkb` in this struct represents a border chip geometry and is null for all 'core' chips + if keep_core_geometries is set to False. """ return config.mosaic_context.invoke_function( "mosaicfill", pyspark_to_java_column(geom), pyspark_to_java_column(resolution), + keep_core_geometries ) diff --git a/python/test/test_functions.py b/python/test/test_functions.py index 63ea1265a..1645f1193 100644 --- a/python/test/test_functions.py +++ b/python/test/test_functions.py @@ -63,6 +63,8 @@ def test_st_bindings_happy_flow(self): .withColumn("polyfill", api.polyfill("wkt", lit(1))) .withColumn("mosaic_explode", api.mosaic_explode("wkt", lit(1))) .withColumn("mosaicfill", api.mosaicfill("wkt", lit(1))) + .withColumn("mosaic_explode_no_core_chips", api.mosaic_explode("wkt", lit(1), False)) + .withColumn("mosaicfill_no_core_chips", api.mosaicfill("wkt", lit(1), False)) ) self.assertEqual(result.count(), 1) From 0e0aef0e6cf8916ad6bc1995ed2a962303a5f8bb Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Thu, 14 Apr 2022 16:46:51 +0200 Subject: [PATCH 08/12] Fixed mosaicfill for column argument --- python/mosaic/api/functions.py | 12 ++++++++---- python/test/test_functions.py | 1 + .../labs/mosaic/functions/MosaicContext.scala | 8 ++++++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/python/mosaic/api/functions.py b/python/mosaic/api/functions.py index 511e8ccce..9896d1375 100644 --- a/python/mosaic/api/functions.py +++ b/python/mosaic/api/functions.py @@ -1,8 +1,8 @@ import inspect -from typing import overload +from typing import overload, Any from pyspark.sql import Column -from pyspark.sql.functions import col, _to_java_column as pyspark_to_java_column +from pyspark.sql.functions import lit, _to_java_column as pyspark_to_java_column from mosaic.config import config from mosaic.utils.types import ColumnOrName, as_typed_col @@ -547,7 +547,7 @@ def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geome keep_core_geometries ) -def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: bool = True) -> Column: +def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: Any = True) -> Column: """ Generates: - a set of core indices that are fully contained by `geom`; and @@ -568,9 +568,13 @@ def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometrie if keep_core_geometries is set to False. """ + + if(type(keep_core_geometries) == bool): + keep_core_geometries = lit(keep_core_geometries) + return config.mosaic_context.invoke_function( "mosaicfill", pyspark_to_java_column(geom), pyspark_to_java_column(resolution), - keep_core_geometries + pyspark_to_java_column(keep_core_geometries) ) diff --git a/python/test/test_functions.py b/python/test/test_functions.py index 1645f1193..a2c973005 100644 --- a/python/test/test_functions.py +++ b/python/test/test_functions.py @@ -65,6 +65,7 @@ def test_st_bindings_happy_flow(self): .withColumn("mosaicfill", api.mosaicfill("wkt", lit(1))) .withColumn("mosaic_explode_no_core_chips", api.mosaic_explode("wkt", lit(1), False)) .withColumn("mosaicfill_no_core_chips", api.mosaicfill("wkt", lit(1), False)) + .withColumn("mosaicfill_no_core_chips_bool", api.mosaicfill("wkt", lit(1), lit(False))) ) self.assertEqual(result.count(), 1) diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 42b7d878f..66d62e49f 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -404,9 +404,13 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends def mosaicfill(geom: Column, resolution: Int): Column = ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(true).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = - ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = - ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Column): Column = + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) + def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Column): Column = + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Column): Column = ColumnAdapter(PointIndexGeom(point.expr, resolution.expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Int): Column = From 33f948c4c6bc004bbf4a0e3fb78fa819eca0bd70 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Tue, 19 Apr 2022 12:17:12 +0200 Subject: [PATCH 09/12] Tested MosaicExplode expressions --- .../expressions/index/MosaicExplode.scala | 32 +++++++------- .../labs/mosaic/functions/MosaicContext.scala | 4 ++ .../index/MosaicExplodeBehaviors.scala | 42 +++++++++++++++++-- .../index/MosaicFillBehaviors.scala | 27 ++++++++++++ .../expressions/index/TestMosaicExplode.scala | 5 +++ .../expressions/index/TestMosaicFill.scala | 5 +++ 6 files changed, 97 insertions(+), 18 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala index a8a4e297b..af3b3cdf9 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplode.scala @@ -93,14 +93,15 @@ object MosaicExplode { val fields = child.dataType.asInstanceOf[StructType].fields val geomType = fields.head val resolutionType = fields(1) + val keepCoreGeom = fields(2) - (geomType.dataType, resolutionType.dataType) match { - case (BinaryType, IntegerType) => TypeCheckResult.TypeCheckSuccess - case (StringType, IntegerType) => TypeCheckResult.TypeCheckSuccess - case (HexType, IntegerType) => TypeCheckResult.TypeCheckSuccess - case (InternalGeometryType, IntegerType) => TypeCheckResult.TypeCheckSuccess + (geomType.dataType, resolutionType.dataType, keepCoreGeom.dataType) match { + case (BinaryType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess + case (StringType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess + case (HexType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess + case (InternalGeometryType, IntegerType, BooleanType) => TypeCheckResult.TypeCheckSuccess case _ => TypeCheckResult.TypeCheckFailure( - s"Input to h3 mosaic explode should be (geometry, resolution) pair. " + + s"Input to h3 mosaic explode should be (geometry, resolution, keepCoreGeom) pair. " + s"Geometry type can be WKB, WKT, Hex or Coords. Provided type was: ${child.dataType.catalogString}" ) } @@ -123,14 +124,15 @@ object MosaicExplode { val fields = child.dataType.asInstanceOf[StructType].fields val geomType = fields.head val resolutionType = fields(1) + val keepCoreGeom = fields(2) - (geomType.dataType, resolutionType.dataType) match { - case (BinaryType, IntegerType) => StructType(Array(StructField("index", ChipType))) - case (StringType, IntegerType) => StructType(Array(StructField("index", ChipType))) - case (HexType, IntegerType) => StructType(Array(StructField("index", ChipType))) - case (InternalGeometryType, IntegerType) => StructType(Array(StructField("index", ChipType))) + (geomType.dataType, resolutionType.dataType, keepCoreGeom.dataType) match { + case (BinaryType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType))) + case (StringType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType))) + case (HexType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType))) + case (InternalGeometryType, IntegerType, BooleanType) => StructType(Array(StructField("index", ChipType))) case _ => throw new Error( - s"Input to h3 mosaic explode should be (geometry, resolution) pair. " + + s"Input to h3 mosaic explode should be (geometry, resolution, keepCoreGeom) pair. " + s"Geometry type can be WKB, WKT, Hex or Coords. Provided type was: ${child.dataType.catalogString}" ) } @@ -150,14 +152,14 @@ object MosaicExplode { db.orNull, "mosaic_explode", """ - | _FUNC_(struct(geometry, resolution)) - Generates the h3 mosaic chips for the input geometry - | at a given resolution. Geometry and resolution are provided via struct wrapper to ensure + | _FUNC_(struct(geometry, resolution, keepCoreGeom)) - Generates the h3 mosaic chips for the input + | geometry at a given resolution. Geometry and resolution are provided via struct wrapper to ensure | UnaryExpression API is respected. """.stripMargin, "", """ | Examples: - | > SELECT _FUNC_(a, b); + | > SELECT _FUNC_(a, b, c); | {index_id, is_border, chip_geom} | {index_id, is_border, chip_geom} | ... diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 232d2f8a8..e2413e288 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -413,10 +413,14 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(true)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = ColumnAdapter(MosaicExplode(struct(geom, resolution, lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) + def mosaic_explode(geom: Column, resolution: Column, keepCoreGeometries: Column): Column = + ColumnAdapter(MosaicExplode(struct(geom, resolution, keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Int): Column = ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(true)).expr, indexSystem.name, geometryAPI.name)) def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), lit(keepCoreGeometries)).expr, indexSystem.name, geometryAPI.name)) + def mosaic_explode(geom: Column, resolution: Int, keepCoreGeometries: Column): Column = + ColumnAdapter(MosaicExplode(struct(geom, lit(resolution), keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column): Column = ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(true).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int): Column = diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala index 123227a30..b1c3c638d 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicExplodeBehaviors.scala @@ -53,14 +53,50 @@ trait MosaicExplodeBehaviors { ) val df = spark.createDataFrame(rdd, schema) - val emptyChips = df + val noEmptyChips = df .select( - mosaic_explode(col("wkt"), 4) + mosaic_explode(col("wkt"), 4, true) ) .filter(col("index.wkb").isNull) .count() - emptyChips should equal(0) + noEmptyChips should equal(0) + + val emptyChips = df + .select( + mosaic_explode(col("wkt"), 4, false) + ) + .filter(col("index.wkb").isNull) + + emptyChips.collect().length should be > 0 + } + + def wktDecomposeKeepCoreParamExpression(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = { + val mc = mosaicContext + import mc.functions._ + mosaicContext.register(spark) + + val rdd = spark.sparkContext.makeRDD(Seq( + Row("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))") + )) + val schema = StructType( + List( + StructField("wkt", StringType) + ) + ) + val df = spark.createDataFrame(rdd, schema) + + val noEmptyChips = df + .select( + expr("mosaic_explode(wkt, 4, true)") + ) + noEmptyChips.collect().length should be > 0 + + val noEmptyChips_2 = df + .select( + expr("mosaic_explode(wkt, 4, false)") + ) + noEmptyChips_2.collect().length should be > 0 } def lineDecompose(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = { diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicFillBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicFillBehaviors.scala index 18351bf03..002b70466 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicFillBehaviors.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/MosaicFillBehaviors.scala @@ -115,4 +115,31 @@ trait MosaicFillBehaviors { boroughs.collect().length shouldEqual mosaics2.length } + + def wktMosaicFillKeepCoreGeom(mosaicContext: => MosaicContext, spark: => SparkSession): Unit = { + val mc = mosaicContext + import mc.functions._ + mosaicContext.register(spark) + + val boroughs: DataFrame = getBoroughs + + val mosaics = boroughs + .select( + mosaicfill(col("wkt"), 11, true) + ) + .collect() + + boroughs.collect().length shouldEqual mosaics.length + + boroughs.createOrReplaceTempView("boroughs") + + val mosaics2 = spark + .sql(""" + |select mosaicfill(wkt, 11, true) from boroughs + |""".stripMargin) + .collect() + + boroughs.collect().length shouldEqual mosaics2.length + } + } diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala index 7d9842d16..df922755f 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicExplode.scala @@ -13,6 +13,11 @@ class TestMosaicExplode extends AnyFlatSpec with MosaicExplodeBehaviors with Spa it should behave like wktDecompose(MosaicContext.build(H3IndexSystem, JTS), spark) } + "Mosaic_Explode" should "decompose wkt geometries for any index system and any geometry API with SQL expr" in { + it should behave like wktDecomposeKeepCoreParamExpression(MosaicContext.build(H3IndexSystem, ESRI), spark) + it should behave like wktDecomposeKeepCoreParamExpression(MosaicContext.build(H3IndexSystem, JTS), spark) + } + "Mosaic_Explode" should "decompose wkt geometries with no null for any index system and any geometry API" in { it should behave like wktDecomposeNoNulls(MosaicContext.build(H3IndexSystem, ESRI), spark) it should behave like wktDecomposeNoNulls(MosaicContext.build(H3IndexSystem, JTS), spark) diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicFill.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicFill.scala index 41add5e90..827492bae 100644 --- a/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicFill.scala +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/index/TestMosaicFill.scala @@ -28,4 +28,9 @@ class TestMosaicFill extends AnyFlatSpec with MosaicFillBehaviors with SparkSuit it should behave like coordsMosaicFill(MosaicContext.build(H3IndexSystem, JTS), spark) } + "MosaicFill" should "fill wkt geometries with keepCoreGeom parameter" in { + it should behave like wktMosaicFillKeepCoreGeom(MosaicContext.build(H3IndexSystem, ESRI), spark) + it should behave like wktMosaicFillKeepCoreGeom(MosaicContext.build(H3IndexSystem, JTS), spark) + } + } From 09fd998d8b0bca79e113cc2c15bab8f48b703c5d Mon Sep 17 00:00:00 2001 From: sllynn Date: Wed, 20 Apr 2022 14:27:33 +0100 Subject: [PATCH 10/12] fixed argument order issue --- .../labs/mosaic/functions/MosaicContext.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index e2413e288..617dbc4a5 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -289,7 +289,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends MosaicExplode(struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), lit(true)).expr, indexSystem.name, geometryAPI.name) case e if e.length == 3 => MosaicExplode( - struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), ColumnAdapter(e(1))).expr, + struct(ColumnAdapter(e(0)), ColumnAdapter(e(1)), ColumnAdapter(e(2))).expr, indexSystem.name, geometryAPI.name ) @@ -426,13 +426,13 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends def mosaicfill(geom: Column, resolution: Int): Column = ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(true).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Boolean): Column = - ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column = - ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, lit(keepCoreGeometries).expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Column, keepCoreGeometries: Column): Column = - ColumnAdapter(MosaicFill(geom.expr, resolution.expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, resolution.expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) def mosaicfill(geom: Column, resolution: Int, keepCoreGeometries: Column): Column = - ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) + ColumnAdapter(MosaicFill(geom.expr, lit(resolution).expr, keepCoreGeometries.expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Column): Column = ColumnAdapter(PointIndexGeom(point.expr, resolution.expr, indexSystem.name, geometryAPI.name)) def point_index_geom(point: Column, resolution: Int): Column = From d0400e6a9eb66683c81357be761a89a2d5f5acc8 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Wed, 20 Apr 2022 18:01:41 +0200 Subject: [PATCH 11/12] Updated python bindings and docs --- docs/source/api/spatial-functions.rst | 8 ++++++-- python/mosaic/api/functions.py | 21 +++++++++++++++++---- python/test/test_functions.py | 3 ++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/docs/source/api/spatial-functions.rst b/docs/source/api/spatial-functions.rst index 22cc62258..c3f541330 100644 --- a/docs/source/api/spatial-functions.rst +++ b/docs/source/api/spatial-functions.rst @@ -1036,7 +1036,7 @@ polyfill mosaicfill ********** -.. function:: mosaicfill(geometry, resolution) +.. function:: mosaicfill(geometry, resolution, keep_core_geometries) Generates: - a set of core indices that are fully contained by `geometry`; and @@ -1048,6 +1048,8 @@ mosaicfill :type geometry: Column :param resolution: Index resolution :type resolution: Column: Integer + :param keep_core_geometries: Whether to keep the core geometries or set them to null + :type keep_core_geometries: Column: Boolean :rtype: Column: ArrayType[MosaicType] :example: @@ -1105,7 +1107,7 @@ mosaicfill mosaic_explode ************** -.. function:: mosaic_explode(geometry, resolution) +.. function:: mosaic_explode(geometry, resolution, keep_core_geometries) Returns the set of Mosaic chips covering the input `geometry` at `resolution`. @@ -1115,6 +1117,8 @@ mosaic_explode :type geometry: Column :param resolution: Index resolution :type resolution: Column: Integer + :param keep_core_geometries: Whether to keep the core geometries or set them to null + :type keep_core_geometries: Column: Boolean :rtype: Column: MosaicType :example: diff --git a/python/mosaic/api/functions.py b/python/mosaic/api/functions.py index d107332d2..2361be5a0 100644 --- a/python/mosaic/api/functions.py +++ b/python/mosaic/api/functions.py @@ -593,7 +593,7 @@ def polyfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ) -def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column: +def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: Any = True) -> Column: """ Generates: - a set of core indices that are fully contained by `geom`; and @@ -605,21 +605,27 @@ def mosaic_explode(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ---------- geom : Column resolution : Column (IntegerType) + keep_core_geometries : Column (BooleanType) | bool Returns ------- Column (StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType]) - `wkb` in this struct represents a border chip geometry and is null for all 'core' chips. + `wkb` in this struct represents a border chip geometry and is null for all 'core' chips + if keep_core_geometries is set to False. """ + if(type(keep_core_geometries) == bool): + keep_core_geometries = lit(keep_core_geometries) + return config.mosaic_context.invoke_function( "mosaic_explode", pyspark_to_java_column(geom), pyspark_to_java_column(resolution), + pyspark_to_java_column(keep_core_geometries) ) -def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: +def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName, keep_core_geometries: Any = True) -> Column: """ Generates: - a set of core indices that are fully contained by `geom`; and @@ -631,15 +637,22 @@ def mosaicfill(geom: ColumnOrName, resolution: ColumnOrName) -> Column: ---------- geom : Column resolution : Column (IntegerType) + keep_core_geometries : Column (BooleanType) | bool Returns ------- Column (ArrayType[StructType[is_core: BooleanType, h3: LongType, wkb: BinaryType]]) - `wkb` in this struct represents a border chip geometry and is null for all 'core' chips. + `wkb` in this struct represents a border chip geometry and is null for all 'core' chips + if keep_core_geometries is set to False. """ + + if(type(keep_core_geometries) == bool): + keep_core_geometries = lit(keep_core_geometries) + return config.mosaic_context.invoke_function( "mosaicfill", pyspark_to_java_column(geom), pyspark_to_java_column(resolution), + pyspark_to_java_column(keep_core_geometries) ) diff --git a/python/test/test_functions.py b/python/test/test_functions.py index 6bb679822..e5c6be3ed 100644 --- a/python/test/test_functions.py +++ b/python/test/test_functions.py @@ -64,8 +64,9 @@ def test_st_bindings_happy_flow(self): .withColumn("index_geometry", api.index_geometry(lit(1))) .withColumn("polyfill", api.polyfill("wkt", lit(1))) .withColumn("mosaic_explode", api.mosaic_explode("wkt", lit(1))) + .withColumn("mosaic_explode_no_core_chips", api.mosaic_explode("wkt", lit(1), lit(False))) + .withColumn("mosaic_explode_no_core_chips_bool", api.mosaic_explode("wkt", lit(1), False)) .withColumn("mosaicfill", api.mosaicfill("wkt", lit(1))) - .withColumn("mosaic_explode_no_core_chips", api.mosaic_explode("wkt", lit(1), False)) .withColumn("mosaicfill_no_core_chips", api.mosaicfill("wkt", lit(1), False)) .withColumn("mosaicfill_no_core_chips_bool", api.mosaicfill("wkt", lit(1), lit(False))) .withColumn( From d86c7a54afa46d13390f191fa19b01ff71264f51 Mon Sep 17 00:00:00 2001 From: Erni Durdevic Date: Fri, 22 Apr 2022 15:31:46 +0200 Subject: [PATCH 12/12] Fixed imports python functions --- python/mosaic/api/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/mosaic/api/functions.py b/python/mosaic/api/functions.py index 2361be5a0..662d2119d 100644 --- a/python/mosaic/api/functions.py +++ b/python/mosaic/api/functions.py @@ -1,7 +1,7 @@ from pyspark.sql import Column from pyspark.sql.functions import _to_java_column as pyspark_to_java_column from pyspark.sql.functions import lit - +from typing import Any from mosaic.config import config from mosaic.utils.types import ColumnOrName, as_typed_col