diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 3e32b592b3a3a..ab6550ddf2fb3 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -241,7 +241,7 @@ object ConsumerStrategies { new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) } /** @@ -320,7 +320,7 @@ object ConsumerStrategies { new SubscribePattern[K, V]( pattern, new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) } /** @@ -404,7 +404,7 @@ object ConsumerStrategies { new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) } /** diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index a449a8bb7213e..fcdc92580ba35 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -70,7 +70,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( @transient private var kc: Consumer[K, V] = null def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { - kc = consumerStrategy.onStart(currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).asJava) + kc = consumerStrategy.onStart( + currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava) } kc } diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index dc364aca9bd3b..3d6e5ebe978e8 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -48,15 +48,12 @@ public void testConsumerStrategyConstructors() { JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); final Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(tp1, 23L); + final Map<TopicPartition, Object> dummyOffsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> kv : offsets.entrySet()) { + dummyOffsets.put(kv.getKey(), kv.getValue()); + } final scala.collection.Map<TopicPartition, Object> sOffsets = - JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues( - new scala.runtime.AbstractFunction1<Long, Object>() { - @Override - public Object apply(Long x) { - return (Object) x; - } - } - ); + JavaConverters.mapAsScalaMap(dummyOffsets); final ConsumerStrategy<String, String> sub1 = ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams, sOffsets); diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d669509d18455..023ef2ee17473 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1050,7 +1050,7 @@ class Analyzer( val partCols = partitionColumnNames(r.table) validatePartitionSpec(partCols, i.partitionSpec) - val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get) + val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap val query = addStaticPartitionColumns(r, i.query, staticPartitions) if (!i.overwrite) { @@ -2238,7 +2238,7 @@ class Analyzer( } } if (aggregateExpressions.nonEmpty) { - Some(aggregateExpressions, transformedAggregateFilter) + Some(aggregateExpressions.toSeq, transformedAggregateFilter) } else { None } @@ -2677,7 +2677,7 @@ class Analyzer( val windowOps = groupedWindowExpressions.foldLeft(child) { case (last, ((partitionSpec, orderSpec, _), windowExpressions)) => - Window(windowExpressions, partitionSpec, orderSpec, last) + Window(windowExpressions.toSeq, partitionSpec, orderSpec, last) } // Finally, we create a Project to output windowOps's output diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 9f0eff5017f38..623cd131bf8da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -72,10 +72,10 @@ object CTESubstitution extends Rule[LogicalPlan] { } // CTE relation is defined as `SubqueryAlias`. Here we skip it and check the child // directly, so that `startOfQuery` is set correctly. - assertNoNameConflictsInCTE(relation.child, newNames) + assertNoNameConflictsInCTE(relation.child, newNames.toSeq) newNames += name } - assertNoNameConflictsInCTE(child, newNames, startOfQuery = false) + assertNoNameConflictsInCTE(child, newNames.toSeq, startOfQuery = false) case other => other.subqueries.foreach(assertNoNameConflictsInCTE(_, outerCTERelationNames)) @@ -162,9 +162,9 @@ object CTESubstitution extends Rule[LogicalPlan] { traverseAndSubstituteCTE(relation) } // CTE definition can reference a previous one - resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations)) + resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations.toSeq)) } - resolvedCTERelations + resolvedCTERelations.toSeq } private def substituteCTE( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index a32052ce121df..458c48df6d0c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -106,7 +106,7 @@ class EquivalentExpressions { * an empty collection if there are none. 
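The `.toMap` calls added after `mapValues` above (in `ConsumerStrategies`, `DirectKafkaInputDStream` and the static-partition handling in `Analyzer`) all address the same Scala 2.13 change: `Map.mapValues` now returns a lazy `MapView` rather than a strict `Map`, so its result can no longer be passed where a `Map` is expected or converted with `.asJava`. A minimal sketch of the pattern, with made-up topic names standing in for `TopicPartition` keys:

import scala.collection.JavaConverters._

object MapValuesSketch {
  def main(args: Array[String]): Unit = {
    val offsets: Map[String, Long] = Map("topic-0" -> 23L, "topic-1" -> 42L)

    // On 2.13, mapValues returns a (deprecated) MapView; forcing it with
    // .toMap yields a strict immutable Map that works on 2.12 and 2.13 alike.
    val boxed: Map[String, java.lang.Long] =
      offsets.mapValues(v => java.lang.Long.valueOf(v)).toMap

    // A MapView has no .asJava conversion, so the strict map is what
    // crosses the Java boundary.
    val javaOffsets: java.util.Map[String, java.lang.Long] = boxed.asJava
    println(javaOffsets)
  }
}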
*/ def getEquivalentExprs(e: Expression): Seq[Expression] = { - equivalenceMap.getOrElse(Expr(e), Seq.empty) + equivalenceMap.getOrElse(Expr(e), Seq.empty).toSeq } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 817dd948f1a6a..9c20916790c21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -375,7 +375,7 @@ class CodegenContext extends Logging { // The generated initialization code may exceed 64kb function size limit in JVM if there are too // many mutable states, so split it into multiple functions. - splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil) + splitExpressions(expressions = initCodes.toSeq, funcName = "init", arguments = Nil) } /** @@ -927,6 +927,7 @@ class CodegenContext extends Logging { length += CodeFormatter.stripExtraNewLinesAndComments(code).length } blocks += blockBuilder.toString() + blocks.toSeq } /** @@ -1002,7 +1003,7 @@ class CodegenContext extends Logging { def subexprFunctionsCode: String = { // Whole-stage codegen's subexpression elimination is handled in another code path assert(currentVars == null || subexprFunctions.isEmpty) - splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) + splitExpressions(subexprFunctions.toSeq, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index ab2f66b1a53e4..361bcd492965b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -741,7 +741,7 @@ case class MapObjects private( case ObjectType(cls) if cls.isArray => _.asInstanceOf[Array[_]].toSeq case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => - _.asInstanceOf[java.util.List[_]].asScala + _.asInstanceOf[java.util.List[_]].asScala.toSeq case ObjectType(cls) if cls == classOf[Object] => (inputCollection) => { if (inputCollection.getClass.isArray) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 8bf1f19844556..d950fef3b26a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -124,7 +124,7 @@ package object expressions { } private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { - m.mapValues(_.distinct).map(identity) + m.mapValues(_.distinct).toMap } /** Map to use for direct case insensitive attribute lookups. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index f46a1c6836fcf..ff8856708c6d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -186,7 +186,7 @@ object SubExprUtils extends PredicateHelper { e } } - outerExpressions + outerExpressions.toSeq } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index f92d8f5b8e534..c450ea891a612 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -162,7 +162,7 @@ object JoinReorderDP extends PredicateHelper with Logging { val topOutputSet = AttributeSet(output) while (foundPlans.size < items.length) { // Build plans for the next level. - foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters) + foundPlans += searchLevel(foundPlans.toSeq, conf, conditions, topOutputSet, filters) } val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1b141572cc7f9..7d1e46d708d2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -419,7 +419,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this // case we use the first mapping (which should be provided by the first child). - val mapping = AttributeMap(currentNextAttrPairs) + val mapping = AttributeMap(currentNextAttrPairs.toSeq) // Create a an expression cleaning function for nodes that can actually produce redundant // aliases, use identity otherwise. 
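Most of the plain `.toSeq` additions in this patch (for example the buffers fed to `Window`, `splitExpressions` and `AttributeMap` above) come from the other headline 2.13 change: `scala.Seq` is now an alias for `scala.collection.immutable.Seq`, so a mutable `ArrayBuffer` that is filled imperatively no longer satisfies a `Seq` parameter without an explicit conversion. A self-contained sketch, with a hypothetical `PlanNode` standing in for the Catalyst operators:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for operators such as Union or Window that take
// an immutable Seq of children.
final case class PlanNode(children: Seq[String])

object ToSeqSketch {
  def main(args: Array[String]): Unit = {
    val flattened = ArrayBuffer.empty[String]
    flattened += "child_a"
    flattened += "child_b"
    // On 2.13, passing the buffer directly no longer compiles because
    // ArrayBuffer is not an immutable.Seq; .toSeq copies it into one.
    println(PlanNode(flattened.toSeq))
  }
}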
@@ -934,7 +934,7 @@ object CombineUnions extends Rule[LogicalPlan] { flattened += child } } - Union(flattened) + Union(flattened.toSeq) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index bd400f86ea2c1..759ce5718ed2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -765,7 +765,7 @@ object CombineConcats extends Rule[LogicalPlan] { flattened += child } } - Concat(flattened) + Concat(flattened.toSeq) } private def hasNestedConcats(concat: Concat): Boolean = concat.children.exists { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 6fdd2110ab12a..7b696912aa465 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -478,11 +478,11 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { while (true) { bottomPart match { case havingPart @ Filter(_, aggPart: Aggregate) => - return (topPart, Option(havingPart), aggPart) + return (topPart.toSeq, Option(havingPart), aggPart) case aggPart: Aggregate => // No HAVING clause - return (topPart, None, aggPart) + return (topPart.toSeq, None, aggPart) case p @ Project(_, child) => topPart += p diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f8261c293782d..29621e11e534c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -141,7 +141,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging s"CTE definition can't have duplicate names: ${duplicates.mkString("'", "', '", "'")}.", ctx) } - With(plan, ctes) + With(plan, ctes.toSeq) } /** @@ -182,7 +182,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (selects.length == 1) { selects.head } else { - Union(selects) + Union(selects.toSeq) } } @@ -229,7 +229,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (inserts.length == 1) { inserts.head } else { - Union(inserts) + Union(inserts.toSeq) } } @@ -389,7 +389,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging assignCtx.assignment().asScala.map { assign => Assignment(UnresolvedAttribute(visitMultipartIdentifier(assign.key)), expression(assign.value)) - } + }.toSeq } override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = withOrigin(ctx) { @@ -444,7 +444,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging throw new ParseException("The number of inserted values cannot match the fields.", clause.notMatchedAction()) } - InsertAction(condition, columns.zip(values).map(kv => Assignment(kv._1, kv._2))) + InsertAction(condition, columns.zip(values).map(kv => Assignment(kv._1, kv._2)).toSeq) } } else { // It should not be here. 
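In `AstBuilder` the same conversion shows up at the Java interop boundary: ANTLR hands back `java.util.List`s, and `asScala` only wraps them in a mutable `Buffer`, which under 2.13 is likewise not a `Seq`, hence the trailing `.toSeq` after almost every `asScala.map(...)`. A sketch with plain strings standing in for parser contexts:

import java.util.Arrays
import scala.collection.JavaConverters._

object AsScalaSketch {
  def main(args: Array[String]): Unit = {
    // ANTLR-style accessor: a Java list of context objects (strings here).
    val parts: java.util.List[String] = Arrays.asList("db", "tbl", "col")

    // asScala is only a Buffer wrapper over the Java list; .toSeq copies it
    // into an immutable Seq that a parser helper can safely return.
    val identifier: Seq[String] = parts.asScala.map(_.trim).toSeq
    println(identifier.mkString("."))
  }
}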
@@ -473,8 +473,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging aliasedTarget, aliasedSource, mergeCondition, - matchedActions, - notMatchedActions) + matchedActions.toSeq, + notMatchedActions.toSeq) } /** @@ -490,7 +490,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for // partition columns will be done in analyzer. - checkDuplicateKeys(parts, ctx) + checkDuplicateKeys(parts.toSeq, ctx) parts.toMap } @@ -530,17 +530,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val withOrder = if ( !order.isEmpty && sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) { // ORDER BY ... - Sort(order.asScala.map(visitSortItem), global = true, query) + Sort(order.asScala.map(visitSortItem).toSeq, global = true, query) } else if (order.isEmpty && !sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) { // SORT BY ... - Sort(sort.asScala.map(visitSortItem), global = false, query) + Sort(sort.asScala.map(visitSortItem).toSeq, global = false, query) } else if (order.isEmpty && sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) { // DISTRIBUTE BY ... withRepartitionByExpression(ctx, expressionList(distributeBy), query) } else if (order.isEmpty && !sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) { // SORT BY ... DISTRIBUTE BY ... Sort( - sort.asScala.map(visitSortItem), + sort.asScala.map(visitSortItem).toSeq, global = false, withRepartitionByExpression(ctx, expressionList(distributeBy), query)) } else if (order.isEmpty && sort.isEmpty && distributeBy.isEmpty && !clusterBy.isEmpty) { @@ -841,7 +841,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // Note that mapValues creates a view instead of materialized map. We force materialization by // mapping over identity. - WithWindowDefinition(windowMapView.map(identity), query) + WithWindowDefinition(windowMapView.map(identity).toMap, query) } /** @@ -856,8 +856,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (ctx.GROUPING != null) { // GROUP BY .... GROUPING SETS (...) val selectedGroupByExprs = - ctx.groupingSet.asScala.map(_.expression.asScala.map(e => expression(e))) - GroupingSets(selectedGroupByExprs, groupByExpressions, query, selectExpressions) + ctx.groupingSet.asScala.map(_.expression.asScala.map(e => expression(e)).toSeq) + GroupingSets(selectedGroupByExprs.toSeq, groupByExpressions, query, selectExpressions) } else { // GROUP BY .... (WITH CUBE | WITH ROLLUP)? 
val mappedGroupByExpressions = if (ctx.CUBE != null) { @@ -878,8 +878,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: HintContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { var plan = query - ctx.hintStatements.asScala.reverse.foreach { case stmt => - plan = UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), plan) + ctx.hintStatements.asScala.reverse.foreach { stmt => + plan = UnresolvedHint(stmt.hintName.getText, + stmt.parameters.asScala.map(expression).toSeq, plan) } plan } @@ -898,10 +899,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } else { CreateStruct( ctx.pivotColumn.identifiers.asScala.map( - identifier => UnresolvedAttribute.quoted(identifier.getText))) + identifier => UnresolvedAttribute.quoted(identifier.getText)).toSeq) } val pivotValues = ctx.pivotValues.asScala.map(visitPivotValue) - Pivot(None, pivotColumn, pivotValues, aggregates, query) + Pivot(None, pivotColumn, pivotValues.toSeq, aggregates, query) } /** @@ -930,7 +931,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // scalastyle:off caselocale Some(ctx.tblName.getText.toLowerCase), // scalastyle:on caselocale - ctx.colName.asScala.map(_.getText).map(UnresolvedAttribute.apply), + ctx.colName.asScala.map(_.getText).map(UnresolvedAttribute.apply).toSeq, query) } @@ -1081,7 +1082,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } val tvf = UnresolvedTableValuedFunction( - func.funcName.getText, func.expression.asScala.map(expression), aliases) + func.funcName.getText, func.expression.asScala.map(expression).toSeq, aliases) tvf.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan) } @@ -1106,7 +1107,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging Seq.tabulate(rows.head.size)(i => s"col${i + 1}") } - val table = UnresolvedInlineTable(aliases, rows) + val table = UnresolvedInlineTable(aliases, rows.toSeq) table.optionalMap(ctx.tableAlias.strictIdentifier)(aliasPlan) } @@ -1180,7 +1181,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a Sequence of Strings for an identifier list. */ override def visitIdentifierSeq(ctx: IdentifierSeqContext): Seq[String] = withOrigin(ctx) { - ctx.ident.asScala.map(_.getText) + ctx.ident.asScala.map(_.getText).toSeq } /* ******************************************************************************************** @@ -1205,10 +1206,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging /** * Create a multi-part identifier. */ - override def visitMultipartIdentifier( - ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) { - ctx.parts.asScala.map(_.getText) - } + override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] = + withOrigin(ctx) { + ctx.parts.asScala.map(_.getText).toSeq + } /* ******************************************************************************************** * Expression parsing @@ -1223,7 +1224,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create sequence of expressions from the given sequence of contexts. 
*/ private def expressionList(trees: java.util.List[ExpressionContext]): Seq[Expression] = { - trees.asScala.map(expression) + trees.asScala.map(expression).toSeq } /** @@ -1231,7 +1232,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Both un-targeted (global) and targeted aliases are supported. */ override def visitStar(ctx: StarContext): Expression = withOrigin(ctx) { - UnresolvedStar(Option(ctx.qualifiedName()).map(_.identifier.asScala.map(_.getText))) + UnresolvedStar(Option(ctx.qualifiedName()).map(_.identifier.asScala.map(_.getText).toSeq)) } /** @@ -1387,7 +1388,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (expressions.isEmpty) { throw new ParseException("Expected something between '(' and ')'.", ctx) } else { - expressions.asScala.map(expression).map(p => invertIfNotDefined(new Like(e, p))) + expressions.asScala.map(expression).map(p => invertIfNotDefined(new Like(e, p))).toSeq } } @@ -1401,7 +1402,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.IN if ctx.query != null => invertIfNotDefined(InSubquery(getValueExpressions(e), ListQuery(plan(ctx.query)))) case SqlBaseParser.IN => - invertIfNotDefined(In(e, ctx.expression.asScala.map(expression))) + invertIfNotDefined(In(e, ctx.expression.asScala.map(expression).toSeq)) case SqlBaseParser.LIKE => Option(ctx.quantifier).map(_.getType) match { case Some(SqlBaseParser.ANY) | Some(SqlBaseParser.SOME) => @@ -1526,7 +1527,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a [[CreateStruct]] expression. */ override def visitStruct(ctx: StructContext): Expression = withOrigin(ctx) { - CreateStruct.create(ctx.argument.asScala.map(expression)) + CreateStruct.create(ctx.argument.asScala.map(expression).toSeq) } /** @@ -1617,7 +1618,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // Transform COUNT(*) into COUNT(1). Seq(Literal(1)) case expressions => - expressions + expressions.toSeq } val filter = Option(ctx.where).map(expression(_)) val function = UnresolvedFunction( @@ -1639,14 +1640,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * This is used in CREATE FUNCTION, DROP FUNCTION, SHOWFUNCTIONS. */ protected def visitFunctionName(ctx: MultipartIdentifierContext): FunctionIdentifier = { - visitFunctionName(ctx, ctx.parts.asScala.map(_.getText)) + visitFunctionName(ctx, ctx.parts.asScala.map(_.getText).toSeq) } /** * Create a function database (optional) and name pair. */ protected def visitFunctionName(ctx: QualifiedNameContext): FunctionIdentifier = { - visitFunctionName(ctx, ctx.identifier().asScala.map(_.getText)) + visitFunctionName(ctx, ctx.identifier().asScala.map(_.getText).toSeq) } /** @@ -1682,7 +1683,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val function = expression(ctx.expression).transformUp { case a: UnresolvedAttribute => UnresolvedNamedLambdaVariable(a.nameParts) } - LambdaFunction(function, arguments) + LambdaFunction(function, arguments.toSeq) } /** @@ -1714,8 +1715,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } WindowSpecDefinition( - partition, - order, + partition.toSeq, + order.toSeq, frameSpecOption.getOrElse(UnspecifiedFrame)) } @@ -1747,7 +1748,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a [[CreateStruct]] expression. 
*/ override def visitRowConstructor(ctx: RowConstructorContext): Expression = withOrigin(ctx) { - CreateStruct(ctx.namedExpression().asScala.map(expression)) + CreateStruct(ctx.namedExpression().asScala.map(expression).toSeq) } /** @@ -1773,7 +1774,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val branches = ctx.whenClause.asScala.map { wCtx => (EqualTo(e, expression(wCtx.condition)), expression(wCtx.result)) } - CaseWhen(branches, Option(ctx.elseExpression).map(expression)) + CaseWhen(branches.toSeq, Option(ctx.elseExpression).map(expression)) } /** @@ -1792,7 +1793,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val branches = ctx.whenClause.asScala.map { wCtx => (expression(wCtx.condition), expression(wCtx.result)) } - CaseWhen(branches, Option(ctx.elseExpression).map(expression)) + CaseWhen(branches.toSeq, Option(ctx.elseExpression).map(expression)) } /** @@ -2245,7 +2246,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Create a [[StructType]] from a number of column definitions. */ override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) { - ctx.colType().asScala.map(visitColType) + ctx.colType().asScala.map(visitColType).toSeq } /** @@ -2286,7 +2287,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitComplexColTypeList( ctx: ComplexColTypeListContext): Seq[StructField] = withOrigin(ctx) { - ctx.complexColType().asScala.map(visitComplexColType) + ctx.complexColType().asScala.map(visitComplexColType).toSeq } /** @@ -2362,7 +2363,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging key -> value } // Check for duplicate property names. - checkDuplicateKeys(properties, ctx) + checkDuplicateKeys(properties.toSeq, ctx) properties.toMap } @@ -2443,7 +2444,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (temporary && ifNotExists) { operationNotAllowed("CREATE TEMPORARY TABLE ... IF NOT EXISTS", ctx) } - val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText) + val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText).toSeq (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null) } @@ -2452,7 +2453,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitReplaceTableHeader( ctx: ReplaceTableHeaderContext): TableHeader = withOrigin(ctx) { - val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText) + val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText).toSeq (multipartIdentifier, false, false, false) } @@ -2460,7 +2461,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * Parse a qualified name to a multipart name. 
*/ override def visitQualifiedName(ctx: QualifiedNameContext): Seq[String] = withOrigin(ctx) { - ctx.identifier.asScala.map(_.getText) + ctx.identifier.asScala.map(_.getText).toSeq } /** @@ -2498,7 +2499,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging IdentityTransform(FieldReference(typedVisit[Seq[String]](identityCtx.qualifiedName))) case applyCtx: ApplyTransformContext => - val arguments = applyCtx.argument.asScala.map(visitTransformArgument) + val arguments = applyCtx.argument.asScala.map(visitTransformArgument).toSeq applyCtx.identifier.getText match { case "bucket" => @@ -2515,7 +2516,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val fields = arguments.tail.map(arg => getFieldReference(applyCtx, arg)) - BucketTransform(LiteralValue(numBuckets, IntegerType), fields) + BucketTransform(LiteralValue(numBuckets, IntegerType), fields.toSeq) case "years" => YearsTransform(getSingleFieldReference(applyCtx, arguments)) @@ -2532,7 +2533,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case name => ApplyTransform(name, arguments) } - } + }.toSeq } /** @@ -2956,7 +2957,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) { AlterTableAddColumnsStatement( visitMultipartIdentifier(ctx.multipartIdentifier), - ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]) + ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq ) } @@ -2972,7 +2973,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) { AlterTableRenameColumnStatement( visitMultipartIdentifier(ctx.table), - ctx.from.parts.asScala.map(_.getText), + ctx.from.parts.asScala.map(_.getText).toSeq, ctx.to.getText) } @@ -3084,7 +3085,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging "Column position is not supported in Hive-style REPLACE COLUMNS") } typedVisit[QualifiedColType](colType) - } + }.toSeq ) } @@ -3102,7 +3103,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]]) AlterTableDropColumnsStatement( visitMultipartIdentifier(ctx.multipartIdentifier), - columnsToDrop) + columnsToDrop.toSeq) } /** @@ -3175,7 +3176,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } else { DescribeColumnStatement( visitMultipartIdentifier(ctx.multipartIdentifier()), - ctx.describeColName.nameParts.asScala.map(_.getText), + ctx.describeColName.nameParts.asScala.map(_.getText).toSeq, isExtended) } } else { @@ -3411,7 +3412,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } AlterTableAddPartitionStatement( visitMultipartIdentifier(ctx.multipartIdentifier), - specsAndLocs, + specsAndLocs.toSeq, ctx.EXISTS != null) } @@ -3451,7 +3452,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } AlterTableDropPartitionStatement( visitMultipartIdentifier(ctx.multipartIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).toSeq, ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) @@ -3646,7 +3647,7 @@ class AstBuilder(conf: 
SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging CreateFunctionStatement( functionIdentifier, string(ctx.className), - resources, + resources.toSeq, ctx.TEMPORARY != null, ctx.EXISTS != null, ctx.REPLACE != null) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index e1dbef9ebeede..967ccedeeeacb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -344,7 +344,7 @@ object EstimationUtils { } } } - overlappedRanges + overlappedRanges.toSeq } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index 19a0d1279cc32..777a4c8291223 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -323,7 +323,7 @@ case class JoinEstimation(join: Join) extends Logging { outputAttrStats += a -> newColStat } } - outputAttrStats + outputAttrStats.toSeq } private def extractJoinKeysWithColStats( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index c4a106702a515..6cd062da2b94a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -185,7 +185,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def map[A](f: BaseType => A): Seq[A] = { val ret = new collection.mutable.ArrayBuffer[A]() foreach(ret += f(_)) - ret + ret.toSeq } /** @@ -195,7 +195,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = { val ret = new collection.mutable.ArrayBuffer[A]() foreach(ret ++= f(_)) - ret + ret.toSeq } /** @@ -206,7 +206,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { val ret = new collection.mutable.ArrayBuffer[B]() val lifted = pf.lift foreach(node => lifted(node).foreach(ret.+=)) - ret + ret.toSeq } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala index 1f88a700847de..711ef265c6cf7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala @@ -26,7 +26,7 @@ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} class GenericArrayData(val array: Array[Any]) extends ArrayData { def this(seq: Seq[Any]) = this(seq.toArray) - def this(list: java.util.List[Any]) = this(list.asScala) + def this(list: java.util.List[Any]) = this(list.asScala.toSeq) // TODO: This is boxing. We should specialize. 
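The conversions are not purely cosmetic: on 2.12 `buffer.toSeq` generally returns the buffer itself, whereas on 2.13 it copies into an immutable `Seq`, so methods like `TreeNode.map` above now hand back a true snapshot that later mutation of the buffer cannot affect. A small sketch of that behavioral difference (the output depends on the Scala version used to compile it):

import scala.collection.mutable.ArrayBuffer

object SnapshotSketch {
  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)
    val snapshot: Seq[Int] = buf.toSeq
    buf += 4
    // 2.12 typically prints 1,2,3,4 (the Seq is the buffer itself);
    // 2.13 prints 1,2,3 (toSeq took an immutable copy).
    println(snapshot.mkString(","))
  }
}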
def this(primitiveArray: Array[Int]) = this(primitiveArray.toSeq) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index 3a0490d07733d..2797a40614504 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -223,7 +223,7 @@ class QuantileSummaries( otherIdx += 1 } - val comp = compressImmut(mergedSampled, 2 * mergedRelativeError * mergedCount) + val comp = compressImmut(mergedSampled.toIndexedSeq, 2 * mergedRelativeError * mergedCount) new QuantileSummaries(other.compressThreshold, mergedRelativeError, comp, mergedCount, true) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index bd2c1d5c26299..b14fb04cc4539 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -423,7 +423,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum override def simpleString: String = { - val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") + val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}").toSeq truncatedString( fieldTypes, "struct<", ",", ">", @@ -542,7 +542,7 @@ object StructType extends AbstractDataType { def apply(fields: java.util.List[StructField]): StructType = { import scala.collection.JavaConverters._ - StructType(fields.asScala) + StructType(fields.asScala.toSeq) } private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType = @@ -606,7 +606,7 @@ object StructType extends AbstractDataType { newFields += f } - StructType(newFields) + StructType(newFields.toSeq) case (DecimalType.Fixed(leftPrecision, leftScale), DecimalType.Fixed(rightPrecision, rightScale)) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala index 003ce850c926e..c3bc67d76138a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala @@ -121,7 +121,7 @@ private[sql] object ArrowUtils { val dt = fromArrowField(child) StructField(child.getName, dt, child.isNullable) } - StructType(fields) + StructType(fields.toSeq) case arrowType => fromArrowType(arrowType) } } @@ -137,7 +137,7 @@ private[sql] object ArrowUtils { StructType(schema.getFields.asScala.map { field => val dt = fromArrowField(field) StructField(field.getName, dt, field.isNullable) - }) + }.toSeq) } /** Return Map with conf settings to be used in ArrowPythonRunner */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index 6a5bdc4f6fc3d..9fb8b0f351d51 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -136,7 +136,7 @@ object RandomDataGenerator { } i += 1 } - StructType(fields) + StructType(fields.toSeq) } /** @@ -372,6 +372,6 @@ object RandomDataGenerator 
{ fields += gen() } } - Row.fromSeq(fields) + Row.fromSeq(fields.toSeq) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 5be37318ae6eb..bfa415afeab93 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -127,7 +127,7 @@ class ColumnPruningSuite extends PlanTest { val optimized = Optimize.execute(query) - val aliases = NestedColumnAliasingSuite.collectGeneratedAliases(optimized) + val aliases = NestedColumnAliasingSuite.collectGeneratedAliases(optimized).toSeq val selectedFields = UnresolvedAttribute("a") +: aliasedExprs(aliases) val finalSelectedExprs = Seq(UnresolvedAttribute("a"), $"${aliases(0)}".as("c.d")) ++ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala index 229e32479082c..f921f06537080 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.util -import scala.collection._ - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow, SpecificInternalRow, UnsafeMapData, UnsafeProjection} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala index 51286986b835c..79c06cf8313b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala @@ -288,14 +288,14 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite { withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) { withDefaultTimeZone(zoneId) { withClue(s"zoneId = ${zoneId.getId}") { - val formatters = LegacyDateFormats.values.map { legacyFormat => + val formatters = LegacyDateFormats.values.toSeq.map { legacyFormat => TimestampFormatter( TimestampFormatter.defaultPattern, zoneId, TimestampFormatter.defaultLocale, legacyFormat, isParsing = false) - }.toSeq :+ TimestampFormatter.getFractionFormatter(zoneId) + } :+ TimestampFormatter.getFractionFormatter(zoneId) formatters.foreach { formatter => assert(microsToInstant(formatter.parse("1000-01-01 01:02:03")) .atZone(zoneId) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index 1a262d646ca10..9fa016146bbd3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -502,6 +502,6 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { DataType.canWrite(writeType, readType, byName, analysis.caseSensitiveResolution, name, storeAssignmentPolicy, errMsg => errs += errMsg) === false, desc) assert(errs.size === numErrs, s"Should 
produce $numErrs error messages") - checkErrors(errs) + checkErrors(errs.toSeq) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index c37d8eaa294bf..611c03e7b208e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -479,7 +479,7 @@ class RelationalGroupedDataset protected[sql]( * @since 2.4.0 */ def pivot(pivotColumn: Column, values: java.util.List[Any]): RelationalGroupedDataset = { - pivot(pivotColumn, values.asScala) + pivot(pivotColumn, values.asScala.toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 9278eeeefe608..08b0a1c6a60a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -372,7 +372,7 @@ class SparkSession private( */ @DeveloperApi def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive { - Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala)) + Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala.toSeq)) } /** @@ -495,7 +495,7 @@ class SparkSession private( * @since 2.0.0 */ def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = { - createDataset(data.asScala) + createDataset(data.asScala.toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala index 1c2bf9e7c2a57..ff706b5061f0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala @@ -103,7 +103,7 @@ class SparkSessionExtensions { * Build the override rules for columnar execution. */ private[sql] def buildColumnarRules(session: SparkSession): Seq[ColumnarRule] = { - columnarRuleBuilders.map(_.apply(session)) + columnarRuleBuilders.map(_.apply(session)).toSeq } /** @@ -119,7 +119,7 @@ class SparkSessionExtensions { * Build the analyzer resolution `Rule`s using the given [[SparkSession]]. */ private[sql] def buildResolutionRules(session: SparkSession): Seq[Rule[LogicalPlan]] = { - resolutionRuleBuilders.map(_.apply(session)) + resolutionRuleBuilders.map(_.apply(session)).toSeq } /** @@ -136,7 +136,7 @@ class SparkSessionExtensions { * Build the analyzer post-hoc resolution `Rule`s using the given [[SparkSession]]. */ private[sql] def buildPostHocResolutionRules(session: SparkSession): Seq[Rule[LogicalPlan]] = { - postHocResolutionRuleBuilders.map(_.apply(session)) + postHocResolutionRuleBuilders.map(_.apply(session)).toSeq } /** @@ -153,7 +153,7 @@ class SparkSessionExtensions { * Build the check analysis `Rule`s using the given [[SparkSession]]. 
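The public Java-friendly overloads above (`pivot`, `createDataFrame`, `createDataset`) follow a single pattern: convert the incoming `java.util.List` once at the boundary with `asScala.toSeq`, then delegate to the Scala variant so everything downstream only sees an immutable `Seq`. A hedged sketch of that delegation shape, with hypothetical method names:

import java.util.Arrays
import scala.collection.JavaConverters._

object BoundaryOverloadSketch {
  // Scala-facing variant works on an immutable Seq.
  def createThing(data: Seq[String]): String =
    data.mkString("Thing(", ", ", ")")

  // Java-facing overload converts exactly once, then delegates.
  def createThing(data: java.util.List[String]): String =
    createThing(data.asScala.toSeq)

  def main(args: Array[String]): Unit =
    println(createThing(Arrays.asList("a", "b", "c")))
}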
*/ private[sql] def buildCheckRules(session: SparkSession): Seq[LogicalPlan => Unit] = { - checkRuleBuilders.map(_.apply(session)) + checkRuleBuilders.map(_.apply(session)).toSeq } /** @@ -168,7 +168,7 @@ class SparkSessionExtensions { private[this] val optimizerRules = mutable.Buffer.empty[RuleBuilder] private[sql] def buildOptimizerRules(session: SparkSession): Seq[Rule[LogicalPlan]] = { - optimizerRules.map(_.apply(session)) + optimizerRules.map(_.apply(session)).toSeq } /** @@ -184,7 +184,7 @@ class SparkSessionExtensions { private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder] private[sql] def buildPlannerStrategies(session: SparkSession): Seq[Strategy] = { - plannerStrategyBuilders.map(_.apply(session)) + plannerStrategyBuilders.map(_.apply(session)).toSeq } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala index 9807b5dbe9348..94e159c562e31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala @@ -257,16 +257,16 @@ object AggregatingAccumulator { imperative }) - val updateAttrSeq: AttributeSeq = aggBufferAttributes ++ inputAttributes - val mergeAttrSeq: AttributeSeq = aggBufferAttributes ++ inputAggBufferAttributes - val aggBufferAttributesSeq: AttributeSeq = aggBufferAttributes + val updateAttrSeq: AttributeSeq = (aggBufferAttributes ++ inputAttributes).toSeq + val mergeAttrSeq: AttributeSeq = (aggBufferAttributes ++ inputAggBufferAttributes).toSeq + val aggBufferAttributesSeq: AttributeSeq = aggBufferAttributes.toSeq // Create the accumulator. new AggregatingAccumulator( - aggBufferAttributes.map(_.dataType), - initialValues, - updateExpressions.map(BindReferences.bindReference(_, updateAttrSeq)), - mergeExpressions.map(BindReferences.bindReference(_, mergeAttrSeq)), + aggBufferAttributes.map(_.dataType).toSeq, + initialValues.toSeq, + updateExpressions.map(BindReferences.bindReference(_, updateAttrSeq)).toSeq, + mergeExpressions.map(BindReferences.bindReference(_, mergeAttrSeq)).toSeq, resultExpressions.map(BindReferences.bindReference(_, aggBufferAttributesSeq)), imperatives.toArray, typedImperatives.toArray, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 3a2c673229c20..363282ea95997 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -229,14 +229,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { */ override def visitNestedConstantList( ctx: NestedConstantListContext): Seq[Seq[String]] = withOrigin(ctx) { - ctx.constantList.asScala.map(visitConstantList) + ctx.constantList.asScala.map(visitConstantList).toSeq } /** * Convert a constants list into a String sequence. 
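In `AggregatingAccumulator` the concatenations gain `.toSeq` for a subtler reason: joining a mutable attribute buffer with another collection yields a mutable result, and the implicit conversion that the ascription `: AttributeSeq` relies on is defined from `Seq`, so on 2.13 it no longer applies until the result is converted. A loose sketch of that shape (the `AttrSeq` wrapper below is hypothetical, not Spark's `AttributeSeq`):

import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions

// Hypothetical wrapper with an implicit view from Seq, loosely mirroring
// how an ascription like `: AttributeSeq` is satisfied.
final case class AttrSeq(names: Seq[String])
object AttrSeq {
  implicit def fromSeq(names: Seq[String]): AttrSeq = AttrSeq(names)
}

object ImplicitViewSketch {
  def main(args: Array[String]): Unit = {
    val bufferAttrs = ArrayBuffer("sum", "count")
    val inputAttrs = Seq("value")
    // The concatenation is still an ArrayBuffer; under 2.13 the Seq => AttrSeq
    // view does not apply to it, so .toSeq is needed before the ascription.
    val bound: AttrSeq = (bufferAttrs ++ inputAttrs).toSeq
    println(bound)
  }
}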
*/ override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) { - ctx.constant.asScala.map(visitStringConstant) + ctx.constant.asScala.map(visitStringConstant).toSeq } /** @@ -355,7 +355,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { // Storage format val defaultStorage = HiveSerDe.getDefaultStorage(conf) - validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx) + validateRowFormatFileFormat( + ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx) val fileStorage = ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat) .getOrElse(CatalogStorageFormat.empty) val rowStorage = ctx.rowFormat.asScala.headOption.map(visitRowFormat) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 0244542054611..558d990e8c4bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -263,7 +263,7 @@ trait CodegenSupport extends SparkPlan { paramVars += ExprCode(paramIsNull, JavaCode.variable(paramName, attributes(i).dataType)) } - (arguments, parameters, paramVars) + (arguments.toSeq, parameters.toSeq, paramVars.toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index bc924e6978ddc..112090640040a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -196,7 +196,7 @@ case class AdaptiveSparkPlanExec( // In case of errors, we cancel all running stages and throw exception. if (errors.nonEmpty) { - cleanUpAndThrowException(errors, None) + cleanUpAndThrowException(errors.toSeq, None) } // Try re-optimizing and re-planning. 
Adopt the new plan if its cost is equal to or less diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala index 3cf6a13a4a892..8d7a2c95081c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanHelper.scala @@ -65,7 +65,7 @@ trait AdaptiveSparkPlanHelper { def mapPlans[A](p: SparkPlan)(f: SparkPlan => A): Seq[A] = { val ret = new collection.mutable.ArrayBuffer[A]() foreach(p)(ret += f(_)) - ret + ret.toSeq } /** @@ -75,7 +75,7 @@ trait AdaptiveSparkPlanHelper { def flatMap[A](p: SparkPlan)(f: SparkPlan => TraversableOnce[A]): Seq[A] = { val ret = new collection.mutable.ArrayBuffer[A]() foreach(p)(ret ++= f(_)) - ret + ret.toSeq } /** @@ -86,7 +86,7 @@ trait AdaptiveSparkPlanHelper { val ret = new collection.mutable.ArrayBuffer[B]() val lifted = pf.lift foreach(p)(node => lifted(node).foreach(ret.+=)) - ret + ret.toSeq } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index b2633c774f532..af18ee065aa86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -146,7 +146,7 @@ case class CustomShuffleReaderExec private( partitionDataSizeMetrics.set(dataSizes.sum) } - SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates) + SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates.toSeq) } @transient override lazy val metrics: Map[String, SQLMetric] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 396c9c9d6b4e5..627f0600f2383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -241,8 +241,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight") if (numSkewedLeft > 0 || numSkewedRight > 0) { - val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions) - val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions) + val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq) + val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq) smj.copy( left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index d6e44b780d772..83fdafbadcb60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -121,7 +121,7 @@ object ShufflePartitionsUtil extends Logging { i += 1 } 
createPartitionSpec() - partitionSpecs + partitionSpecs.toSeq } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index c8fa07941af87..cf9f3ddeb42a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -123,7 +123,7 @@ case class CachedRDDBuilder( rowCountStats.add(rowCount) val stats = InternalRow.fromSeq( - columnBuilders.flatMap(_.columnStats.collectedStatistics)) + columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 33b29bde93ee5..fc62dce5002b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -69,7 +69,7 @@ case class AnalyzePartitionCommand( if (filteredSpec.isEmpty) { None } else { - Some(filteredSpec) + Some(filteredSpec.toMap) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 47b213fc2d83b..d550fe270c753 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -650,7 +650,7 @@ case class AlterTableRecoverPartitionsCommand( val pathFilter = getPathFilter(hadoopConf) val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8) - val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = + val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] = try { scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold, spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq @@ -697,7 +697,7 @@ case class AlterTableRecoverPartitionsCommand( // parallelize the list of partitions here, then we can have better parallelism later. 
val parArray = new ParVector(statuses.toVector) parArray.tasksupport = evalTaskSupport - parArray + parArray.seq } else { statuses } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index fc8cc11bb1067..7aebdddf1d59c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -657,7 +657,7 @@ case class DescribeTableCommand( } } - result + result.toSeq } private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { @@ -740,7 +740,7 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) val result = new ArrayBuffer[Row] val queryExecution = sparkSession.sessionState.executePlan(plan) describeSchema(queryExecution.analyzed.schema, result, header = false) - result + result.toSeq } } @@ -815,7 +815,7 @@ case class DescribeColumnCommand( } yield histogramDescription(hist) buffer ++= histDesc.getOrElse(Seq(Row("histogram", "NULL"))) } - buffer + buffer.toSeq } private def histogramDescription(histogram: Histogram): Seq[Row] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 07d7c4e97a095..db564485be883 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -811,7 +811,7 @@ object DataSource extends Logging { val path = CaseInsensitiveMap(options).get("path") val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != "path") CatalogStorageFormat.empty.copy( - locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath) + locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath.toMap) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala index 095940772ae78..864130bbd87b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala @@ -80,7 +80,7 @@ object FilePartition extends Logging { currentFiles += file } closePartition() - partitions + partitions.toSeq } def maxSplitBytes( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 57082b40e1132..b5e276bd421a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -46,7 +46,7 @@ class HadoopFileLinesReader( def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf) - private val iterator = { + private val _iterator = { val fileSplit = new FileSplit( new Path(new URI(file.filePath)), file.start, @@ -66,9 +66,9 @@ class HadoopFileLinesReader( new RecordReaderIterator(reader) } - override def hasNext: Boolean = iterator.hasNext + override def hasNext: Boolean = _iterator.hasNext - override def next(): Text = iterator.next() + override def next(): 
Text = _iterator.next() - override def close(): Unit = iterator.close() + override def close(): Unit = _iterator.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala index 0e6d803f02d4d..a48001f04a9bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala @@ -35,7 +35,7 @@ import org.apache.spark.input.WholeTextFileRecordReader */ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable { - private val iterator = { + private val _iterator = { val fileSplit = new CombineFileSplit( Array(new Path(new URI(file.filePath))), Array(file.start), @@ -50,9 +50,9 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration) new RecordReaderIterator(reader) } - override def hasNext: Boolean = iterator.hasNext + override def hasNext: Boolean = _iterator.hasNext - override def next(): Text = iterator.next() + override def next(): Text = _iterator.next() - override def close(): Unit = iterator.close() + override def close(): Unit = _iterator.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 84160f35540df..a488ed16a835a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -133,7 +133,7 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true) + pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = true) discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 5846d46e146ed..4087efc486a4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -273,7 +273,7 @@ object PartitioningUtils { (None, Some(path)) } else { val (columnNames, values) = columns.reverse.unzip - (Some(PartitionValues(columnNames, values)), Some(currentPath)) + (Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath)) } } @@ -420,7 +420,7 @@ object PartitioningUtils { val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = - seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) + seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }).toMap val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { case (path, partValues) => partValues.columnNames -> path diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 39bbc60200b86..73910c3943e9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -68,14 +68,14 @@ class ParquetFilters( // When g is a `Map`, `g.getOriginalType` is `MAP`. // When g is a `List`, `g.getOriginalType` is `LIST`. case g: GroupType if g.getOriginalType == null => - getPrimitiveFields(g.getFields.asScala, parentFieldNames :+ g.getName) + getPrimitiveFields(g.getFields.asScala.toSeq, parentFieldNames :+ g.getName) // Parquet only supports push-down for primitive types; as a result, Map and List types // are removed. case _ => None } } - val primitiveFields = getPrimitiveFields(schema.getFields.asScala).map { field => + val primitiveFields = getPrimitiveFields(schema.getFields.asScala.toSeq).map { field => import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper (field.fieldNames.toSeq.quoted, field) } @@ -90,7 +90,7 @@ class ParquetFilters( .groupBy(_._1.toLowerCase(Locale.ROOT)) .filter(_._2.size == 1) .mapValues(_.head._2) - CaseInsensitiveMap(dedupPrimitiveFields) + CaseInsensitiveMap(dedupPrimitiveFields.toMap) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 8ce8a86d2f026..2eb205db8ccdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -79,7 +79,7 @@ class ParquetToSparkSchemaConverter( } } - StructType(fields) + StructType(fields.toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala index b4a14c6face31..e273abf90e3bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala @@ -55,7 +55,7 @@ case class DescribeNamespaceExec( rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")")) } } - rows + rows.toSeq } private def toCatalystRow(strs: String*): InternalRow = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala index bc6bb175f979e..81b1c81499c74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala @@ -43,7 +43,7 @@ case class DescribeTableExec( if (isExtended) { addTableDetails(rows) } - rows + rows.toSeq } private def addTableDetails(rows: ArrayBuffer[InternalRow]): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala index 
1a6f03f54f2e9..7f6ae20d5cd0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -63,7 +63,7 @@ object PushDownUtils extends PredicateHelper { val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) } - (r.pushedFilters(), untranslatableExprs ++ postScanFilters) + (r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq) case _ => (Nil, filters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala index 9188f4eb60d56..ceeed0f840700 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala @@ -52,6 +52,6 @@ case class ShowNamespacesExec( } } - rows + rows.toSeq } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala index 820f5ae8f1b12..5ba01deae9513 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala @@ -49,6 +49,6 @@ case class ShowTablesExec( } } - rows + rows.toSeq } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 2ed33b867183b..df3f231f7d0ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -288,7 +288,7 @@ private[sql] object V2SessionCatalog { s"SessionCatalog does not support partition transform: $transform") } - (identityCols, bucketSpec) + (identityCols.toSeq, bucketSpec) } private def toCatalogDatabase( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 3242ac21ab324..186bac6f43332 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -162,7 +162,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { return (leftKeys, rightKeys) } } - (leftKeysBuffer, rightKeysBuffer) + (leftKeysBuffer.toSeq, rightKeysBuffer.toSeq) } private def reorderJoinKeys( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index fcbd0b19515b1..dadf1129c34b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -103,11 +103,11 @@ case class AggregateInPandasExec( // Schema of input rows to the python runner val aggInputSchema = 
StructType(dataTypes.zipWithIndex.map { case (dt, i) => StructField(s"_$i", dt) - }) + }.toSeq) // Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else { - val prunedProj = UnsafeProjection.create(allInputs, child.output) + val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output) val grouped = if (groupingExpressions.isEmpty) { // Use an empty unsafe row as a place holder for the grouping key diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 96e3bb721a822..298d63478b63e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -114,10 +114,10 @@ trait EvalPythonExec extends UnaryExecNode { } }.toArray }.toArray - val projection = MutableProjection.create(allInputs, child.output) + val projection = MutableProjection.create(allInputs.toSeq, child.output) val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) => StructField(s"_$i", dt) - }) + }.toSeq) // Add rows to queue to join later with the result. val projectedRowIter = iter.map { inputRow => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 7bc8b95cfb03b..1c88056cb50c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -72,7 +72,7 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { } } // There is no Python UDF over aggregate expression - Project(projList, agg.copy(aggregateExpressions = aggExpr)) + Project(projList.toSeq, agg.copy(aggregateExpressions = aggExpr.toSeq)) } def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { @@ -134,9 +134,9 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] { }.asInstanceOf[NamedExpression] } agg.copy( - groupingExpressions = groupingExpr, + groupingExpressions = groupingExpr.toSeq, aggregateExpressions = aggExpr, - child = Project(projList ++ agg.child.output, agg.child)) + child = Project((projList ++ agg.child.output).toSeq, agg.child)) } def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala index 68ce991a8ae7f..2da0000dad4ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala @@ -118,6 +118,6 @@ private[python] object PandasGroupUtils { // Attributes after deduplication val dedupAttributes = nonDupGroupingAttributes ++ dataAttributes - (dedupAttributes, argOffsets) + (dedupAttributes.toSeq, argOffsets) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index e8ae0eaf0ea48..29537cc0e573f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -347,7 +347,7 @@ object CompactibleFileStreamLog { } else if (defaultInterval < (latestCompactBatchId + 1) / 2) { // Find the first divisor >= default compact interval def properDivisors(min: Int, n: Int) = - (min to n/2).view.filter(i => n % i == 0) :+ n + (min to n/2).view.filter(i => n % i == 0).toSeq :+ n properDivisors(defaultInterval, latestCompactBatchId + 1).head } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala index 985a5fa6063ef..11bdfee460e66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.streaming +import scala.collection.mutable + import org.apache.spark.SparkEnv import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -33,7 +35,7 @@ case class GetRecord(offset: ContinuousRecordPartitionOffset) * to the number of partitions. * @param lock a lock object for locking the buckets for read */ -class ContinuousRecordEndpoint(buckets: Seq[Seq[UnsafeRow]], lock: Object) +class ContinuousRecordEndpoint(buckets: Seq[mutable.Seq[UnsafeRow]], lock: Object) extends ThreadSafeRpcEndpoint { private var startOffsets: Seq[Int] = List.fill(buckets.size)(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index e8ce8e1487093..f2557696485b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -482,7 +482,7 @@ object FileStreamSource { } private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = { - val filters = new scala.collection.mutable.MutableList[GlobFilter]() + val filters = new scala.collection.mutable.ArrayBuffer[GlobFilter]() var currentPath = sourcePath while (!currentPath.isRoot) { @@ -490,7 +490,7 @@ object FileStreamSource { currentPath = currentPath.getParent } - filters.toList + filters.toSeq } override protected def cleanTask(entry: FileEntry): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index f6cc8116c6c4c..de8a8cd7d3b58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -139,7 +139,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String) if (addedFiles.nonEmpty) { val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration) val statuses: Seq[SinkFileStatus] = - addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f)))) + addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f)))).toSeq new TaskCommitMessage(statuses) } else { new TaskCommitMessage(Seq.empty[SinkFileStatus]) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2c737206dd2d9..fe3f0e95b383c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -127,8 +127,8 @@ trait ProgressReporter extends Logging { * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded. */ protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = { - currentTriggerStartOffsets = from.mapValues(_.json) - currentTriggerEndOffsets = to.mapValues(_.json) + currentTriggerStartOffsets = from.mapValues(_.json).toMap + currentTriggerEndOffsets = to.mapValues(_.json).toMap } private def updateProgress(newProgress: StreamingQueryProgress): Unit = { @@ -192,7 +192,8 @@ trait ProgressReporter extends Logging { timestamp = formatTimestamp(currentTriggerStartTimestamp), batchId = currentBatchId, batchDuration = processingTimeMills, - durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava), + durationMs = + new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava), eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, @@ -255,14 +256,14 @@ trait ProgressReporter extends Logging { "avg" -> stats.avg.toLong).mapValues(formatTimestamp) }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp - ExecutionStats(numInputRows, stateOperators, eventTimeStats) + ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap) } /** Extract number of input sources for each streaming source in plan */ private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = { def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = { - tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source + tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for each source } val onlyDataSourceV2Sources = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index e5b9e68d71026..9adb9af7318d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -215,7 +215,7 @@ case class MemoryStream[A : Encoder]( batches.slice(sliceStart, sliceEnd) } - logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal)) + logDebug(generateDebugString(newBlocks.flatten.toSeq, startOrdinal, endOrdinal)) numPartitions match { case Some(numParts) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala index 03ebbb9f1b376..24ff9c2e8384d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala @@ -80,7 +80,7 @@ class MemorySink extends Table with SupportsWrite with Logging { /** Returns all rows that are stored in this [[Sink]]. 
*/ def allData: Seq[Row] = synchronized { - batches.flatMap(_.data) + batches.flatMap(_.data).toSeq } def latestBatchId: Option[Long] = synchronized { @@ -92,7 +92,7 @@ class MemorySink extends Table with SupportsWrite with Logging { } def dataSinceBatch(sinceBatchId: Long): Seq[Row] = synchronized { - batches.filter(_.batchId > sinceBatchId).flatMap(_.data) + batches.filter(_.batchId > sinceBatchId).flatMap(_.data).toSeq } def toDebugString: String = synchronized { @@ -183,7 +183,7 @@ class MemoryDataWriter(partition: Int, schema: StructType) } override def commit(): MemoryWriterCommitMessage = { - val msg = MemoryWriterCommitMessage(partition, data.clone()) + val msg = MemoryWriterCommitMessage(partition, data.clone().toSeq) data.clear() msg } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index a9c01e69b9b13..497b13793a67b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -97,7 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => .map(entry => entry._1 -> longMetric(entry._1).value) val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] = - new java.util.HashMap(customMetrics.mapValues(long2Long).asJava) + new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava) new StateOperatorProgress( numRowsTotal = longMetric("numTotalStateRows").value, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 33539c01ee5dd..ff229c2bea7ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -57,7 +57,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L if (running.nonEmpty) { val runningPageTable = - executionsTable(request, "running", running, currentTime, true, true, true) + executionsTable(request, "running", running.toSeq, currentTime, true, true, true) _content ++= val rng = new org.apache.spark.util.random.XORShiftRandom(7 + index) data.filter(_.getInt(0) < rng.nextDouble() * 10) - } - } + }.toSeq val union = df1.union(df2) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index 124b58483d24f..2be86b9ad6208 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -223,16 +223,6 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSparkSession { checkDataset(Seq(Queue(true)).toDS(), Queue(true)) checkDataset(Seq(Queue("test")).toDS(), Queue("test")) checkDataset(Seq(Queue(Tuple1(1))).toDS(), Queue(Tuple1(1))) - - checkDataset(Seq(ArrayBuffer(1)).toDS(), ArrayBuffer(1)) - checkDataset(Seq(ArrayBuffer(1.toLong)).toDS(), ArrayBuffer(1.toLong)) - checkDataset(Seq(ArrayBuffer(1.toDouble)).toDS(), ArrayBuffer(1.toDouble)) - checkDataset(Seq(ArrayBuffer(1.toFloat)).toDS(), ArrayBuffer(1.toFloat)) - checkDataset(Seq(ArrayBuffer(1.toByte)).toDS(), ArrayBuffer(1.toByte)) - checkDataset(Seq(ArrayBuffer(1.toShort)).toDS(), 
ArrayBuffer(1.toShort)) - checkDataset(Seq(ArrayBuffer(true)).toDS(), ArrayBuffer(true)) - checkDataset(Seq(ArrayBuffer("test")).toDS(), ArrayBuffer("test")) - checkDataset(Seq(ArrayBuffer(Tuple1(1))).toDS(), ArrayBuffer(Tuple1(1))) } test("sequence and product combinations") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index fe6775cc7f9b9..f24da6df67ca0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -712,7 +712,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |ON | big.key = small.a """.stripMargin), - expected + expected.toSeq ) } @@ -729,7 +729,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |ON | big.key = small.a """.stripMargin), - expected + expected.toSeq ) } } @@ -770,7 +770,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |ON | big.key = small.a """.stripMargin), - expected + expected.toSeq ) } @@ -787,7 +787,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |ON | big.key = small.a """.stripMargin), - expected + expected.toSeq ) } @@ -806,7 +806,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |ON | big.key = small.a """.stripMargin), - expected + expected.toSeq ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index e52d2262a6bf8..8469216901b05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -418,7 +418,7 @@ object QueryTest extends Assertions { } def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): Unit = { - getErrorMessageInCheckAnswer(df, expectedAnswer.asScala) match { + getErrorMessageInCheckAnswer(df, expectedAnswer.asScala.toSeq) match { case Some(errorMessage) => Assert.fail(errorMessage) case None => } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala index 4e85f739b95a2..1106a787cc9a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala @@ -238,7 +238,7 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils { table.copy( createTime = 0L, lastAccessTime = 0L, - properties = table.properties.filterKeys(!nondeterministicProps.contains(_)), + properties = table.properties.filterKeys(!nondeterministicProps.contains(_)).toMap, stats = None, ignoredProperties = Map.empty ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 347bc735a8b76..2bb9aa55e4579 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -992,7 +992,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark subqueryExpressions ++= (getSubqueryExpressions(s.plan) :+ s) s } - subqueryExpressions + subqueryExpressions.toSeq } private def getNumSorts(plan: LogicalPlan): Int = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala index 96ad453aeb2d7..a9696e6718de8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/IntervalBenchmark.scala @@ -109,11 +109,11 @@ object IntervalBenchmark extends SqlBasedBenchmark { // The first 2 cases are used to show the overhead of preparing the interval string. addCase(benchmark, cardinality, "prepare string w/ interval", buildString(true, timeUnits)) addCase(benchmark, cardinality, "prepare string w/o interval", buildString(false, timeUnits)) - addCase(benchmark, cardinality, intervalToTest) // Only years + addCase(benchmark, cardinality, intervalToTest.toSeq) // Only years for (unit <- timeUnits) { intervalToTest.append(unit) - addCase(benchmark, cardinality, intervalToTest) + addCase(benchmark, cardinality, intervalToTest.toSeq) } benchmark.run() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 8bbf81efff316..ce726046c3215 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -220,9 +220,9 @@ trait SQLMetricsTestUtils extends SQLTestUtils { (nodeName, nodeMetrics.mapValues(expectedMetricValue => (actualMetricValue: Any) => { actualMetricValue.toString.matches(expectedMetricValue.toString) - })) + }).toMap) } - testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates, + testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates.toMap, enableWholeStage) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala index e87bd11f0dca5..0fe339b93047a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala @@ -333,6 +333,6 @@ class TestForeachWriter extends ForeachWriter[Int] { override def close(errorOrNull: Throwable): Unit = { events += ForeachWriterSuite.Close(error = Option(errorOrNull)) - ForeachWriterSuite.addEvents(events) + ForeachWriterSuite.addEvents(events.toSeq) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 4d5cd109b7c24..b033761498ea7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -209,21 +209,24 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 2).toMap) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 2).toMap) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), - (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2).toMap)) ))) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 3).toMap) // Retrying a stage should reset the metrics listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) @@ -236,7 +239,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 2).toMap) // Ignore the task end for the first attempt listener.onTaskEnd(SparkListenerTaskEnd( @@ -244,11 +248,12 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100).toMap), new ExecutorMetrics, null)) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 2).toMap) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -256,7 +261,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2).toMap), new ExecutorMetrics, null)) listener.onTaskEnd(SparkListenerTaskEnd( @@ -264,11 +269,12 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap), new ExecutorMetrics, null)) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 5).toMap) // Summit a new stage listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) @@ -281,7 +287,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 7).toMap) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -289,7 +296,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0, accums = 
accumulatorUpdates.mapValues(_ * 3)), + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap), new ExecutorMetrics, null)) listener.onTaskEnd(SparkListenerTaskEnd( @@ -297,11 +304,12 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap), new ExecutorMetrics, null)) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 11).toMap) assertJobs(statusStore.execution(executionId), running = Seq(0)) @@ -315,7 +323,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils assertJobs(statusStore.execution(executionId), completed = Seq(0)) - checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(statusStore.executionMetrics(executionId), + accumulatorUpdates.mapValues(_ * 11).toMap) } test("control a plan explain mode in listeners via SQLConf") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 8d5439534b513..5e401f5136019 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -873,7 +873,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with } if(!running) { actions += StartStream() } addCheck() - testStream(ds)(actions: _*) + testStream(ds)(actions.toSeq: _*) } object AwaitTerminationTester { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 6e08b88f538df..26158f4d639ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -323,7 +323,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { actions += AssertOnQuery { q => q.recentProgress.size > 1 && q.recentProgress.size <= 11 } - testStream(input.toDS)(actions: _*) + testStream(input.toDS)(actions.toSeq: _*) spark.sparkContext.listenerBus.waitUntilEmpty() // 11 is the max value of the possible numbers of events. 
assert(numProgressEvent > 1 && numProgressEvent <= 11) @@ -559,11 +559,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { private val _progressEvents = new mutable.Queue[StreamingQueryProgress] def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized { - _progressEvents.filter(_.numInputRows > 0) + _progressEvents.filter(_.numInputRows > 0).toSeq } def allProgressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized { - _progressEvents.clone() + _progressEvents.clone().toSeq } def reset(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 98e2342c78e56..ec61102804ea3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -316,7 +316,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, batchDuration = 0L, - durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava), + durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).toMap.asJava), eventTime = new java.util.HashMap(Map( "max" -> "2016-12-05T20:54:20.827Z", "min" -> "2016-12-05T20:54:20.827Z", @@ -326,7 +326,7 @@ object StreamingQueryStatusAndProgressSuite { numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0, customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L, "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L) - .mapValues(long2Long).asJava) + .mapValues(long2Long).toMap.asJava) )), sources = Array( new SourceProgress( @@ -351,7 +351,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, batchDuration = 0L, - durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava), + durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).toMap.asJava), // empty maps should be handled correctly eventTime = new java.util.HashMap(Map.empty[String, String].asJava), stateOperators = Array(new StateOperatorProgress( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 683db21d3f0e1..37cc1b8a6d2ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -123,7 +123,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { val jobOption = outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption - } + }.toSeq } logDebug("Generated " + jobs.length + " jobs for time " + time) jobs diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 2d53a1b4c78b6..bfb2c647ea0bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -366,7 +366,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { implicit val cm: ClassTag[T] = 
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] - sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*) + sQueue ++= queue.asScala.map(_.rdd) ssc.queueStream(sQueue) } @@ -390,7 +390,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] - sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*) + sQueue ++= queue.asScala.map(_.rdd) ssc.queueStream(sQueue, oneAtATime) } @@ -415,7 +415,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] - sQueue.enqueue(queue.asScala.map(_.rdd).toSeq: _*) + sQueue ++= queue.asScala.map(_.rdd) ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala index ee8370d262609..7555e2f57fccb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala @@ -65,7 +65,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = { JavaBatchInfo( batchInfo.batchTime, - batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo(_)).asJava, + batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo).toMap.asJava, batchInfo.submissionTime, batchInfo.processingStartTime.getOrElse(-1), batchInfo.processingEndTime.getOrElse(-1), @@ -73,7 +73,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav batchInfo.processingDelay.getOrElse(-1), batchInfo.totalDelay.getOrElse(-1), batchInfo.numRecords, - batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo(_)).asJava + batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo).toMap.asJava ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index d46c0a01e05d9..2f4536ec6f0c2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -45,7 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) s" time $validTime") } if (rdds.nonEmpty) { - Some(ssc.sc.union(rdds)) + Some(ssc.sc.union(rdds.toSeq)) } else { None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala index 8da5a5f8193cf..662312b7b0db8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala @@ -75,7 +75,7 @@ private[streaming] object MapWithStateRDDRecord { } } - MapWithStateRDDRecord(newStateMap, mappedData) + MapWithStateRDDRecord(newStateMap, mappedData.toSeq) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 6c71b18b46213..d038021e93e73 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -116,7 +116,9 @@ private[streaming] class ReceivedBlockTracker( // a few thousand elements. So we explicitly allocate a collection for serialization which // we know doesn't have this issue. (See SPARK-26734). val streamIdToBlocks = streamIds.map { streamId => - (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*)) + val blocks = mutable.ArrayBuffer[ReceivedBlockInfo]() + blocks ++= getReceivedBlockQueue(streamId).clone() + (streamId, blocks.toSeq) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index 4105171a3db24..0569abab1f36d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -135,7 +135,7 @@ private[streaming] class ReceiverSchedulingPolicy { leastScheduledExecutors += executor } - receivers.map(_.streamId).zip(scheduledLocations).toMap + receivers.map(_.streamId).zip(scheduledLocations.map(_.toSeq)).toMap } /** @@ -183,7 +183,7 @@ private[streaming] class ReceiverSchedulingPolicy { val executorWeights: Map[ExecutorCacheTaskLocation, Double] = { receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights) - .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor + .groupBy(_._1).mapValues(_.map(_._2).sum).toMap // Sum weights for each executor } val idleExecutors = executors.toSet -- executorWeights.keys diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 13cf5cc0e71ea..be62d3ce49b8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -248,7 +248,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false _.runningExecutor.map { _.executorId } - } + }.toMap } else { Map.empty } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 31e4c6b59a64a..d0a3517af70b9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -170,7 +170,7 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp // We take the latest record for the timestamp. 
Please refer to the class Javadoc for // detailed explanation val time = sortedByTime.last.time - segment = wrappedLog.write(aggregate(sortedByTime), time) + segment = wrappedLog.write(aggregate(sortedByTime.toSeq), time) } buffer.foreach(_.promise.success(segment)) } catch { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index d33f83c819086..6f3ee5cbaec63 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -146,7 +146,7 @@ private[streaming] class FileBasedWriteAheadLog( } else { // For performance gains, it makes sense to parallelize the recovery if // closeFileAfterWrite = true - seqToParIterator(executionContext, logFilesToRead, readFile).asJava + seqToParIterator(executionContext, logFilesToRead.toSeq, readFile).asJava } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala index 36036fcd44b04..541a6e2d48b51 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala @@ -190,7 +190,7 @@ class DStreamScopeSuite assertDefined(foreachBaseScope) assert(foreachBaseScope.get.name === "foreachRDD") - val rddScopes = generatedRDDs.map { _.scope } + val rddScopes = generatedRDDs.map { _.scope }.toSeq assertDefined(rddScopes: _*) rddScopes.zipWithIndex.foreach { case (rddScope, idx) => assert(rddScope.get.name === "reduceByKey") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala index 0c4a64ccc513f..42a5aaba5178f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/JavaTestUtils.scala @@ -36,7 +36,7 @@ trait JavaTestBase extends TestSuiteBase { ssc: JavaStreamingContext, data: JList[JList[T]], numPartitions: Int): JavaDStream[T] = { - val seqData = data.asScala.map(_.asScala) + val seqData = data.asScala.map(_.asScala.toSeq).toSeq implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index d0a5ababc7cac..9d735a32f7090 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -284,7 +284,7 @@ object MasterFailureTest extends Logging { }) } } - mergedOutput + mergedOutput.toSeq } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index bb60d6fa7bf78..60e04403937a2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -612,7 +612,7 @@ object WriteAheadLogSuite { } } writer.close() - segments + segments.toSeq } /** @@ -685,7 +685,7 @@ object WriteAheadLogSuite { } finally { reader.close() } - buffer + buffer.toSeq } /** Read all the data 
from a log file using reader class and return the list of byte buffers. */
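The edits throughout this patch repeat two Scala 2.13 cross-compilation patterns: `Map#mapValues` now returns a lazy `MapView` rather than a strict `Map`, so call sites that need a `Map` (or a `java.util.Map` via `.asJava`) add an explicit `.toMap`; and `scala.Seq` is now an alias for `immutable.Seq`, so mutable buffers such as `ArrayBuffer` no longer satisfy `Seq`-typed results without a `.toSeq`. The sketch below is illustrative only and is not taken from the patch; the names `metrics`, `rows`, and `CollectionCompatSketch` are hypothetical.

```scala
// Minimal sketch (not part of the patch) of the two collection patterns this
// diff normalizes for Scala 2.13 cross-building.
import scala.collection.mutable.ArrayBuffer

object CollectionCompatSketch {
  // 1. In 2.13, Map#mapValues returns a lazy MapView, not a Map, so code that
  //    expects Map[K, V] (or converts with .asJava) needs an explicit .toMap.
  def doubledMetrics(metrics: Map[String, Long]): Map[String, Long] =
    metrics.mapValues(_ * 2).toMap   // without .toMap this is a MapView in 2.13

  // 2. In 2.13, scala.Seq aliases immutable.Seq, so a mutable buffer no longer
  //    conforms to a Seq[T] result type; materialize it with .toSeq.
  def collectedRows(n: Int): Seq[Int] = {
    val rows = ArrayBuffer.empty[Int]
    (0 until n).foreach(rows += _)
    rows.toSeq                        // ArrayBuffer is not a scala.Seq in 2.13
  }
}
```

Both forms also compile under 2.12, where `.toMap` and `.toSeq` are cheap or no-ops, which is why the patch can apply them unconditionally instead of branching on the Scala version.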