Commit: Bug Fix: Nullability seems to be changed by Spark when writing Parquet (#204)

* Bug Fix: Nullability seems to be changed by Spark when writing Parquet. Avoid enforcing nullability constraints

* Fix pattern matching
halfabrane authored and harsha2010 committed Mar 16, 2018
1 parent a226690 · commit e1d6809
Showing 3 changed files with 28 additions and 11 deletions.
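
Editor's note: the Spark behavior this commit works around is that nullability is relaxed on a Parquet round trip — a column written with nullable = false is reported as nullable = true when read back. A minimal standalone sketch of that round trip (local session and object name are illustrative, not part of this patch):

    import java.nio.file.Files
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object NullabilityRoundTrip {  // hypothetical driver, for illustration only
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("nullability-demo").master("local[*]").getOrCreate()
        // Declare the column as non-nullable up front.
        val schema = StructType(Seq(StructField("id", StringType, nullable = false)))
        val df = spark.createDataFrame(
          spark.sparkContext.parallelize(Seq(Row("a"))), schema)
        println(df.schema("id").nullable)   // false, as declared
        val path = Files.createTempDirectory("nullability").toUri.getPath + "/t"
        df.write.parquet(path)
        // After the round trip, Spark reports the field as nullable.
        println(spark.read.parquet(path).schema("id").nullable)   // true
        spark.stop()
      }
    }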
src/main/scala/magellan/catalyst/SpatialJoin.scala (5 changes: 3 additions & 2 deletions)

@@ -40,8 +40,9 @@ private[magellan] case class SpatialJoin(session: SparkSession)

         child
       case p@Join(
-        Generate(Inline(_: Indexer), _, _, _, _, _),
-        Generate(Inline(_: Indexer), _, _, _, _, _), _, _) => p
+        Generate(Inline(_), _, _, _, _, _),
+        Generate(Inline(_), _, _, _, _, _), _, _) =>
+        p
       case p@Join(l, r, joinType @ (Inner | LeftOuter), Some(cond)) =>
         val trigger = matchesTrigger(cond)
         if (trigger.isEmpty) p else {
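
Editor's note: before this change the rule only fired when each join side was generated by a live Indexer expression, i.e. Inline(_: Indexer). Once an indexed DataFrame is persisted to Parquet and read back, the inline input is a plain column reference (e.g. index#123) rather than an Indexer, so the typed pattern no longer matched. A hedged sketch of the distinction, using simplified stand-ins rather than Catalyst's real expression classes:

    object InlinePatternSketch {
      // Illustrative stand-ins only; the real types live in Catalyst and Magellan.
      sealed trait Expression
      case class Indexer() extends Expression                        // index computed in this plan
      case class AttributeReference(name: String) extends Expression // e.g. index#123 read from Parquet
      case class Inline(child: Expression)

      // Old check: only recognizes a freshly computed index.
      def oldMatch(g: Inline): Boolean = g match {
        case Inline(_: Indexer) => true
        case _                  => false
      }

      // New check: any inline input counts, including persisted indexes.
      def newMatch(g: Inline): Boolean = g match {
        case Inline(_) => true
      }

      // oldMatch(Inline(AttributeReference("index#123"))) == false
      // newMatch(Inline(AttributeReference("index#123"))) == true
    }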
(second changed file — path not shown in this capture; the hunk modifies object Indexer)

@@ -343,6 +343,6 @@ object Indexer {
   val indexUDT = new ZOrderCurveUDT()

   val dataType = ArrayType(new StructType()
-    .add("curve", indexUDT, false)
-    .add("relation", StringType, false))
+    .add("curve", indexUDT)
+    .add("relation", StringType))
 }
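
Editor's note: the schema fix above turns on the StructType.add overloads — add(name, dataType) defaults nullable to true, while add(name, dataType, false) pins the field as non-nullable. Since Parquet reads come back nullable, the pinned schema no longer lined up with data read from disk. A small sketch of the two overloads (StringType stands in here for the ZOrderCurveUDT):

    import org.apache.spark.sql.types.{StringType, StructType}

    object AddOverloadSketch {
      val strict  = new StructType().add("curve", StringType, false) // nullable = false
      val relaxed = new StructType().add("curve", StringType)        // nullable defaults to true

      // strict("curve").nullable  == false
      // relaxed("curve").nullable == true
    }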
src/test/scala/magellan/catalyst/SpatialJoinSuite.scala (30 changes: 23 additions & 7 deletions)

@@ -16,6 +16,8 @@

 package magellan.catalyst

+import java.nio.file.Files
+
 import magellan.{Point, Polygon, TestSparkContext, Utils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin

@@ -92,23 +94,37 @@ class SpatialJoinSuite extends FunSuite with TestSparkContext {
     val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0),
       Point(-1.0, -1.0), Point(-1.0, 1.0),
       Point(1.0, 1.0))
-    val polygons = sc.parallelize(Seq(
+    var polygons = sc.parallelize(Seq(
       ("1", Polygon(Array(0), ring))
-    )).toDF("id", "polygon").withColumn("index", $"polygon" index 30)
+    )).toDF("id", "polygon")
+      .withColumn("index", $"polygon" index 5)

-    val points = sc.parallelize(Seq(
+    var points = sc.parallelize(Seq(
       ("a", 1, Point(0.0, 0.0)),
       ("b" , 2, Point(2.0, 2.0))
     )).toDF("name", "value", "point")
       .withColumn("index", $"point" index 5)

-    val joined = polygons.join(points index 5).where($"point" within $"polygon")
+    val outputDir = Files.createTempDirectory("output").toUri.getPath
+
+    val polygonsDir = s"$outputDir/polygons"
+
+    polygons.write.parquet(polygonsDir)
+
+    val pointsDir = s"$outputDir/points"
+
+    points.write.parquet(pointsDir)
+
+    points = spark.read.parquet(pointsDir)
+
+    polygons = spark.read.parquet(polygonsDir)
+
+    val joined = polygons.join(points).where($"point" within $"polygon")

     val optimizedPlan = Optimize.execute(joined.queryExecution.analyzed)

     assert(joined.queryExecution.analyzed.toString().contains("SpatialJoinHint"))
     assert(!optimizedPlan.toString().contains("SpatialJoinHint"))
     assert(optimizedPlan.toString().contains("Generate inline(index#"))
-    assert(optimizedPlan.toString().contains("Generate inline(indexer"))
+    assert(optimizedPlan.toString().contains("Generate inline(index#"))
     assert(joined.count() === 1)
   }
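
Editor's note: the reworked test mirrors the bug's trigger — it persists both indexed DataFrames to Parquet and reads them back before joining, so the optimizer sees index columns with Parquet-relaxed (nullable) schemas, and the plan assertions change from "inline(indexer" to "inline(index#" because both sides now read a persisted index column. A hedged helper one might drop into the suite to check that relaxation directly (not part of this patch):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.{ArrayType, StructType}

    // Returns true when every field of the index column's element struct is
    // nullable — the state Parquet reads produce, which the optimizer must accept.
    def indexSchemaRelaxed(df: DataFrame): Boolean =
      df.schema("index").dataType match {
        case ArrayType(st: StructType, _) => st.fields.forall(_.nullable)
        case _                            => false
      }

    // e.g., inside the test after the reads:
    //   assert(indexSchemaRelaxed(points) && indexSchemaRelaxed(polygons))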
