Commit 9761c0e

Merge branch 'master' into SPARK-31937

AngersZhuuuu committed Dec 29, 2020
2 parents 092c927 + aadda4b commit 9761c0e
Showing 43 changed files with 799 additions and 347 deletions.
32 changes: 13 additions & 19 deletions .github/workflows/build_and_test.yml
@@ -285,6 +285,8 @@ jobs:
lint:
name: Linters, licenses, dependencies and documentation generation
runs-on: ubuntu-20.04
container:
image: dongjoon/apache-spark-github-action-image:20201025
steps:
- name: Checkout Spark repository
uses: actions/checkout@v2
@@ -315,10 +317,6 @@ jobs:
key: docs-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
docs-maven-
- name: Install Java 8
uses: actions/setup-java@v1
with:
java-version: 8
- name: Install Python 3.6
uses: actions/setup-python@v2
with:
@@ -328,30 +326,24 @@
run: |
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx mypy numpydoc
- name: Install R 4.0
uses: r-lib/actions/setup-r@v1
with:
r-version: 4.0
python3.6 -m pip install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx mypy numpydoc
- name: Install R linter dependencies and SparkR
run: |
sudo apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev
sudo Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')"
sudo Rscript -e "devtools::install_github('jimhester/lintr@v2.0.0')"
apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev
Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')"
Rscript -e "devtools::install_github('jimhester/lintr@v2.0.0')"
./R/install-dev.sh
- name: Install Ruby 2.7 for documentation generation
uses: actions/setup-ruby@v1
with:
ruby-version: 2.7
- name: Install dependencies for documentation generation
run: |
# pandoc is required to generate PySpark APIs as well in nbsphinx.
sudo apt-get install -y libcurl4-openssl-dev pandoc
apt-get install -y libcurl4-openssl-dev pandoc
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx numpydoc
python3.6 -m pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx numpydoc
apt-get update -y
apt-get install -y ruby ruby-dev
gem install jekyll jekyll-redirect-from rouge
sudo Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2'), repos='https://cloud.r-project.org/')"
Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2'), repos='https://cloud.r-project.org/')"
- name: Scala linter
run: ./dev/lint-scala
- name: Java linter
@@ -367,6 +359,8 @@ jobs:
- name: Run documentation build
run: |
cd docs
export LC_ALL=C.UTF-8
export LANG=C.UTF-8
jekyll build
java-11:
@@ -1588,7 +1588,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
test("SPARK-23365 Don't update target num executors when killing idle executors") {
val clock = new ManualClock()
val manager = createManager(
createConf(1, 2, 1).set(config.DYN_ALLOCATION_TESTING, false),
createConf(1, 2, 1),
clock = clock)

when(client.requestTotalExecutors(any(), any(), any())).thenReturn(true)
@@ -1616,19 +1616,17 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
verify(client, never).killExecutors(any(), any(), any(), any())
assert(manager.executorMonitor.executorsPendingToRemove().isEmpty)

// now we cross the idle timeout for executor-1, so we kill it. the really important
// thing here is that we do *not* ask the executor allocation client to adjust the target
// number of executors down
when(client.killExecutors(Seq("executor-1"), false, false, false))
.thenReturn(Seq("executor-1"))
clock.advance(3000)
schedule(manager)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) === 1)
assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
// here's the important verify -- we did kill the executors, but did not adjust the target count
verify(client).killExecutors(Seq("executor-1"), false, false, false)
assert(manager.executorMonitor.executorsPendingToRemove() === Set("executor-1"))
}

test("SPARK-26758 check executor target number after idle time out ") {
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -30,6 +30,8 @@ license: |

- In Spark 3.2, `ALTER TABLE .. RENAME TO PARTITION` throws `PartitionAlreadyExistsException` instead of `AnalysisException` for tables from Hive external when the target partition already exists.

- In Spark 3.2, the default FIELD DELIMIT for script transform in no-serde mode is `\u0001`. In Spark 3.1 or earlier, the default FIELD DELIMIT is `\t`.

## Upgrading from Spark SQL 3.0 to 3.1

- In Spark 3.1, the statistical aggregation functions `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, and `corr` return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` is applied to a single-element set. In Spark version 3.0 and earlier, they return `Double.NaN` in such cases. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
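
As a side note on the statistical-aggregate entry above, here is a minimal spark-shell sketch (assuming an active SparkSession named `spark`; not part of this commit) of the NULL-versus-NaN behavior and the legacy flag:

// stddev_samp over a single-element group divides by zero internally.
spark.sql("SELECT stddev_samp(col) FROM VALUES (1.0) AS t(col)").show()   // NULL in Spark 3.1+
// Restore the pre-3.1 behavior with the legacy flag named in the guide.
spark.conf.set("spark.sql.legacy.statisticalAggregate", "true")
spark.sql("SELECT stddev_samp(col) FROM VALUES (1.0) AS t(col)").show()   // Double.NaN again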
9 changes: 9 additions & 0 deletions pom.xml
@@ -3277,6 +3277,15 @@
<arg>-Wconf:cat=other-match-analysis&amp;site=org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction.catalogFunction:wv</arg>
<arg>-Wconf:cat=other-pure-statement&amp;site=org.apache.spark.streaming.util.FileBasedWriteAheadLog.readAll.readFile:wv</arg>
<arg>-Wconf:cat=other-pure-statement&amp;site=org.apache.spark.scheduler.OutputCommitCoordinatorSuite.&lt;local OutputCommitCoordinatorSuite&gt;.futureAction:wv</arg>
<!--
SPARK-33775 Suppress compilation warnings that match the patterns below.
TODO(SPARK-33805): Remove the corresponding deprecation suppression rules once the usages are fixed.
-->
<arg>-Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:s</arg>
<arg>-Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s</arg>
<arg>-Wconf:msg=Auto-application to \`\(\)\` is deprecated:s</arg>
<arg>-Wconf:msg=method with a single empty parameter list overrides method without any parameter list:s</arg>
<arg>-Wconf:msg=method without a parameter list overrides a method with a single empty one:s</arg>
</args>
<compilerPlugins combine.self="override">
</compilerPlugins>
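
For context, the new `-Wconf` filters silence message patterns such as the Scala 2.13 auto-application deprecation. A hypothetical snippet (class and object names are placeholders, not part of this commit) that would trigger that warning:

class Counter {
  def size(): Int = 42   // declared with an empty parameter list
}

object AutoApplicationDemo {
  def main(args: Array[String]): Unit = {
    val c = new Counter
    // Calling size without parentheses is "auto-application", deprecated since Scala 2.13;
    // the -Wconf:msg=Auto-application... filter above keeps it from failing warning-as-error builds.
    println(c.size)
  }
}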
@@ -139,4 +139,21 @@ Map<String, String> loadPartitionMetadata(InternalRow ident)
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);

/**
* Rename an existing partition of the table.
*
* @param from an existing partition identifier to rename
* @param to new partition identifier
* @return true if renaming completes successfully, otherwise false
* @throws UnsupportedOperationException If partition renaming is not supported
* @throws PartitionAlreadyExistsException If the `to` partition exists already
* @throws NoSuchPartitionException If the `from` partition does not exist
*/
default boolean renamePartition(InternalRow from, InternalRow to)
throws UnsupportedOperationException,
PartitionAlreadyExistsException,
NoSuchPartitionException {
throw new UnsupportedOperationException("Partition renaming is not supported");
}
}
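
A hedged caller-side sketch (hypothetical helper, not part of this commit) of how the new default method can be used against any `Table` that implements `SupportsPartitionManagement`:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, Table}

object RenamePartitionHelper {
  // Returns false when the table does not manage partitions; otherwise delegates to the
  // connector, which may still throw PartitionAlreadyExistsException or NoSuchPartitionException
  // per the contract documented above.
  def renameIfSupported(table: Table, from: InternalRow, to: InternalRow): Boolean = table match {
    case p: SupportsPartitionManagement => p.renamePartition(from, to)
    case _ => false
  }
}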
@@ -215,6 +215,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case s: SubqueryExpression =>
checkSubqueryExpression(operator, s)
s

case e: ExpressionWithRandomSeed if !e.seedExpression.foldable =>
failAnalysis(
s"Input argument to ${e.prettyName} must be a constant.")
}

operator match {
@@ -52,13 +52,14 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))

case r @ AlterTableRenamePartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), from, _) =>
ResolvedTable(_, _, table: SupportsPartitionManagement), from, to) =>
val partitionSchema = table.partitionSchema()
r.copy(from = resolvePartitionSpecs(
val Seq(resolvedFrom, resolvedTo) = resolvePartitionSpecs(
table.name,
Seq(from),
Seq(from, to),
partitionSchema,
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)).head)
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames))
r.copy(from = resolvedFrom, to = resolvedTo)

case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
r.copy(pattern = resolvePartitionSpecs(
@@ -51,7 +51,8 @@ case class CsvToStructs(
schema: StructType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String] = None,
requiredSchema: Option[StructType] = None)
extends UnaryExpression
with TimeZoneAwareExpression
with CodegenFallback
@@ -113,15 +114,20 @@ case class CsvToStructs(

val actualSchema =
StructType(nullableSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val rawParser = new UnivocityParser(actualSchema, actualSchema, parsedOptions)
val actualRequiredSchema =
StructType(requiredSchema.map(_.asNullable).getOrElse(nullableSchema)
.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val rawParser = new UnivocityParser(actualSchema,
actualRequiredSchema,
parsedOptions)
new FailureSafeParser[String](
input => rawParser.parse(input),
mode,
nullableSchema,
parsedOptions.columnNameOfCorruptRecord)
}

override def dataType: DataType = nullableSchema
override def dataType: DataType = requiredSchema.getOrElse(schema).asNullable

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
copy(timeZoneId = Option(timeZoneId))
@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedSeed
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
@@ -47,10 +46,8 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful
override def seedExpression: Expression = child

@transient protected lazy val seed: Long = seedExpression match {
case Literal(s, IntegerType) => s.asInstanceOf[Int]
case Literal(s, LongType) => s.asInstanceOf[Long]
case _ => throw new AnalysisException(
s"Input argument to $prettyName must be an integer, long or null literal.")
case e if e.dataType == IntegerType => e.eval().asInstanceOf[Int]
case e if e.dataType == LongType => e.eval().asInstanceOf[Long]
}

override def nullable: Boolean = false
@@ -64,7 +61,7 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful
* Represents the behavior of expressions which have a random seed and can renew the seed.
* Usually the random seed needs to be renewed at each execution under streaming queries.
*/
trait ExpressionWithRandomSeed {
trait ExpressionWithRandomSeed extends Expression {
def seedExpression: Expression
def withNewSeed(seed: Long): Expression
}
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, StructType}

/**
* Simplify redundant csv/json related expressions.
*
* The optimization includes:
* 1. JsonToStructs(StructsToJson(child)) => child.
* 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
* 3. CreateNamedStruct(JsonToStructs(json).col1, JsonToStructs(json).col2, ...) =>
* If(IsNull(json), nullStruct, KnownNotNull(JsonToStructs(prunedSchema, ..., json)))
* if JsonToStructs(json) is shared among all fields of CreateNamedStruct. `prunedSchema`
* contains all accessed fields in original CreateNamedStruct.
* 4. Prune unnecessary columns from GetStructField + CsvToStructs.
*/
object OptimizeCsvJsonExprs extends Rule[LogicalPlan] {
private def nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p =>
val optimized = if (SQLConf.get.jsonExpressionOptimization) {
p.transformExpressions(jsonOptimization)
} else {
p
}

if (SQLConf.get.csvExpressionOptimization) {
optimized.transformExpressions(csvOptimization)
} else {
optimized
}
}

private val jsonOptimization: PartialFunction[Expression, Expression] = {
case c: CreateNamedStruct
// If we create struct from various fields of the same `JsonToStructs`.
if c.valExprs.forall { v =>
v.isInstanceOf[GetStructField] &&
v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs] &&
v.children.head.semanticEquals(c.valExprs.head.children.head)
} =>
val jsonToStructs = c.valExprs.map(_.children.head)
val sameFieldName = c.names.zip(c.valExprs).forall {
case (name, valExpr: GetStructField) =>
name.toString == valExpr.childSchema(valExpr.ordinal).name
case _ => false
}

// Although `CreateNamedStruct` allows duplicated field names, e.g. "a int, a int",
// `JsonToStructs` does not support parsing json with duplicated field names.
val duplicateFields = c.names.map(_.toString).distinct.length != c.names.length

// If we create struct from various fields of the same `JsonToStructs` and we don't
// alias field names and there is no duplicated field in the struct.
if (sameFieldName && !duplicateFields) {
val fromJson = jsonToStructs.head.asInstanceOf[JsonToStructs].copy(schema = c.dataType)
val nullFields = c.children.grouped(2).flatMap {
case Seq(name, value) => Seq(name, Literal(null, value.dataType))
}.toSeq

If(IsNull(fromJson.child), c.copy(children = nullFields), KnownNotNull(fromJson))
} else {
c
}

case jsonToStructs @ JsonToStructs(_, options1,
StructsToJson(options2, child, timeZoneId2), timeZoneId1)
if options1.isEmpty && options2.isEmpty && timeZoneId1 == timeZoneId2 &&
jsonToStructs.dataType == child.dataType =>
// `StructsToJson` only fails when `JacksonGenerator` encounters data types it
// cannot convert to JSON. `StructsToJson.checkInputDataTypes` already
// verifies that its child's data type is convertible to JSON. But in the
// `StructsToJson(JsonToStructs(...))` case, we cannot verify the input json string,
// so `JsonToStructs` might throw an error at runtime. Thus we cannot optimize
// this case similarly.
child

case g @ GetStructField(j @ JsonToStructs(schema: StructType, _, _, _), ordinal, _)
if schema.length > 1 =>
val prunedSchema = StructType(Seq(schema(ordinal)))
g.copy(child = j.copy(schema = prunedSchema), ordinal = 0)

case g @ GetArrayStructFields(j @ JsonToStructs(schema: ArrayType, _, _, _), _, _, _, _)
if schema.elementType.asInstanceOf[StructType].length > 1 =>
val prunedSchema = ArrayType(StructType(Seq(g.field)), g.containsNull)
g.copy(child = j.copy(schema = prunedSchema), ordinal = 0, numFields = 1)
}

private val csvOptimization: PartialFunction[Expression, Expression] = {
case g @ GetStructField(c @ CsvToStructs(schema: StructType, _, _, _, None), ordinal, _)
if schema.length > 1 && c.options.isEmpty && schema(ordinal).name != nameOfCorruptRecord =>
// When the parse mode is permissive, and corrupt column is not selected, we can prune here
// from `GetStructField`. To be more conservative, it does not optimize when any option
// is set.
val prunedSchema = StructType(Seq(schema(ordinal)))
g.copy(child = c.copy(requiredSchema = Some(prunedSchema)), ordinal = 0)
}
}
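
A hedged spark-shell illustration (assumes a SparkSession `spark`; column and DataFrame names are placeholders, not part of this commit) of the schema pruning described in points 2 and 4 of the ScalaDoc above — referencing a single field lets the rule shrink the parsed schema:

import org.apache.spark.sql.functions.{from_csv, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import spark.implicits._

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))

// Only field `a` is referenced, so JsonToStructs is rewritten to parse a one-field struct.
Seq("""{"a": 1, "b": "x"}""").toDF("json")
  .select(from_json($"json", schema).getField("a"))
  .explain(true)

// Same idea for CSV: CsvToStructs receives requiredSchema = struct<a:int> via the new parameter.
Seq("1,x").toDF("csv")
  .select(from_csv($"csv", schema, Map.empty[String, String]).getField("a"))
  .explain(true)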