Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write timestamps as long since parquet has timestamp_micros #58

Closed
wants to merge 5 commits into from

Conversation

robert3005
Copy link

@schlosna @pwoody @ash211 @mccheah

will add tests shortly but wanted to run the logic by you. Looks like it's pretty straightforward with new parquet.

@robert3005
Copy link
Author

Just changes Timestamp serialization to TIMESTAMP_MICROS. Need to add it to filters that parquet can handle

@ash211
Copy link

ash211 commented Nov 16, 2016

Is there an upstream ticket for this?

Also I'm concerned about backcompat -- would data written to parquet with the pre-merge version of Spark still be read correctly by Spark with this PR merged?

@robert3005
Copy link
Author

You need a parquet release to be able to implement it. 1.9.0 hasn't come out yet. We should make a note to upstream once parquet publishes.

@ash211 why wouldn't they? Only issue is with filter pushdown

case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in line with @ash211 question, how does this play with assumeInt96IsTimestamp set from spark.sql.parquet.int96AsTimestamp and defaults to true?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code path is when writing. We just stop writing INT96. I haven't removed code path for reading INT96

@robert3005
Copy link
Author

robert3005 commented Nov 30, 2016

For anyone curious, if you write parquet pre this change and then do a filter with this change this is the exception you will see when executing the query.

Caused by: java.lang.IllegalArgumentException: FilterPredicate column: ts's declared type (java.lang.Long) does not match the schema found in file metadata. Column ts is of type: INT96 Valid types for this column are: [class org.apache.parquet.io.api.Binary]

@ash211
Copy link

ash211 commented Dec 1, 2016

Is that exception in the last comment final design or just an artifact of an in-progress implementation?

We definitely want to support parquet written with older versions of the various libraries to be read by newer versions of the libraries. Otherwise we'd have to rematerialize lots of data

@pwoody
Copy link

pwoody commented Dec 1, 2016

Closing in favor of #72

@pwoody pwoody closed this Dec 1, 2016
@robert3005 robert3005 deleted the robertk/timestamps-as-long branch December 7, 2016 16:04
ash211 pushed a commit that referenced this pull request Oct 13, 2017
…conf

## What changes were proposed in this pull request?

This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported.

```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
|                         1|
+--------------------------+

scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
|                         2|
+--------------------------+
```

And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
|                   1|                   1|
+--------------------+--------------------+
```

However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
:  +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
   +- OneRowRelation$
```

This PR aims to support case-insensitive type equality. For example, in Set operation, the above operation succeed when `spark.sql.caseSensitive=false`.

```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
|               [1]|
|               [2]|
+------------------+
```

## How was this patch tested?

Pass the Jenkins with a newly add test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#18460 from dongjoon-hyun/SPARK-21247.
rahij pushed a commit that referenced this pull request Oct 21, 2020
… more scenarios such as PartitioningCollection

### What changes were proposed in this pull request?

This PR proposes to improve  `EnsureRquirement.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`

### Why are the changes needed?

1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
 join.explain

== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0.
   +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79].       <===== This can be removed
      +- *(3) Project [i2#30, j2#31]
         +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4

```

2.  For the scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain

== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0.       <===== This can be removed
:  +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58]             <===== This can be removed
:     +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:        :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
:        :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:        :        +- *(1) LocalTableScan [_1#2, _2#3]
:        +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
:              +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:                 +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
      +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?

Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed:
1. Senario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
   +- *(3) Project [i2#30, j2#31]
      +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:  :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
:  :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:  :        +- *(1) LocalTableScan [_1#2, _2#3]
:  +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
:        +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:           +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
      +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(6) LocalTableScan [_1#24, _2#25]
```

### How was this patch tested?

Added tests.

Closes apache#29074 from imback82/reorder_keys.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LorenzoMartini pushed a commit that referenced this pull request Jun 8, 2021
* Safe log Starting task

* Safer args

* Safe executorId

Co-authored-by: Will Raschkowski <wraschkowski@palantir.com>

* Reorder safe args

Co-authored-by: Will Raschkowski <wraschkowski@palantir.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants