[SPARK-29292][STREAMING][SQL][BUILD] Get streaming, catalyst, sql compiling for Scala 2.13

### What changes were proposed in this pull request?

Continuation of #28971, which lets streaming, catalyst, and sql compile for Scala 2.13. Same idea.
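
The recurring 2.13 incompatibilities are: `Map.mapValues` now returns a lazy `MapView` rather than a `Map`, and `scala.Seq` is now an alias for `scala.collection.immutable.Seq`, so mutable buffers no longer satisfy `Seq` result types. A minimal standalone sketch of the two fixes applied throughout (object and method names here are illustrative, not from the diff):

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object Scala213Patterns {
  // 2.13: mapValues returns a lazy MapView, which is not a Map, so the old
  // mapValues(...).asJava chain no longer compiles; .toMap restores a Map.
  // 2.12 accepts the chain with or without .toMap, so this builds on both.
  def toJavaLongs(offsets: Map[String, Long]): java.util.Map[String, java.lang.Long] =
    offsets.mapValues(java.lang.Long.valueOf).toMap.asJava

  // 2.13: scala.Seq is immutable.Seq, so a mutable buffer can no longer be
  // returned where Seq[T] is expected; .toSeq converts (effectively a no-op
  // on 2.12, an immutable copy on 2.13).
  def buildSeq(n: Int): Seq[Int] = {
    val buf = new ArrayBuffer[Int]
    (1 to n).foreach(buf += _)
    buf.toSeq
  }
}
```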

### Why are the changes needed?

Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests. (Scala 2.13 was not tested; this change is about getting the code to compile under 2.13 without breaking the 2.12 build.)
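
As a cross-version sanity check of the `.toSeq` pattern, here is a minimal sketch (the `Box` case class is a hypothetical stand-in for plan nodes like `Union(children: Seq[LogicalPlan])`):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a case class taking an immutable-by-2.13 Seq.
final case class Box(items: Seq[Int])

object CrossBuildCheck {
  val flattened = ArrayBuffer(1, 2, 3)
  // Box(flattened) compiles only on 2.12, where scala.Seq = collection.Seq.
  // Box(flattened.toSeq) compiles on both: on 2.12, ArrayBuffer.toSeq returns
  // the buffer itself, so existing behavior is unchanged, while on 2.13 it
  // materializes an immutable copy.
  val b: Box = Box(flattened.toSeq)
}
```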

Closes #29078 from srowen/SPARK-29292.2.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
srowen authored and dongjoon-hyun committed Jul 14, 2020
1 parent cc9371d commit d6a68e0
Showing 100 changed files with 269 additions and 266 deletions.
@@ -241,7 +241,7 @@ object ConsumerStrategies {
     new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
@@ -320,7 +320,7 @@ object ConsumerStrategies {
     new SubscribePattern[K, V](
       pattern,
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
@@ -404,7 +404,7 @@ object ConsumerStrategies {
     new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
@@ -70,7 +70,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
   @transient private var kc: Consumer[K, V] = null
   def consumer(): Consumer[K, V] = this.synchronized {
     if (null == kc) {
-      kc = consumerStrategy.onStart(currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).asJava)
+      kc = consumerStrategy.onStart(
+        currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
     }
     kc
   }
@@ -48,15 +48,12 @@ public void testConsumerStrategyConstructors() {
       JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
     final Map<TopicPartition, Long> offsets = new HashMap<>();
     offsets.put(tp1, 23L);
+    final Map<TopicPartition, Object> dummyOffsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> kv : offsets.entrySet()) {
+      dummyOffsets.put(kv.getKey(), kv.getValue());
+    }
     final scala.collection.Map<TopicPartition, Object> sOffsets =
-      JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
-        new scala.runtime.AbstractFunction1<Long, Object>() {
-          @Override
-          public Object apply(Long x) {
-            return (Object) x;
-          }
-        }
-      );
+      JavaConverters.mapAsScalaMap(dummyOffsets);
 
     final ConsumerStrategy<String, String> sub1 =
       ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets);
@@ -1050,7 +1050,7 @@ class Analyzer(
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
-        val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+        val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap
         val query = addStaticPartitionColumns(r, i.query, staticPartitions)
 
         if (!i.overwrite) {
@@ -2238,7 +2238,7 @@
           }
         }
         if (aggregateExpressions.nonEmpty) {
-          Some(aggregateExpressions, transformedAggregateFilter)
+          Some(aggregateExpressions.toSeq, transformedAggregateFilter)
         } else {
           None
         }
@@ -2677,7 +2677,7 @@
         val windowOps =
           groupedWindowExpressions.foldLeft(child) {
             case (last, ((partitionSpec, orderSpec, _), windowExpressions)) =>
-              Window(windowExpressions, partitionSpec, orderSpec, last)
+              Window(windowExpressions.toSeq, partitionSpec, orderSpec, last)
           }
 
         // Finally, we create a Project to output windowOps's output
@@ -72,10 +72,10 @@ object CTESubstitution extends Rule[LogicalPlan] {
           }
           // CTE relation is defined as `SubqueryAlias`. Here we skip it and check the child
           // directly, so that `startOfQuery` is set correctly.
-          assertNoNameConflictsInCTE(relation.child, newNames)
+          assertNoNameConflictsInCTE(relation.child, newNames.toSeq)
           newNames += name
         }
-        assertNoNameConflictsInCTE(child, newNames, startOfQuery = false)
+        assertNoNameConflictsInCTE(child, newNames.toSeq, startOfQuery = false)
 
       case other =>
         other.subqueries.foreach(assertNoNameConflictsInCTE(_, outerCTERelationNames))
@@ -162,9 +162,9 @@ object CTESubstitution extends Rule[LogicalPlan] {
          traverseAndSubstituteCTE(relation)
        }
        // CTE definition can reference a previous one
-      resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations))
+      resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations.toSeq))
     }
-    resolvedCTERelations
+    resolvedCTERelations.toSeq
   }
 
   private def substituteCTE(
@@ -106,7 +106,7 @@ class EquivalentExpressions {
    * an empty collection if there are none.
    */
   def getEquivalentExprs(e: Expression): Seq[Expression] = {
-    equivalenceMap.getOrElse(Expr(e), Seq.empty)
+    equivalenceMap.getOrElse(Expr(e), Seq.empty).toSeq
   }
 
   /**
@@ -375,7 +375,7 @@ class CodegenContext extends Logging {
 
     // The generated initialization code may exceed 64kb function size limit in JVM if there are too
     // many mutable states, so split it into multiple functions.
-    splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil)
+    splitExpressions(expressions = initCodes.toSeq, funcName = "init", arguments = Nil)
   }
 
   /**
@@ -927,6 +927,7 @@
       length += CodeFormatter.stripExtraNewLinesAndComments(code).length
     }
     blocks += blockBuilder.toString()
+    blocks.toSeq
   }
 
   /**
@@ -1002,7 +1003,7 @@
   def subexprFunctionsCode: String = {
     // Whole-stage codegen's subexpression elimination is handled in another code path
     assert(currentVars == null || subexprFunctions.isEmpty)
-    splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
+    splitExpressions(subexprFunctions.toSeq, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
   }
 
   /**
@@ -741,7 +741,7 @@ case class MapObjects private(
       case ObjectType(cls) if cls.isArray =>
         _.asInstanceOf[Array[_]].toSeq
       case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
-        _.asInstanceOf[java.util.List[_]].asScala
+        _.asInstanceOf[java.util.List[_]].asScala.toSeq
       case ObjectType(cls) if cls == classOf[Object] =>
         (inputCollection) => {
           if (inputCollection.getClass.isArray) {
@@ -124,7 +124,7 @@ package object expressions {
   }
 
   private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
-    m.mapValues(_.distinct).map(identity)
+    m.mapValues(_.distinct).toMap
   }
 
   /** Map to use for direct case insensitive attribute lookups. */
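
The hunk above replaces the old 2.12 idiom `.map(identity)`, which forced `mapValues`' lazy result into a strict `Map`, with `.toMap`, which also type-checks on 2.13 where `mapValues` yields a `MapView`. A standalone illustration (not Spark code):

```scala
object MapValuesLaziness {
  var calls = 0
  val m = Map(1 -> Seq("a", "a"), 2 -> Seq("b"))
  val mapped = m.mapValues { vs => calls += 1; vs.distinct }
  // 2.12: `mapped` is a lazy Map that re-runs the function on every access.
  // 2.13: `mapped` is a MapView, not a Map, so it cannot be returned where a
  // Map is expected; appending .toMap fixes the type on 2.13 and still
  // compiles on 2.12.
  val strict: Map[Int, Seq[String]] = mapped.toMap
}
```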
@@ -186,7 +186,7 @@ object SubExprUtils extends PredicateHelper {
         e
       }
     }
-    outerExpressions
+    outerExpressions.toSeq
   }
 
   /**
@@ -162,7 +162,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
     val topOutputSet = AttributeSet(output)
     while (foundPlans.size < items.length) {
       // Build plans for the next level.
-      foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
+      foundPlans += searchLevel(foundPlans.toSeq, conf, conditions, topOutputSet, filters)
     }
 
     val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -425,7 +425,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
     // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
     // case we use the first mapping (which should be provided by the first child).
-    val mapping = AttributeMap(currentNextAttrPairs)
+    val mapping = AttributeMap(currentNextAttrPairs.toSeq)
 
     // Create a an expression cleaning function for nodes that can actually produce redundant
     // aliases, use identity otherwise.
@@ -940,7 +940,7 @@ object CombineUnions extends Rule[LogicalPlan] {
         flattened += child
       }
     }
-    Union(flattened)
+    Union(flattened.toSeq)
   }
 }
 
@@ -765,7 +765,7 @@ object CombineConcats extends Rule[LogicalPlan] {
         flattened += child
       }
     }
-    Concat(flattened)
+    Concat(flattened.toSeq)
   }
 
   private def hasNestedConcats(concat: Concat): Boolean = concat.children.exists {
@@ -478,11 +478,11 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     while (true) {
       bottomPart match {
         case havingPart @ Filter(_, aggPart: Aggregate) =>
-          return (topPart, Option(havingPart), aggPart)
+          return (topPart.toSeq, Option(havingPart), aggPart)
 
         case aggPart: Aggregate =>
           // No HAVING clause
-          return (topPart, None, aggPart)
+          return (topPart.toSeq, None, aggPart)
 
         case p @ Project(_, child) =>
           topPart += p