[SPARK-29292][STREAMING][SQL][BUILD] Get streaming, catalyst, sql compiling for Scala 2.13 #29078

Closed · wants to merge 4 commits
@@ -241,7 +241,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
@@ -320,7 +320,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
@@ -404,7 +404,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
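The change repeated in the three hunks above is the extra `.toMap` after `mapValues`. Under Scala 2.13, `mapValues` returns a lazy `MapView` rather than a strict `Map`, and the `asJava` decorator expects a `scala.collection.Map`, so the view has to be materialized before the Java conversion; the extra call is harmless under 2.12. A minimal sketch of the pattern (the helper name `toJavaOffsets` is illustrative, not part of the PR):

```scala
import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._

object MapValuesCompat {
  // On 2.12, mapValues already yields a strict Map and .asJava compiles directly.
  // On 2.13, mapValues yields a MapView, which the Java converters do not accept,
  // so .toMap materializes a strict Map first; the same line compiles on both.
  def toJavaOffsets(offsets: Map[String, Long]): ju.Map[String, jl.Long] =
    new ju.HashMap[String, jl.Long](
      offsets.mapValues(l => jl.Long.valueOf(l)).toMap.asJava)
}
```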
@@ -70,7 +70,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
- kc = consumerStrategy.onStart(currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).asJava)
+ kc = consumerStrategy.onStart(
+   currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
}
kc
}
@@ -48,15 +48,12 @@ public void testConsumerStrategyConstructors() {
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 23L);
+ final Map<TopicPartition, Object> dummyOffsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> kv : offsets.entrySet()) {
+   dummyOffsets.put(kv.getKey(), kv.getValue());
+ }
final scala.collection.Map<TopicPartition, Object> sOffsets =
-   JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
-     new scala.runtime.AbstractFunction1<Long, Object>() {
-       @Override
-       public Object apply(Long x) {
-         return (Object) x;
-       }
-     }
-   );
+   JavaConverters.mapAsScalaMap(dummyOffsets);

final ConsumerStrategy<String, String> sub1 =
ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets);
@@ -1050,7 +1050,7 @@ class Analyzer(
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)

- val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+ val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap
val query = addStaticPartitionColumns(r, i.query, staticPartitions)

if (!i.overwrite) {
@@ -2238,7 +2238,7 @@ class Analyzer(
}
}
if (aggregateExpressions.nonEmpty) {
- Some(aggregateExpressions, transformedAggregateFilter)
+ Some(aggregateExpressions.toSeq, transformedAggregateFilter)
} else {
None
}
@@ -2677,7 +2677,7 @@ class Analyzer(
val windowOps =
groupedWindowExpressions.foldLeft(child) {
case (last, ((partitionSpec, orderSpec, _), windowExpressions)) =>
- Window(windowExpressions, partitionSpec, orderSpec, last)
+ Window(windowExpressions.toSeq, partitionSpec, orderSpec, last)
}

// Finally, we create a Project to output windowOps's output
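The `.toSeq` calls added here and in the files below address the other recurring 2.13 issue: `scala.Seq` is an alias for `scala.collection.immutable.Seq` in 2.13, so a locally built `mutable.ArrayBuffer` (or other mutable buffer) no longer satisfies a `Seq` result type or parameter. Appending `.toSeq` converts it to an immutable sequence and is essentially free under 2.12. A small sketch of the idea (names are illustrative, not from the PR):

```scala
import scala.collection.mutable.ArrayBuffer

object SeqCompat {
  // On 2.12, Seq aliases collection.Seq, so the buffer could be returned as-is.
  // On 2.13, Seq means immutable.Seq, so the buffer must be converted with .toSeq.
  def collectEvens(xs: Seq[Int]): Seq[Int] = {
    val out = ArrayBuffer.empty[Int]
    for (x <- xs if x % 2 == 0) out += x
    out.toSeq // required for 2.13, harmless for 2.12
  }
}
```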
@@ -72,10 +72,10 @@ object CTESubstitution extends Rule[LogicalPlan] {
}
// CTE relation is defined as `SubqueryAlias`. Here we skip it and check the child
// directly, so that `startOfQuery` is set correctly.
- assertNoNameConflictsInCTE(relation.child, newNames)
+ assertNoNameConflictsInCTE(relation.child, newNames.toSeq)
newNames += name
}
- assertNoNameConflictsInCTE(child, newNames, startOfQuery = false)
+ assertNoNameConflictsInCTE(child, newNames.toSeq, startOfQuery = false)

case other =>
other.subqueries.foreach(assertNoNameConflictsInCTE(_, outerCTERelationNames))
@@ -162,9 +162,9 @@ object CTESubstitution extends Rule[LogicalPlan] {
traverseAndSubstituteCTE(relation)
}
// CTE definition can reference a previous one
- resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations))
+ resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations.toSeq))
}
- resolvedCTERelations
+ resolvedCTERelations.toSeq
}

private def substituteCTE(
@@ -106,7 +106,7 @@ class EquivalentExpressions {
* an empty collection if there are none.
*/
def getEquivalentExprs(e: Expression): Seq[Expression] = {
- equivalenceMap.getOrElse(Expr(e), Seq.empty)
+ equivalenceMap.getOrElse(Expr(e), Seq.empty).toSeq
}

/**
@@ -375,7 +375,7 @@ class CodegenContext extends Logging {

// The generated initialization code may exceed 64kb function size limit in JVM if there are too
// many mutable states, so split it into multiple functions.
- splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil)
+ splitExpressions(expressions = initCodes.toSeq, funcName = "init", arguments = Nil)
}

/**
@@ -927,6 +927,7 @@ class CodegenContext extends Logging {
length += CodeFormatter.stripExtraNewLinesAndComments(code).length
}
blocks += blockBuilder.toString()
+ blocks.toSeq
}

/**
@@ -1002,7 +1003,7 @@ class CodegenContext extends Logging {
def subexprFunctionsCode: String = {
// Whole-stage codegen's subexpression elimination is handled in another code path
assert(currentVars == null || subexprFunctions.isEmpty)
- splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
+ splitExpressions(subexprFunctions.toSeq, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
}

/**
@@ -741,7 +741,7 @@ case class MapObjects private(
case ObjectType(cls) if cls.isArray =>
_.asInstanceOf[Array[_]].toSeq
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
- _.asInstanceOf[java.util.List[_]].asScala
+ _.asInstanceOf[java.util.List[_]].asScala.toSeq
case ObjectType(cls) if cls == classOf[Object] =>
(inputCollection) => {
if (inputCollection.getClass.isArray) {
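The `MapObjects` fix is the same `Seq` issue surfacing through Java interop: `asScala` on a `java.util.List` yields a `mutable.Buffer`, which under 2.13 does not conform to `Seq` (now `immutable.Seq`), hence the trailing `.toSeq`. A hedged sketch of that conversion (helper name is illustrative):

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

object JavaListCompat {
  // asScala wraps the java.util.List in a mutable.Buffer view; .toSeq copies the
  // elements into an immutable Seq so the result type checks under Scala 2.13.
  def toScalaSeq(list: ju.List[String]): Seq[String] =
    list.asScala.toSeq
}
```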
@@ -124,7 +124,7 @@ package object expressions {
}

private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
- m.mapValues(_.distinct).map(identity)
+ m.mapValues(_.distinct).toMap
}

/** Map to use for direct case insensitive attribute lookups. */
@@ -186,7 +186,7 @@ object SubExprUtils extends PredicateHelper {
e
}
}
- outerExpressions
+ outerExpressions.toSeq
}

/**
@@ -162,7 +162,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
val topOutputSet = AttributeSet(output)
while (foundPlans.size < items.length) {
// Build plans for the next level.
- foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
+ foundPlans += searchLevel(foundPlans.toSeq, conf, conditions, topOutputSet, filters)
}

val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -419,7 +419,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
// Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
// keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
// case we use the first mapping (which should be provided by the first child).
- val mapping = AttributeMap(currentNextAttrPairs)
+ val mapping = AttributeMap(currentNextAttrPairs.toSeq)

// Create a an expression cleaning function for nodes that can actually produce redundant
// aliases, use identity otherwise.
@@ -934,7 +934,7 @@ object CombineUnions extends Rule[LogicalPlan] {
flattened += child
}
}
- Union(flattened)
+ Union(flattened.toSeq)
}
}

@@ -765,7 +765,7 @@ object CombineConcats extends Rule[LogicalPlan] {
flattened += child
}
}
- Concat(flattened)
+ Concat(flattened.toSeq)
}

private def hasNestedConcats(concat: Concat): Boolean = concat.children.exists {
@@ -478,11 +478,11 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
while (true) {
bottomPart match {
case havingPart @ Filter(_, aggPart: Aggregate) =>
- return (topPart, Option(havingPart), aggPart)
+ return (topPart.toSeq, Option(havingPart), aggPart)

case aggPart: Aggregate =>
// No HAVING clause
- return (topPart, None, aggPart)
+ return (topPart.toSeq, None, aggPart)

case p @ Project(_, child) =>
topPart += p