Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24959][SQL] Speed up count() for JSON and CSV #21909

Closed
wants to merge 26 commits into from

Conversation

MaxGekk
Copy link
Member

@MaxGekk MaxGekk commented Jul 28, 2018

What changes were proposed in this pull request?

In the PR, I propose to skip invoking of the CSV/JSON parser per each line in the case if the required schema is empty. Added benchmarks for count() shows performance improvement up to 3.5 times.

Before:

Count a dataset with 10 columns:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
--------------------------------------------------------------------------------------
JSON count()                               7676 / 7715          1.3         767.6
CSV count()                                3309 / 3363          3.0         330.9

After:

Count a dataset with 10 columns:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
--------------------------------------------------------------------------------------
JSON count()                               2104 / 2156          4.8         210.4
CSV count()                                2332 / 2386          4.3         233.2

How was this patch tested?

It was tested by CSVSuite and JSONSuite as well as on added benchmarks.

@holdensmagicalunicorn
Copy link

@MaxGekk, thanks! I am a bot who has found some folks who might be able to help with the review:@HyukjinKwon, @gatorsmile and @cloud-fan

@SparkQA
Copy link

SparkQA commented Jul 28, 2018

Test build #93729 has finished for PR 21909 at commit 359c4fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Copy link
Member Author

MaxGekk commented Jul 28, 2018

jenkins, retest this, please

@SparkQA
Copy link

SparkQA commented Jul 28, 2018

Test build #93732 has finished for PR 21909 at commit 359c4fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -450,7 +450,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
parsedOptions.columnNameOfCorruptRecord,
optimizeEmptySchema = true)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the case to turn off optimizeEmptySchema multiline JSONs ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here can be only one JSON object of struct type per input string. Don't see any reasons to turn the optimization off. Maybe you have some examples when the optimization doesn't work correctly?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no I'm just wondering since you made it a parameter that you can turn off and on, what would be the case to turn it off?

If there is none, shouldn't we just get rid of the parameter altogether ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would be the case to turn it off?

We can apply the optimization if we know in advance that one JSON object corresponds to one struct. In that case, we can return empty row if required schema (struct) is empty. If multiLine is enabled, there could be many structs per a JSON document. So, we cannot say in advance how many empty rows need to return without parsing.

@@ -203,19 +203,11 @@ class UnivocityParser(
}
}

private val doParse = if (requiredSchema.nonEmpty) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the changes here https://github.com/apache/spark/pull/21909/files#diff-3a4dc120191f7052e5d98db11934bfb5R63 replacing the need for the requiredSchema.nonEmpty check ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduced optimization works in the case if multiLine is disable. In that case, this removed code is used. For now it is not needed anymore because it just duplicates optimization in some sense.

