Magellan "index" column in parquet not reused for joins #203

Closed
zebehringer opened this issue Mar 12, 2018 · 1 comment

I wanted to pre-generate the index for a very large set of polygons (loaded from a Shapefile) and store it as Parquet so that I can reuse it in frequent production processes, but the ZOrderCurve-typed column named "index" seems to be ignored when joining the Parquet data with a list of points.

import org.apache.spark.sql.types._
import magellan.{Point, Polygon}
import org.apache.spark.sql.magellan.dsl.expressions._

val schema = new StructType(Array(
    StructField("latitude",           DoubleType,       false),
    StructField("longitude",          DoubleType,       false)
))

val sample = spark.read.schema(schema).option("header",true).csv("./sample.csv.gz")

magellan.Utils.injectRules(spark)

// Run once beforehand to persist the indexed polygons as a Parquet-backed table:
//spark.read.format("magellan").load("s3://myBucket/my_shapefile_folder")
//    .withColumn("index", $"polygon" index 15)
//    .selectExpr("polygon", "index", "metadata.ID AS id")
//    .write.saveAsTable("shapes")

sample.join(spark.table("shapes"), point($"longitude",$"latitude") within $"polygon").explain()

Here's the plan:

== Physical Plan ==
*Project [id#7, longitude#1, latitude#0, polygon#5, index#6]
+- *BroadcastHashJoin [curve#245], [curve#247], Inner, BuildLeft, ((relation#248 = Within) || Within(pointconverter(longitude#1, latitude#0), polygon#5))
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[2, struct<xmin:double,ymin:double,xmax:double,ymax:double,precision:int,bits:bigint>, true]))
   :  +- Generate inline(indexer(pointconverter(longitude#1, latitude#0), 30)), true, false, [curve#245, relation#246]
   :     +- *FileScan csv [latitude#0,longitude#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/ec2-user/sample.csv.gz], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<latitude:double,longitude:double>
   +- Generate inline(indexer(polygon#5, 30)), true, false, [curve#247, relation#248]
      +- *FileScan parquet default.df3[polygon#5,index#6,id#7] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/home/ec2-user/spark-warehouse/df3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<polygon:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,indices:array<int>...
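
Note that the Parquet scan does project index#6, yet the join still generates inline(indexer(polygon#5, 30)) over it, i.e. the polygons are re-indexed at query time instead of the stored curves being reused. To double-check what was actually persisted, something like this should do (untested sketch):

// Inspect the persisted table: the precomputed "index" column should appear
// here with whatever type and nullability Parquet recorded for it.
spark.table("shapes").printSchema()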
harsha2010 (Owner) commented Mar 12, 2018

@zebehringer can you give this PR a try? I think the issue is that the column's nullability is reset (a bug in Spark SQL) when Spark SQL writes to Parquet, and when we read it back this causes a schema mismatch.
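
A quick way to check that hypothesis with standard Spark APIs (untested sketch; table and column names follow the snippet above):

// Compare the nullability Parquet recorded for the stored "index" column with
// that of a freshly computed index; a mismatch would explain the schema mismatch.
import org.apache.spark.sql.magellan.dsl.expressions._
import spark.implicits._

val stored = spark.table("shapes").schema("index")
val fresh = spark.table("shapes")
  .withColumn("index_fresh", $"polygon" index 15)
  .schema("index_fresh")

println(s"stored: nullable = ${stored.nullable}, type = ${stored.dataType.simpleString}")
println(s"fresh:  nullable = ${fresh.nullable}, type = ${fresh.dataType.simpleString}")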

harsha2010 added this to the 1.0.6 milestone Mar 16, 2018