
Commit

Merge branch 'master' into nestedEqual
dbtsai committed Dec 20, 2018
2 parents cd14e14 + aa0d4ca commit 2a4ec20
Showing 43 changed files with 738 additions and 627 deletions.
@@ -1170,9 +1170,11 @@ private[spark] class DAGScheduler(

// Abort execution
return
case NonFatal(e) =>
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage

// Abort execution
return
}
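
For context, a minimal standalone sketch (not Spark code) of the behavioural difference behind the change above: `NonFatal(e)` deliberately does not match fatal JVM errors such as `OutOfMemoryError`, so widening the handler to `case e: Throwable` ensures the stage is aborted for any task serialization failure.

```scala
import scala.util.control.NonFatal

// Standalone illustration: NonFatal matches ordinary exceptions only, while
// `case e: Throwable` also matches fatal errors like OutOfMemoryError.
def classify(t: Throwable): String = t match {
  case NonFatal(e)  => s"non-fatal: ${e.getClass.getSimpleName}"
  case e: Throwable => s"fatal: ${e.getClass.getSimpleName}"
}

// classify(new RuntimeException("boom"))  // "non-fatal: RuntimeException"
// classify(new OutOfMemoryError("oom"))   // "fatal: OutOfMemoryError"
```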

13 changes: 13 additions & 0 deletions docs/running-on-mesos.md
@@ -108,6 +108,19 @@ Please note that if you specify multiple ways to obtain the credentials then the

An equivalent order applies for the secret. Essentially we prefer the configuration to be specified directly rather than indirectly by files, and we prefer that configuration settings are used over environment variables.

### Deploying to a Mesos Cluster Running on Secure Sockets

To deploy a Spark application into a Mesos cluster that is running in secure mode, the following environment variables need to be set.

- `LIBPROCESS_SSL_ENABLED=true` enables SSL communication
- `LIBPROCESS_SSL_VERIFY_CERT=false` disables verification of the SSL certificate
- `LIBPROCESS_SSL_KEY_FILE=pathToKeyFile.key` path to the key file
- `LIBPROCESS_SSL_CERT_FILE=pathToCRTFile.crt` the certificate file to be used

The full set of options is documented at http://mesos.apache.org/documentation/latest/ssl/.

Submission then proceeds as described in Client mode or Cluster mode below.
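
As an illustration only (not part of this patch), one way to supply these variables programmatically is through `SparkLauncher`'s environment map; the application resource, main class, and master URL below are placeholders.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.launcher.SparkLauncher

// Hypothetical sketch: pass the libprocess SSL variables to the spark-submit
// process launched for a Mesos master running in secure mode.
val sslEnv = Map(
  "LIBPROCESS_SSL_ENABLED" -> "true",
  "LIBPROCESS_SSL_VERIFY_CERT" -> "false",         // skip certificate verification
  "LIBPROCESS_SSL_KEY_FILE" -> "/path/to/key.key",
  "LIBPROCESS_SSL_CERT_FILE" -> "/path/to/cert.crt"
).asJava

val handle = new SparkLauncher(sslEnv)
  .setAppResource("/path/to/app.jar")              // placeholder
  .setMainClass("com.example.Main")                // placeholder
  .setMaster("mesos://host:5050")                  // placeholder master URL
  .startApplication()
```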

## Uploading Spark Package

When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
@@ -20,6 +20,7 @@ package org.apache.spark.ml.fpm
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.fpm.{PrefixSpan => mllibPrefixSpan}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.col
@@ -135,7 +136,10 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params
* - `freq: Long`
*/
@Since("2.4.0")
def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame = {
def findFrequentSequentialPatterns(dataset: Dataset[_]): DataFrame = instrumented { instr =>
instr.logDataset(dataset)
instr.logParams(this, params: _*)

val sequenceColParam = $(sequenceCol)
val inputType = dataset.schema(sequenceColParam).dataType
require(inputType.isInstanceOf[ArrayType] &&
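For reference, a hedged usage sketch of the method being instrumented above; the dataset contents and parameter values are invented for illustration.

```scala
import org.apache.spark.ml.fpm.PrefixSpan
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("prefixspan-example").getOrCreate()
import spark.implicits._

// Each row is a sequence of itemsets.
val sequences = Seq(
  Seq(Seq(1, 2), Seq(3)),
  Seq(Seq(1), Seq(3, 2), Seq(1, 2)),
  Seq(Seq(1, 2), Seq(5)),
  Seq(Seq(6))
).toDF("sequence")

// Returns a DataFrame with `sequence` and `freq: Long` columns; with the change
// above, the input dataset and params are also logged through Instrumentation.
new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
  .setSequenceCol("sequence")
  .findFrequentSequentialPatterns(sequences)
  .show()
```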
@@ -294,6 +294,15 @@ private[yarn] class YarnAllocator(
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

if (missing > 0) {
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
@@ -306,15 +315,6 @@
logInfo(requestContainerMessage)
}

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
@@ -374,14 +374,9 @@
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}
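
A simplified standalone sketch of the grouping and cancellation logic described in the comments above (toy request type, not the actual YARN/AMRMClient API):

```scala
// Stand-in for a container request with preferred nodes (empty = any host).
case class ContainerRequest(nodes: Seq[String])

// Split pending requests into: locality-matched (a preferred node still has local
// tasks), stale (preferred nodes no longer wanted), and any-host requests.
def splitByLocality(
    hostToLocalTaskCount: Map[String, Int],
    pending: Seq[ContainerRequest])
  : (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
  val (localityBased, anyHost) = pending.partition(_.nodes.nonEmpty)
  val (local, stale) =
    localityBased.partition(_.nodes.exists(hostToLocalTaskCount.contains))
  (local, stale, anyHost)
}

// When the executor target shrinks, cancel up to numToCancel requests, preferring
// stale and any-host requests before locality-matched ones, as in the diff above.
def requestsToCancel(
    numToCancel: Int,
    local: Seq[ContainerRequest],
    stale: Seq[ContainerRequest],
    anyHost: Seq[ContainerRequest]): Seq[ContainerRequest] =
  (stale ++ anyHost ++ local).take(numToCancel)
```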

1 change: 1 addition & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -57,6 +57,7 @@ object Row {
/**
* Merge multiple rows into a single row, one after another.
*/
@deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
def merge(rows: Row*): Row = {
// TODO: Improve the performance of this if used in performance critical part.
new GenericRow(rows.flatMap(_.toSeq).toArray)
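Since `Row.merge` is now deprecated, a caller-side sketch of an equivalent built on the public `Row.fromSeq` factory (same flatten-and-concatenate behaviour):

```scala
import org.apache.spark.sql.Row

val rows = Seq(Row(1, "a"), Row(2.0))

// Equivalent to the deprecated Row.merge(rows: _*)
val merged: Row = Row.fromSeq(rows.flatMap(_.toSeq))
// merged == Row(1, "a", 2.0)
```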
@@ -879,6 +879,37 @@ object TypeCoercion {
}
}
e.withNewChildren(children)

case udf: ScalaUDF if udf.inputTypes.nonEmpty =>
val children = udf.children.zip(udf.inputTypes).map { case (in, expected) =>
implicitCast(in, udfInputToCastType(in.dataType, expected)).getOrElse(in)
}
udf.withNewChildren(children)
}

private def udfInputToCastType(input: DataType, expectedType: DataType): DataType = {
(input, expectedType) match {
// SPARK-26308: avoid casting to an arbitrary precision and scale for decimals. Please note
// that precision and scale cannot be inferred properly for a ScalaUDF because, when it is
// created, it is not bound to any column. So here the precision and scale of the input
// column is used.
case (in: DecimalType, _: DecimalType) => in
case (ArrayType(dtIn, _), ArrayType(dtExp, nullableExp)) =>
ArrayType(udfInputToCastType(dtIn, dtExp), nullableExp)
case (MapType(keyDtIn, valueDtIn, _), MapType(keyDtExp, valueDtExp, nullableExp)) =>
MapType(udfInputToCastType(keyDtIn, keyDtExp),
udfInputToCastType(valueDtIn, valueDtExp),
nullableExp)
case (StructType(fieldsIn), StructType(fieldsExp)) =>
val fieldTypes =
fieldsIn.map(_.dataType).zip(fieldsExp.map(_.dataType)).map { case (dtIn, dtExp) =>
udfInputToCastType(dtIn, dtExp)
}
StructType(fieldsExp.zip(fieldTypes).map { case (field, newDt) =>
field.copy(dataType = newDt)
})
case (_, other) => other
}
}
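
A hedged end-to-end illustration of the SPARK-26308 behaviour the new rule preserves; the column name, precision, and UDF body are invented for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[*]").appName("udf-decimal").getOrCreate()

// A Scala UDF over BigDecimal carries no precision/scale of its own. With the rule
// above, the input column's decimal(18, 4) type is kept rather than casting to an
// arbitrary DecimalType, so the value is not truncated before the UDF runs.
val plusOne = udf((d: BigDecimal) => d + 1)

val df = spark.range(1)
  .select((col("id") + 12345.6789).cast("decimal(18,4)").as("d"))

df.select(plusOne(col("d")).as("d_plus_one")).show()
```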

/**
@@ -52,7 +52,7 @@ case class ScalaUDF(
udfName: Option[String] = None,
nullable: Boolean = true,
udfDeterministic: Boolean = true)
extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
extends Expression with NonSQLExpression with UserDefinedExpression {

// The constructor for SPARK 2.1 and 2.2
def this(
@@ -93,7 +93,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveRedundantAliases,
RemoveRedundantProject,
RemoveNoopOperators,
SimplifyExtractValueOps,
CombineConcats) ++
extendedOperatorOptimizationRules
@@ -177,7 +177,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
RemoveRedundantProject) :+
RemoveNoopOperators) :+
Batch("UpdateAttributeReferences", Once,
UpdateNullabilityInAttributeReferences)
}
@@ -403,11 +403,15 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
}

/**
* Remove projections from the query plan that do not make any modifications.
* Remove no-op operators from the query plan that do not make any modifications.
*/
object RemoveRedundantProject extends Rule[LogicalPlan] {
object RemoveNoopOperators extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p @ Project(_, child) if p.output == child.output => child
// Eliminate no-op Projects
case p @ Project(_, child) if child.sameOutput(p) => child

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
}
}
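
A simplified standalone sketch of the two no-op cases the renamed rule covers (toy plan types, not Catalyst's):

```scala
// Toy logical-plan types for illustration only.
sealed trait Plan { def output: Seq[String] }
case class Relation(output: Seq[String]) extends Plan
case class Project(output: Seq[String], child: Plan) extends Plan
case class Window(windowExprs: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = child.output ++ windowExprs
}

// A Project whose output matches its child's, and a Window with no window
// expressions, add nothing and can be replaced by their child.
def removeNoopOperators(plan: Plan): Plan = plan match {
  case Project(out, child) if out == child.output => removeNoopOperators(child)
  case Window(exprs, child) if exprs.isEmpty      => removeNoopOperators(child)
  case other                                      => other
}

// removeNoopOperators(Project(Seq("a"), Relation(Seq("a")))) == Relation(Seq("a"))
```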

@@ -602,17 +606,12 @@ object ColumnPruning extends Rule[LogicalPlan] {
p.copy(child = w.copy(
windowExpressions = w.windowExpressions.filter(p.references.contains)))

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(_, child) if child.sameOutput(p) => child

// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p

// for all other logical plans that inherits the output from it's children
case p @ Project(_, child) =>
// Project over project is handled by the first case, skip it here.
case p @ Project(_, child) if !child.isInstanceOf[Project] =>
val required = child.references ++ p.references
if (!child.inputSet.subsetOf(required)) {
val newChildren = child.children.map(c => prunedChild(c, required))
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
* Note:
* Before flipping the filter condition of the right node, we should:
* 1. Combine all its [[Filter]]s.
* 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
* 2. Update the attribute references to the left node;
* 3. Add a Coalesce(condition, False) (to account for NULL values in the condition).
*/
object ReplaceExceptWithFilter extends Rule[LogicalPlan] {

@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {

plan.transform {
case e @ Except(left, right, false) if isEligible(left, right) =>
val newCondition = transformCondition(left, skipProject(right))
newCondition.map { c =>
Distinct(Filter(Not(c), left))
}.getOrElse {
val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
if (filterCondition.deterministic) {
transformCondition(left, filterCondition).map { c =>
Distinct(Filter(Not(c), left))
}.getOrElse {
e
}
} else {
e
}
}
}

private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
val filterCondition =
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition

val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap

if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
val rewrittenCondition = condition.transform {
case a: AttributeReference => attributeNameMap(a.name)
}
// We need to treat the condition as false when it is NULL, otherwise we would not return
// the rows containing NULL, which are instead filtered out in the Except right plan
Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
} else {
None
}
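A hedged DataFrame-level illustration of why the rewritten condition is wrapped in `Coalesce(condition, false)`; the data values are invented.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("except-null").getOrCreate()
import spark.implicits._

val left  = Seq[Integer](1, 10, null).toDF("a")
val right = left.filter("a > 5")   // keeps only 10; the NULL row is filtered out

// EXCEPT should return 1 and NULL. Under three-valued logic NOT(NULL > 5) is NULL,
// so a plain Filter(NOT(a > 5)) rewrite would drop the NULL row; coalescing the
// condition to false keeps it.
left.except(right).show()
```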
@@ -34,6 +34,7 @@ class ColumnPruningSuite extends PlanTest {
val batches = Batch("Column pruning", FixedPoint(100),
PushDownPredicate,
ColumnPruning,
RemoveNoopOperators,
CollapseProject) :: Nil
}

@@ -340,10 +341,8 @@
test("Column pruning on Union") {
val input1 = LocalRelation('a.int, 'b.string, 'c.double)
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
val query = Project('b :: Nil,
Union(input1 :: input2 :: Nil)).analyze
val expected = Project('b :: Nil,
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze
val expected = Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze
comparePlans(Optimize.execute(query), expected)
}

@@ -27,8 +27,9 @@ class CombiningLimitsSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Filter Pushdown", FixedPoint(100),
ColumnPruning) ::
Batch("Column Pruning", FixedPoint(100),
ColumnPruning,
RemoveNoopOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Constant Folding", FixedPoint(10),
@@ -39,6 +39,7 @@ class JoinOptimizationSuite extends PlanTest {
ReorderJoin,
PushPredicateThroughJoin,
ColumnPruning,
RemoveNoopOperators,
CollapseProject) :: Nil

}
@@ -33,7 +33,7 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
FixedPoint(50),
PushProjectionThroughUnion,
RemoveRedundantAliases,
RemoveRedundantProject) :: Nil
RemoveNoopOperators) :: Nil
}

test("all expressions in project list are aliased child output") {