def parse(input: IN): Iterator[InternalRow] = {
try {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
if (skipParsing) {
Iterator.single(InternalRow.empty)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Iterator.empty

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not the same. If you return empty iterator, count() will always return 0.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh yes my bad!

@@ -2233,7 +2233,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.option("multiline", "true")
.options(Map("encoding" -> "UTF-16BE"))
.json(testFile(fileName))
.count()
.collect()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious why going from count() to collect() here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test has to really touch JSON to detect encoding even without parsing. With this optimization jackson parser is not called at all in the case of count(). collect() guarantees that JSON parser will be invoked with wrong encoding.

ds.select("*").filter((_: Row) => true).count()
}
benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
ds.select($"col1").filter((_: Row) => true).count()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this benchmark result vary if we select col2 or col10?

@MaxGekk
Copy link
Member Author

MaxGekk commented Jul 31, 2018

does this benchmark result vary if we select col2 or col10?

@felixcheung Not so much. Here is the benchmark for CSV.

JJava HotSpot(TM) 64-Bit Server VM 1.8.0_172-b11 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz

Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
col0 + count()                                9097 / 9167          1.1         909.7       1.0X
col2 + count()                                9294 / 9302          1.1         929.4       1.0X
col5 + count()                                9346 / 9394          1.1         934.6       1.0X
col7 + count()                                9227 / 9231          1.1         922.7       1.0X
col9 + count()                                9141 / 9233          1.1         914.1       1.0X

@felixcheung
Copy link
Member

got it

@MaxGekk
Copy link
Member Author

MaxGekk commented Aug 1, 2018

@HyukjinKwon @maropu Any objections to the PR?

if (skipParsing) {
Iterator.single(InternalRow.empty)
} else {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are broken records the parser can't parse, this skipping won't detect them?

Copy link
Member Author

@MaxGekk MaxGekk Aug 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. To detect them with 100% guarantee, the parser must fully parse such records and column values must be casted according to types in data schema. We actually don't do that due to the column pruning mechanisms in both datasources - CSV and JSON.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test case for counting both CSV and JSON source when the files having broken records? Any behavior change after this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... when the files having broken records?

Syntactically broken or semantically (wrong types for example)?

Any behavior change after this PR?

We have many tests in CSVSuite and JSONSuite for broken records. I have found behavior change in only one case: https://github.com/apache/spark/pull/21909/files#diff-fde14032b0e6ef8086461edf79a27c5dL2227 . This is due to Jackson parser touches a few first bytes in the input stream even if it is not called. Jackson checks encoding eagerly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both?

If we introduce a behavior change, we need to document it in the migration guide and add a conf. Users can do the conf to revert back to the previous behaviors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the tests

@SparkQA
Copy link

SparkQA commented Aug 3, 2018

Test build #94143 has finished for PR 21909 at commit 168eb99.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

…zation

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@SparkQA
Copy link

SparkQA commented Aug 3, 2018

Test build #94155 has finished for PR 21909 at commit 6248c01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 3, 2018

Test build #94154 has finished for PR 21909 at commit 05c8dbb.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@MaxGekk
Copy link
Member Author

MaxGekk commented Aug 3, 2018

jenkins, retest this, please

@SparkQA
Copy link

SparkQA commented Aug 4, 2018

Test build #94179 has finished for PR 21909 at commit 6248c01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1476,6 +1476,14 @@ object SQLConf {
"are performed before any UNION, EXCEPT and MINUS operations.")
.booleanConf
.createWithDefault(false)

val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us get rid of this in the next release. Mark it as an internal and use the legacy scheme.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to spark.sql.legacy.bypassParserForEmptySchema

@gatorsmile
Copy link
Member

Please document it in the migration guide.

@@ -1894,6 +1894,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
- Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.
- Since Spark 2.4, text-based datasources like CSV and JSON don't parse input lines if the required schema pushed down to the datasources is empty. The schema can be empty in the case of the count() action. For example, Spark 2.3 and earlier versions failed on JSON files with invalid encoding but Spark 2.4 returns total number of lines in the file. To restore the previous behavior when the underlying parser is always invoked even for the empty schema, set `true` to `spark.sql.legacy.bypassParserForEmptySchema`. This option will be removed in Spark 3.0.
Copy link
Member

@gatorsmile gatorsmile Aug 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this line after we enable it if and only if PERMISSIVE is used?

@@ -1492,6 +1492,15 @@ object SQLConf {
"This usually speeds up commands that need to list many directories.")
.booleanConf
.createWithDefault(true)

val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
buildConf("spark.sql.legacy.bypassParserForEmptySchema")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no behavior change, do we still need this conf?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need it anymore

@@ -223,7 +224,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
parser.options.columnNameOfCorruptRecord,
optimizeEmptySchema = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename optimizeEmptySchema to isMultiLine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

@SparkQA
Copy link

SparkQA commented Aug 18, 2018

Test build #94908 has finished for PR 21909 at commit 2d8e754.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 18, 2018

Test build #94909 has finished for PR 21909 at commit 96a94cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 18, 2018

Test build #94922 has finished for PR 21909 at commit 050c8ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

LGTM.

Thanks for being patient to address all the comments! Merged to master.

@asfgit asfgit closed this in a8a1ac0 Aug 18, 2018
@@ -56,9 +58,15 @@ class FailureSafeParser[IN](
}
}

private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal but I would leave a comment to explain why it's permissive and non-miltiline only. I assume counts are known when it's actually parsed for multiline cases, and counts should be given in any case when the mode is permissive, right?

asfgit pushed a commit that referenced this pull request Nov 3, 2018
## What changes were proposed in this pull request?

Added new benchmark which forcibly invokes Jackson parser to check overhead of its creation for short and wide JSON strings. Existing benchmarks do not allow to check that due to an optimisation introduced by #21909 for empty schema pushed down to JSON datasource. The `count()` action passes empty schema as required schema to the datasource, and Jackson parser is not created at all in that case.

Besides of new benchmark I also refactored existing benchmarks:
- Added `numIters` to control number of iteration in each benchmark
- Renamed `JSON per-line parsing` -> `count a short column`, `JSON parsing of wide lines` -> `count a wide column`, and `Count a dataset with 10 columns` -> `Select a subset of 10 columns`.

Closes #22920 from MaxGekk/json-benchmark-follow-up.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@@ -402,7 +403,7 @@ class JacksonParser(
}
}
} catch {
case e @ (_: RuntimeException | _: JsonProcessingException) =>
case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related, @MaxGekk? Let's don't add unrelated changes next time.

asfgit pushed a commit that referenced this pull request Jan 31, 2019
…ARK-24959

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of #21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also #23665 (comment).

## How was this patch tested?

Manually tested.

Closes #23667 from HyukjinKwon/revert-SPARK-24959.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
asfgit pushed a commit that referenced this pull request Feb 1, 2019
… in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of #21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also #23665 (comment).

## How was this patch tested?

Manually tested.

Closes #23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

Added new benchmark which forcibly invokes Jackson parser to check overhead of its creation for short and wide JSON strings. Existing benchmarks do not allow to check that due to an optimisation introduced by apache#21909 for empty schema pushed down to JSON datasource. The `count()` action passes empty schema as required schema to the datasource, and Jackson parser is not created at all in that case.

Besides of new benchmark I also refactored existing benchmarks:
- Added `numIters` to control number of iteration in each benchmark
- Renamed `JSON per-line parsing` -> `count a short column`, `JSON parsing of wide lines` -> `count a wide column`, and `Count a dataset with 10 columns` -> `Select a subset of 10 columns`.

Closes apache#22920 from MaxGekk/json-benchmark-follow-up.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ARK-24959

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23667 from HyukjinKwon/revert-SPARK-24959.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
… in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
… in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
… in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@MaxGekk MaxGekk deleted the empty-schema-optimization branch August 17, 2019 13:33
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
* [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

* [SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker

## What changes were proposed in this pull request?

`resource` package is a Unix specific package. See https://docs.python.org/2/library/resource.html and https://docs.python.org/3/library/resource.html.

Note that we document Windows support:

> Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).

This should be backported into branch-2.4 to restore Windows support in Spark 2.4.1.

## How was this patch tested?

Manually mocking the changed logics.

Closes apache#23055 from HyukjinKwon/SPARK-26080.

Lead-authored-by: hyukjinkwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 9cda9a8)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

* [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.

## What changes were proposed in this pull request?

Updates FileFormatWriter to create a consistent Hadoop Job ID for a write.

## How was this patch tested?

Existing tests for regressions.

Closes apache#23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 33334e2)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

* [SPARK-26677][BUILD] Update Parquet to 1.10.1 with notEq pushdown fix.

## What changes were proposed in this pull request?

Update to Parquet Java 1.10.1.

## How was this patch tested?

Added a test from HyukjinKwon that validates the notEq case from SPARK-26677.

Closes apache#23704 from rdblue/SPARK-26677-fix-noteq-parquet-bug.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Ryan Blue <rdblue@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit f72d217)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26677][FOLLOWUP][BRANCH-2.4] Update Parquet manifest with Hadoop-2.6

## What changes were proposed in this pull request?

During merging Parquet upgrade PR, `hadoop-2.6` profile dependency manifest is missed.

## How was this patch tested?

Manual.

```
./dev/test-dependencies.sh
```

Also, this will recover `branch-2.4` with `hadoop-2.6` build.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.4-test-sbt-hadoop-2.6/281/

Closes apache#23738 from dongjoon-hyun/SPARK-26677-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

## What changes were proposed in this pull request?

When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. It leads to the the physical plans of those cache entries being re-compiled. For those cache entries, if the cache RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning`  or `outputOrdering` is different from the that of the actual data, and meanwhile the cache is used by another query that asks for specific `outputPartitioning` or `outputOrdering` which happens to match the new plan but not the actual data.

The fix is to keep the cache entry as it is if the data has been loaded, otherwise re-build the cache entry, with a new plan and an empty cache buffer.

## How was this patch tested?

Added UT.

Closes apache#23678 from maryannxue/spark-26708-2.4.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>

* [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

## What changes were proposed in this pull request?

Backport apache#23324 to branch-2.4.

## How was this patch tested?

Jenkins

Closes apache#23365 from zsxwing/SPARK-26267-2.4.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>

* [SPARK-26706][SQL] Fix `illegalNumericPrecedence` for ByteType

This PR contains a minor change in `Cast$mayTruncate` that fixes its logic for bytes.

Right now, `mayTruncate(ByteType, LongType)` returns `false` while `mayTruncate(ShortType, LongType)` returns `true`. Consequently, `spark.range(1, 3).as[Byte]` and `spark.range(1, 3).as[Short]` behave differently.

Potentially, this bug can silently corrupt someone's data.
```scala
// executes silently even though Long is converted into Byte
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
  .map(b => b - 1)
  .show()
+-----+
|value|
+-----+
|  -12|
|  -11|
|  -10|
|   -9|
|   -8|
|   -7|
|   -6|
|   -5|
|   -4|
|   -3|
+-----+
// throws an AnalysisException: Cannot up cast `id` from bigint to smallint as it may truncate
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
  .map(s => s - 1)
  .show()
```

This PR comes with a set of unit tests.

Closes apache#23632 from aokolnychyi/cast-fix.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>

* [SPARK-26078][SQL][BACKPORT-2.4] Dedup self-join attributes on IN subqueries

## What changes were proposed in this pull request?

When there is a self-join as result of a IN subquery, the join condition may be invalid, resulting in trivially true predicates and return wrong results.

The PR deduplicates the subquery output in order to avoid the issue.

## How was this patch tested?

added UT

Closes apache#23449 from mgaido91/SPARK-26078_2.4.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when encoding a decimal value

## What changes were proposed in this pull request?

When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations.

## How was this patch tested?

added UT

Closes apache#23232 from mgaido91/SPARK-26233_2.4.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

* [SPARK-27097][CHERRY-PICK 2.4] Avoid embedding platform-dependent offsets literally in whole-stage generated code

## What changes were proposed in this pull request?

Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
  - It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.

In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.

One code pattern that leads to such problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.

Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.

Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.

Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.

NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumption about endianness, it would also be problematic.

## How was this patch tested?

Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor use different Java object layouts, so it's run as an actual SparkSubmit job.

Authored-by: Kris Mok <kris.mokdatabricks.com>

Closes apache#24032 from gatorsmile/testFailure.

Lead-authored-by: Kris Mok <kris.mok@databricks.com>
Co-authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>

* [SPARK-26188][SQL] FileIndex: don't infer data types of partition columns if user specifies schema

## What changes were proposed in this pull request?

This PR is to fix a regression introduced in: https://github.com/apache/spark/pull/21004/files#r236998030

If user specifies schema, Spark don't need to infer data type for of partition columns, otherwise the data type might not match with the one user provided.
E.g. for partition directory `p=4d`, after data type inference  the column value will be `4.0`.
See https://issues.apache.org/jira/browse/SPARK-26188 for more details.

Note that user specified schema **might not cover all the data columns**:
```
val schema = new StructType()
  .add("id", StringType)
  .add("ex", ArrayType(StringType))
val df = spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)

assert(df.schema.toList === List(
  StructField("ex", ArrayType(StringType)),
  StructField("part", IntegerType), // inferred partitionColumn dataType
  StructField("id", StringType))) // used user provided partitionColumn dataType
```
For the missing columns in user specified schema, Spark still need to infer their data types if `partitionColumnTypeInferenceEnabled` is enabled.

To implement the partially inference, refactor `PartitioningUtils.parsePartitions`  and pass the user specified schema as parameter to cast partition values.

## How was this patch tested?

Add unit test.

Closes apache#23165 from gengliangwang/fixFileIndex.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9cfc3ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-25921][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse

## What changes were proposed in this pull request?

Running a barrier job after a normal spark job causes the barrier job to run without a BarrierTaskContext. This is because while python worker reuse, BarrierTaskContext._getOrCreate() will still return a TaskContext after firstly submit a normal spark job, we'll get a `AttributeError: 'TaskContext' object has no attribute 'barrier'`. Fix this by adding check logic in BarrierTaskContext._getOrCreate() and make sure it will return BarrierTaskContext in this scenario.

## How was this patch tested?

Add new UT in pyspark-core.

Closes apache#22962 from xuanyuanking/SPARK-25921.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c00e72f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants