From b98ab73270b57f355044cc144cc9feeed9bd2be2 Mon Sep 17 00:00:00 2001 From: shashank Date: Thu, 10 Sep 2020 17:43:56 +0530 Subject: [PATCH 1/3] OP-667: upgrade the cdap version to 3.0.0-1 --- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/avro/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10-token-provider/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- resource-managers/kubernetes/integration-tests/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 4 ++-- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 36 files changed, 37 insertions(+), 37 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 0a52a00c4a694..ca69d2cc11278 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index fa4fcb1fbfa80..a4038a019bca8 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 14a1b7db9e33d..d0a08b51af7f3 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index e75a843f5e6bb..eba50ff8975f0 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 004af0a1ab203..fee572e49ff34 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index a35156a4bbfff..1918a29cd7afd 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/tags/pom.xml b/common/tags/pom.xml index dedc7dfc82056..ac6998cb4cb86 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index ebb05257d85f9..fae2b58fda4d6 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/core/pom.xml b/core/pom.xml index ad743a7712812..5d586a1acc846 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ 
org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index ec3490545132d..f47a00055d696 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/external/avro/pom.xml b/external/avro/pom.xml index 70cada7ce95e4..c8a01898b2738 100644 --- a/external/avro/pom.xml +++ b/external/avro/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index 32679c759404e..cb2ff5ee1950b 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index ae22afc3faeae..1db9f3e0e6c4b 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 7b10f0e78bc28..39e34ccb979fe 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml index b2ecc0f81d7ac..f4d6c53b71721 100644 --- a/external/kafka-0-10-token-provider/pom.xml +++ b/external/kafka-0-10-token-provider/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 19e7352e3945c..163b6fd879566 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml index bb37b213c697b..9c2d9587719f0 100644 --- a/external/kinesis-asl-assembly/pom.xml +++ b/external/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml index cfe7b7373b967..0cd9956dae474 100644 --- a/external/kinesis-asl/pom.xml +++ b/external/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml index d2f828421e64f..095a7a0a6cc34 100644 --- a/external/spark-ganglia-lgpl/pom.xml +++ b/external/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index ebfaccffcdded..e9f6f91604cb0 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 8a62a48236aab..cd19a78a95ad5 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index c2f25df2f147e..80f709aacf632 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 
../pom.xml diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml index bb4b640c68fe9..a59829c0fcdba 100644 --- a/mllib-local/pom.xml +++ b/mllib-local/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index c41bbe6ea85f8..6f814b8e4325e 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/pom.xml b/pom.xml index 2d8a52cfe728c..c98dd03f61eaf 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index 83c8d34a3a132..b058c832a1710 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index f72bacf827dba..b8410c2d76561 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../../pom.xml diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 9ede26ae51ad9..b9b584eb30a69 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../../pom.xml diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml index c61f8dff8cf55..3d47cadf8d0dc 100644 --- a/resource-managers/mesos/pom.xml +++ b/resource-managers/mesos/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index f18f2ce58588a..1437e216f7858 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 14a0f665ea84e..eea4581b7bf8d 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 3388aca287aa3..a250e03968db1 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 6dec472392574..f3117a4ee2509 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 267bd6039e3fa..15e31624404ec 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../../pom.xml @@ -198,7 +198,7 @@ org.codehaus.mojo build-helper-maven-plugin - 3.0.0 + 3.0.0-1 add-scala-test-sources diff --git a/streaming/pom.xml b/streaming/pom.xml index d6f4f303ad423..7a7c9d4fd3ccc 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 4a58881077524..9d6e8743ebb0a 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ 
-20,7 +20,7 @@ org.apache.spark spark-parent_2.12 - 3.0.0 + 3.0.0-1 ../pom.xml From 4c6cf797f8cfa740f6412edfb0fc802d6b851f0c Mon Sep 17 00:00:00 2001 From: shashank Date: Thu, 10 Sep 2020 18:24:06 +0530 Subject: [PATCH 2/3] OP-667: [SPARK-24634][SS] Add a new metric regarding number of rows later than watermark plus allowed delay #24936 --- .../structured-streaming-programming-guide.md | 5 ++ .../spark/sql/execution/SparkStrategies.scala | 12 ++- .../streaming/IncrementalExecution.scala | 3 + .../streaming/ProgressReporter.scala | 2 +- .../streaming/statefulOperators.scala | 87 +++++++++++++++++- .../apache/spark/sql/streaming/progress.scala | 7 +- .../streaming/EventTimeWatermarkSuite.scala | 33 +++++-- .../FlatMapGroupsWithStateSuite.scala | 26 +++--- .../sql/streaming/StateStoreMetricsTest.scala | 16 +++- .../streaming/StreamingAggregationSuite.scala | 10 ++- .../StreamingDeduplicationSuite.scala | 56 ++++++------ .../sql/streaming/StreamingJoinSuite.scala | 88 ++++++++++--------- ...StreamingQueryStatusAndProgressSuite.scala | 6 +- 13 files changed, 243 insertions(+), 108 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 1776d23607f7f..d834cc536713a 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1674,6 +1674,11 @@ Any of the stateful operation(s) after any of below stateful operations can have As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function emits late rows if the operator uses Append mode. +Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue: + +1. On the Spark UI: check the metrics in the "CountLateRows" node on the query execution details page of the SQL tab +2. On a Streaming Query Listener: check "numLateInputRows" in "stateOperators" in the QueryProgressEvent + There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
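As a point of reference for option 2 above, here is a minimal, illustrative sketch (not part of this patch) of a StreamingQueryListener that reads the numLateInputRows field this change adds to StateOperatorProgress. StreamingQueryListener, QueryProgressEvent, and progress.stateOperators are existing Spark APIs; the listener class name and the logging format are assumptions made purely for illustration.

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

    // Illustrative listener: surfaces the late-input counter added by this patch.
    class LateRowsListener extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        event.progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
          if (op.numLateInputRows > 0) {
            // Rows counted here arrived later than the watermark plus the allowed delay.
            println(s"batch ${event.progress.batchId}, state operator $i: " +
              s"${op.numLateInputRows} late input rows")
          }
        }
      }
    }

Registering the listener with spark.streams.addListener(new LateRowsListener()) before starting the query makes the new counter visible in the driver output alongside the existing numRowsTotal and numRowsUpdated values.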
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 12a1a1e7fc16e..0e6cdd6dc0ef5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -429,7 +429,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { aggregateExpressions.map(expr => expr.asInstanceOf[AggregateExpression]), rewrittenResultExpressions, stateVersion, - planLater(child)) + CountLateRowsExec(None, planLater(child))) case _ => Nil } @@ -441,7 +441,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object StreamingDeduplicationStrategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Deduplicate(keys, child) if child.isStreaming => - StreamingDeduplicateExec(keys, planLater(child)) :: Nil + StreamingDeduplicateExec(keys, CountLateRowsExec(None, planLater(child))) :: Nil case _ => Nil } @@ -492,7 +492,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val stateVersion = conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) new StreamingSymmetricHashJoinExec(leftKeys, rightKeys, joinType, condition, - stateVersion, planLater(left), planLater(right)) :: Nil + stateVersion, CountLateRowsExec(None, planLater(left)), + CountLateRowsExec(None, planLater(right))) :: Nil case Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => throw new AnalysisException( @@ -625,10 +626,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case FlatMapGroupsWithState( func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, stateEnc, outputMode, _, timeout, child) => + val stateVersion = conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION) val execPlan = FlatMapGroupsWithStateExec( func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, None, stateEnc, stateVersion, - outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None, planLater(child)) + outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None, + CountLateRowsExec(None, planLater(child))) + execPlan :: Nil case _ => Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 09ae7692ec518..7d293680ad299 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -132,6 +132,9 @@ class IncrementalExecution( } override def apply(plan: SparkPlan): SparkPlan = plan transform { + case m: CountLateRowsExec => + m.copy(eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + case StateStoreSaveExec(keys, None, None, None, stateFormatVersion, UnaryExecNode(agg, StateStoreRestoreExec(_, None, _, child))) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 0dff1c2fe5768..a70146c89669e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -222,7 +222,7 @@ trait ProgressReporter extends Logging { 
lastExecution.executedPlan.collect { case p if p.isInstanceOf[StateStoreWriter] => val progress = p.asInstanceOf[StateStoreWriter].getProgress() - if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0) + if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputRows = 0) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 1bec924ba219a..5709665f17db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ @@ -37,6 +37,84 @@ import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress} import org.apache.spark.sql.types._ import org.apache.spark.util.{CompletionIterator, NextIterator, Utils} +case class CountLateRowsExec( + eventTimeWatermark: Option[Long] = None, + child: SparkPlan) + extends UnaryExecNode with WatermarkSupport with CodegenSupport { + + // No need to determine key expressions here. + override def keyExpressions: Seq[Attribute] = Seq.empty + + override def output: Seq[Attribute] = child.output + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override lazy val metrics = Map( + "numLateRows" -> SQLMetrics.createMetric(sparkContext, + "number of input rows later than watermark plus allowed delay")) + + override protected def doExecute(): RDD[InternalRow] = { + val numLateRows = longMetric("numLateRows") + child.execute().mapPartitionsWithIndex { (_, iter) => + watermarkPredicateForData match { + case Some(pred) => + iter.map { row => + val r = pred.eval(row) + if (r) numLateRows += 1 + row + } + + case None => iter + } + } + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + override protected def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val numLateRows = metricTerm(ctx, "numLateRows") + + val generated = watermarkExpression match { + case Some(expr) => + val bound = BindReferences.bindReference(expr, child.output) + val evaluated = evaluateRequiredVariables(child.output, input, expr.references) + + // Generate the code for the predicate. 
+ val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx) + val nullCheck = if (bound.nullable) { + s"${ev.isNull} || " + } else { + s"" + } + + s""" + |$evaluated + |${ev.code} + |if (${nullCheck}${ev.value}) { + | $numLateRows.add(1); + |} + """.stripMargin + + case None => "" + } + + // Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;" + s""" + |do { + | $generated + | ${consume(ctx, input)} + |} while(false); + """.stripMargin + } +} /** Used to identify the state store for a given operator. */ case class StatefulOperatorStateInfo( @@ -96,11 +174,16 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] = new java.util.HashMap(customMetrics.mapValues(long2Long).asJava) + val numLateInputRows = self.children.flatMap(_.collectFirst { + case d: CountLateRowsExec => d + }).map(_.metrics("numLateRows").value).sum + new StateOperatorProgress( numRowsTotal = longMetric("numTotalStateRows").value, numRowsUpdated = longMetric("numUpdatedStateRows").value, + numLateInputRows = numLateInputRows, memoryUsedBytes = longMetric("stateMemory").value, - javaConvertedCustomMetrics + customMetrics = javaConvertedCustomMetrics ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 13b506b60a126..7cba601f9c919 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS class StateOperatorProgress private[sql]( val numRowsTotal: Long, val numRowsUpdated: Long, + val numLateInputRows: Long, val memoryUsedBytes: Long, val customMetrics: ju.Map[String, JLong] = new ju.HashMap() ) extends Serializable { @@ -52,12 +53,14 @@ class StateOperatorProgress private[sql]( /** The pretty (i.e. indented) JSON representation of this progress. 
*/ def prettyJson: String = pretty(render(jsonValue)) - private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = - new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) + private[sql] def copy(newNumRowsUpdated: Long, newNumLateInputRows: Long): StateOperatorProgress = + new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, newNumLateInputRows, memoryUsedBytes, + customMetrics) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ + ("numLateInputRows" -> JInt(numLateInputRows)) ~ ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ ("customMetrics" -> { if (!customMetrics.isEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 6486e1aee8649..2c064a5d9a671 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -298,9 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche AddData(inputData, 25), // Advance watermark to 15 seconds CheckNewAnswer((10, 5)), assertNumStateRows(2), + assertNumLateRows(0), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckNewAnswer(), - assertNumStateRows(2) + assertNumStateRows(2), + assertNumLateRows(1) ) } @@ -321,12 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche AddData(inputData, 25), // Advance watermark to 15 seconds CheckNewAnswer((25, 1)), assertNumStateRows(2), + assertNumLateRows(0), AddData(inputData, 10, 25), // Ignore 10 as its less than watermark CheckNewAnswer((25, 2)), assertNumStateRows(2), + assertNumLateRows(1), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckNewAnswer(), - assertNumStateRows(2) + assertNumStateRows(2), + assertNumLateRows(1) ) } @@ -377,8 +382,10 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche testStream(df)( AddData(inputData, 10, 11, 12, 13, 14, 15), CheckAnswer(), + assertNumLateRows(0), AddData(inputData, 25), // Advance watermark to 15 seconds CheckAnswer((10, 5)), + assertNumLateRows(0), StopStream, AssertOnQuery { q => // purge commit and clear the sink val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) @@ -389,12 +396,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche StartStream(), AddData(inputData, 10, 27, 30), // Advance watermark to 20 seconds, 10 should be ignored CheckAnswer((15, 1)), + assertNumLateRows(1), StopStream, StartStream(), AddData(inputData, 17), // Watermark should still be 20 seconds, 17 should be ignored CheckAnswer((15, 1)), + assertNumLateRows(1), AddData(inputData, 40), // Advance watermark to 30 seconds, emit first data 25 - CheckNewAnswer((25, 2)) + CheckNewAnswer((25, 2)), + assertNumLateRows(0) ) } @@ -486,18 +496,24 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - // No eviction when asked to compute complete results. + // No state eviction when asked to compute complete results. + // It still counts late input rows, though. 
testStream(windowedAggregation, OutputMode.Complete)( AddData(inputData, 10, 11, 12), CheckAnswer((10, 3)), + assertNumLateRows(0), AddData(inputData, 25), CheckAnswer((10, 3), (25, 1)), + assertNumLateRows(0), AddData(inputData, 25), CheckAnswer((10, 3), (25, 2)), + assertNumLateRows(0), AddData(inputData, 10), CheckAnswer((10, 4), (25, 2)), + assertNumLateRows(1), AddData(inputData, 25), - CheckAnswer((10, 4), (25, 3)) + CheckAnswer((10, 4), (25, 3)), + assertNumLateRows(0) ) } @@ -783,6 +799,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche true } + private def assertNumLateRows(numLateRows: Long): AssertOnQuery = AssertOnQuery { q => + q.processAllAvailable() + val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get + assert(progressWithData.stateOperators(0).numLateInputRows === numLateRows) + true + } + /** Assert event stats generated on that last batch with data in it */ private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { Execute("AssertEventStats") { q => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index b04f8b0d4d174..6a02f23de7f99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -626,20 +626,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { testStream(result, Update)( AddData(inputData, "a"), CheckNewAnswer(("a", "1")), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, "a", "b"), CheckNewAnswer(("a", "2"), ("b", "1")), - assertNumStateRows(total = 2, updated = 2), + assertNumStateRows(total = 2, updated = 2, lateInput = 0), StopStream, StartStream(), AddData(inputData, "a", "b"), // should remove state for "a" and not return anything for a CheckNewAnswer(("b", "2")), - assertNumStateRows(total = 1, updated = 2), + assertNumStateRows(total = 1, updated = 2, lateInput = 0), StopStream, StartStream(), AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1 and CheckNewAnswer(("a", "1"), ("c", "1")), - assertNumStateRows(total = 3, updated = 2) + assertNumStateRows(total = 3, updated = 2, lateInput = 0) ) } @@ -768,17 +768,17 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { AddData(inputData, "a"), AdvanceManualClock(1 * 1000), CheckNewAnswer(("a", "1")), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, "b"), AdvanceManualClock(1 * 1000), CheckNewAnswer(("b", "1")), - assertNumStateRows(total = 2, updated = 1), + assertNumStateRows(total = 2, updated = 1, lateInput = 0), AddData(inputData, "b"), AdvanceManualClock(10 * 1000), CheckNewAnswer(("a", "-1"), ("b", "2")), - assertNumStateRows(total = 1, updated = 2), + assertNumStateRows(total = 1, updated = 2, lateInput = 0), StopStream, StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock), @@ -786,7 +786,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { AddData(inputData, "c"), AdvanceManualClock(11 * 1000), CheckNewAnswer(("b", "-1"), ("c", "1")), - assertNumStateRows(total = 1, updated = 2), + assertNumStateRows(total = 1, updated = 2, lateInput = 0), AdvanceManualClock(12 * 1000), 
AssertOnQuery { _ => clock.getTimeMillis() == 35000 }, @@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { } }, CheckNewAnswer(("c", "-1")), - assertNumStateRows(total = 0, updated = 1) + assertNumStateRows(total = 0, updated = 1, lateInput = 0) ) } @@ -978,20 +978,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { testStream(result, Update)( AddData(inputData, "a"), CheckNewAnswer(("a", "1")), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, "a", "b"), CheckNewAnswer(("a", "2"), ("b", "1")), - assertNumStateRows(total = 2, updated = 2), + assertNumStateRows(total = 2, updated = 2, lateInput = 0), StopStream, StartStream(), AddData(inputData, "a", "b"), // should remove state for "a" and return count as -1 CheckNewAnswer(("a", "-1"), ("b", "2")), - assertNumStateRows(total = 1, updated = 2), + assertNumStateRows(total = 1, updated = 2, lateInput = 0), StopStream, StartStream(), AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1 CheckNewAnswer(("a", "1"), ("c", "1")), - assertNumStateRows(total = 3, updated = 2) + assertNumStateRows(total = 3, updated = 2, lateInput = 0) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala index fb5d13d09fb0e..fe87c7623f5b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala @@ -29,8 +29,12 @@ trait StateStoreMetricsTest extends StreamTest { lastCheckedRecentProgressIndex = -1 } - def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = - AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q => + def assertNumStateRows( + total: Seq[Long], + updated: Seq[Long], + lateInputRows: Seq[Long]): AssertOnQuery = + AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" + + s", late input rows = $lateInputRows") { q => // This assumes that the streaming query will not make any progress while the eventually // is being executed. 
eventually(timeout(streamingTimeout)) { @@ -60,13 +64,17 @@ trait StateStoreMetricsTest extends StreamTest { val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators) assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString") + val numLateInputRows = recentProgress.filter(_.numInputRows > 0).last + .stateOperators.map(_.numLateInputRows) + assert(numLateInputRows === lateInputRows, s"incorrect late input rows, $debugString") + lastCheckedRecentProgressIndex = recentProgress.length - 1 } true } - def assertNumStateRows(total: Long, updated: Long): AssertOnQuery = - assertNumStateRows(Seq(total), Seq(updated)) + def assertNumStateRows(total: Long, updated: Long, lateInput: Long): AssertOnQuery = + assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput)) def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = { if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 85e1b85b84d26..5f4d0755b7b8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -502,10 +502,12 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { expectShuffling: Boolean, expectedPartition: Int): Boolean = { val executedPlan = se.lastExecution.executedPlan - val restore = executedPlan - .collect { case ss: StateStoreRestoreExec => ss } - .head - restore.child match { + val nodeToCheck = executedPlan + .collect { + case StateStoreRestoreExec(_, _, _, s: CountLateRowsExec) => s.child + case ss: StateStoreRestoreExec => ss.child + }.head + nodeToCheck match { case node: UnaryExecNode => assert(node.outputPartitioning.numPartitions === expectedPartition, "Didn't get the expected number of partitions.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index f63778aef5a7f..54025e754cbd3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -37,13 +37,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Append)( AddData(inputData, "a"), CheckLastBatch("a"), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, "a"), CheckLastBatch(), - assertNumStateRows(total = 1, updated = 0), + assertNumStateRows(total = 1, updated = 0, lateInput = 0), AddData(inputData, "b"), CheckLastBatch("b"), - assertNumStateRows(total = 2, updated = 1) + assertNumStateRows(total = 2, updated = 1, lateInput = 0) ) } @@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Append)( AddData(inputData, "a" -> 1), CheckLastBatch("a" -> 1), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, "a" -> 2), // Dropped CheckLastBatch(), - assertNumStateRows(total = 1, updated = 0), + assertNumStateRows(total = 1, updated = 0, lateInput = 0), AddData(inputData, "b" -> 1), CheckLastBatch("b" -> 1), - assertNumStateRows(total = 2, updated 
= 1) + assertNumStateRows(total = 2, updated = 1, lateInput = 0) ) } @@ -71,15 +71,15 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Append)( AddData(inputData, "a" -> 1), CheckLastBatch("a" -> 1), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), + assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates` CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)), + assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "b" -> 1), CheckLastBatch("b" -> 1), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) + assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)) ) } @@ -94,19 +94,19 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Append)( AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*), CheckAnswer(10 to 15: _*), - assertNumStateRows(total = 6, updated = 6), + assertNumStateRows(total = 6, updated = 6, lateInput = 0), AddData(inputData, 25), // Advance watermark to 15 secs, no-data-batch drops rows <= 15 CheckNewAnswer(25), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckNewAnswer(), - assertNumStateRows(total = 1, updated = 0), + assertNumStateRows(total = 1, updated = 0, lateInput = 1), AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25 CheckNewAnswer(45), - assertNumStateRows(total = 1, updated = 1) + assertNumStateRows(total = 1, updated = 1, lateInput = 0) ) } @@ -126,23 +126,23 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { CheckLastBatch(), // states in aggregate in [10, 14), [15, 20) (2 windows) // states in deduplicate is 10 to 15 - assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)), + assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L), lateInputRows = Seq(0L, 0L)), AddData(inputData, 25), // Advance watermark to 15 seconds CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch // states in aggregate in [15, 20) and [25, 30); no-data-batch removed [10, 14) // states in deduplicate is 25, no-data-batch removed 10 to 14 - assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)), + assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, 10), // Should not emit anything as data less than watermark CheckLastBatch(), - assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)), + assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 1L)), AddData(inputData, 40), // Advance watermark to 30 seconds CheckLastBatch((15 -> 1), (25 -> 1)), // states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30) // states in deduplicate is 40; no-data-batch removed 25 - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)) + assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)) ) } @@ -158,16 +158,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Update)( AddData(inputData, "a" -> 1), CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), + 
assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "a" -> 1), // Dropped CheckLastBatch(), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), + assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "a" -> 2), CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), + assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "b" -> 1), CheckLastBatch("b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) + assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)) ) } @@ -183,16 +183,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { testStream(result, Complete)( AddData(inputData, "a" -> 1), CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)), + assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "a" -> 1), // Dropped CheckLastBatch("a" -> 1L), - assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)), + assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "a" -> 2), CheckLastBatch("a" -> 3L), - assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)), + assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)), AddData(inputData, "b" -> 1), CheckLastBatch("a" -> 3L, "b" -> 1L), - assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L)) + assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)) ) } @@ -273,13 +273,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { StartStream(additionalConfs = Map(flagKey -> flag.toString)), AddData(inputData, 10, 11, 12, 13, 14, 15), CheckAnswer(10, 11, 12, 13, 14, 15), - assertNumStateRows(total = 6, updated = 6), + assertNumStateRows(total = 6, updated = 6, lateInput = 0), AddData(inputData, 25), // Advance watermark to 15 seconds CheckNewAnswer(25), { // State should have been cleaned if flag is set, otherwise should not have been cleaned - if (flag) assertNumStateRows(total = 1, updated = 1) - else assertNumStateRows(total = 7, updated = 1) + if (flag) assertNumStateRows(total = 1, updated = 1, lateInput = 0) + else assertNumStateRows(total = 7, updated = 1, lateInput = 0) }, AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 3f218c9cb7fd9..2949e8cf09580 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -142,31 +142,32 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with testStream(joined)( AddData(input1, 1), CheckAnswer(), - assertNumStateRows(total = 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(input2, 1), CheckAnswer((1, 10, 2, 3)), - assertNumStateRows(total = 2, updated = 1), + assertNumStateRows(total = 2, updated = 1, lateInput = 0), StopStream, StartStream(), AddData(input1, 25), CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10] - assertNumStateRows(total 
= 1, updated = 1), + assertNumStateRows(total = 1, updated = 1, lateInput = 0), AddData(input2, 25), CheckNewAnswer((25, 30, 50, 75)), - assertNumStateRows(total = 2, updated = 1), + assertNumStateRows(total = 2, updated = 1, lateInput = 0), StopStream, StartStream(), AddData(input2, 1), CheckNewAnswer(), // Should not join as < 15 removed - assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15 - + // row not add as 1 < state key watermark = 15 + // note that input row is not discarded as right side doesn't have watermark + assertNumStateRows(total = 2, updated = 0, lateInput = 0), AddData(input1, 5), CheckNewAnswer(), // Same reason as above - assertNumStateRows(total = 2, updated = 0) + assertNumStateRows(total = 2, updated = 0, lateInput = 1) ) } @@ -195,12 +196,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with CheckNewAnswer((1, 5, 11)), AddData(rightInput, (1, 10)), CheckNewAnswer(), // no match as leftTime 5 is not < rightTime 10 - 5 - assertNumStateRows(total = 3, updated = 3), + assertNumStateRows(total = 3, updated = 3, lateInput = 0), // Increase event time watermark to 20s by adding data with time = 30s on both inputs AddData(leftInput, (1, 3), (1, 30)), CheckNewAnswer((1, 3, 10), (1, 3, 11)), - assertNumStateRows(total = 5, updated = 2), + assertNumStateRows(total = 5, updated = 2, lateInput = 0), AddData(rightInput, (0, 30)), CheckNewAnswer(), @@ -208,7 +209,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with // so left side going to only receive data where leftTime > 20 // right side state constraint: 20 < leftTime < rightTime - 5 ==> rightTime > 25 // right state where rightTime <= 25 will be cleared, (1, 11) and (1, 10) removed - assertNumStateRows(total = 4, updated = 1), + assertNumStateRows(total = 4, updated = 1, lateInput = 0), // New data to right input should match with left side (1, 3) and (1, 5), as left state should // not be cleared. 
But rows rightTime <= 20 should be filtered due to event time watermark and @@ -219,12 +220,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state AddData(rightInput, (1, 20), (1, 21), (1, 28)), CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)), - assertNumStateRows(total = 5, updated = 1), + assertNumStateRows(total = 5, updated = 1, lateInput = 1), // New data to left input with leftTime <= 20 should be filtered due to event time watermark AddData(leftInput, (1, 20), (1, 21)), CheckNewAnswer((1, 21, 28)), - assertNumStateRows(total = 6, updated = 1) + assertNumStateRows(total = 6, updated = 1, lateInput = 1) ) } @@ -276,7 +277,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with CheckAnswer(), AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)), CheckNewAnswer((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)), - assertNumStateRows(total = 7, updated = 7), + assertNumStateRows(total = 7, updated = 7, lateInput = 0), // If rightTime = 60, then it matches only leftTime = [50, 65] AddData(rightInput, (1, 60)), @@ -289,11 +290,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with // Should drop < 20 from left, i.e., none // Right state value watermark = 30 - 5 = slightly less than 25 (since condition has <=) // Should drop < 25 from the right, i.e., 14 and 15 - assertNumStateRows(total = 10, updated = 5), // 12 - 2 removed + assertNumStateRows(total = 10, updated = 5, lateInput = 0), // 12 - 2 removed AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)), - assertNumStateRows(total = 11, updated = 1), // only 31 added + assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added // Advance the watermark AddData(rightInput, (1, 80)), @@ -303,11 +304,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with // Should drop < 36 from left, i.e., 20, 31 (30 was not added) // Right state value watermark = 46 - 5 = slightly less than 41 (since condition has <=) // Should drop < 41 from the right, i.e., 25, 26, 30, 31 - assertNumStateRows(total = 6, updated = 1), // 12 - 6 removed + assertNumStateRows(total = 6, updated = 1, lateInput = 0), // 12 - 6 removed AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state CheckNewAnswer((1, 49, 50), (1, 50, 50)), - assertNumStateRows(total = 7, updated = 1) // 50 added + assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added ) } @@ -467,7 +468,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)), // batch 2: same result as above test CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)), - assertNumStateRows(11, 6), + assertNumStateRows(11, 6, 0), Execute { query => // Verify state format = 1 val f = query.lastExecution.executedPlan.collect { @@ -539,7 +540,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with // The left rows with leftValue <= 4 should generate their outer join row now and // not get added to the state. 
CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)), - assertNumStateRows(total = 4, updated = 4), + assertNumStateRows(total = 4, updated = 4, lateInput = 0), // We shouldn't get more outer join rows when the watermark advances. MultiAddData(leftInput, 20)(rightInput, 21), CheckNewAnswer(), @@ -567,7 +568,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3), // The right rows with rightValue <= 7 should never be added to the state. CheckNewAnswer(Row(3, 10, 6, "9")), // rightValue = 9 > 7 hence joined and added to state - assertNumStateRows(total = 4, updated = 4), + assertNumStateRows(total = 4, updated = 4, lateInput = 0), // When the watermark advances, we get the outer join rows just as we would if they // were added but didn't match the full join condition. MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls @@ -596,7 +597,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5), // The left rows with leftValue <= 4 should never be added to the state. CheckNewAnswer(Row(3, 10, 6, "9")), // leftValue = 7 > 4 hence joined and added to state - assertNumStateRows(total = 4, updated = 4), + assertNumStateRows(total = 4, updated = 4, lateInput = 0), // When the watermark advances, we get the outer join rows just as we would if they // were added but didn't match the full join condition. MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls @@ -626,7 +627,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with // The right rows with rightValue <= 7 should generate their outer join row now and // not get added to the state. CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")), - assertNumStateRows(total = 4, updated = 4), + assertNumStateRows(total = 4, updated = 4, lateInput = 0), // We shouldn't get more outer join rows when the watermark advances. 
MultiAddData(leftInput, 20)(rightInput, 21), CheckNewAnswer(), @@ -645,11 +646,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)), - assertNumStateRows(total = 2, updated = 12), + assertNumStateRows(total = 2, updated = 12, lateInput = 0), AddData(leftInput, 22), CheckNewAnswer(Row(22, 30, 44, 66)), - assertNumStateRows(total = 3, updated = 1) + assertNumStateRows(total = 3, updated = 1, lateInput = 0) ) } @@ -663,11 +664,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls CheckNewAnswer(Row(6, 10, null, 18), Row(7, 10, null, 21)), - assertNumStateRows(total = 2, updated = 12), + assertNumStateRows(total = 2, updated = 12, lateInput = 0), AddData(leftInput, 22), CheckNewAnswer(Row(22, 30, 44, 66)), - assertNumStateRows(total = 3, updated = 1) + assertNumStateRows(total = 3, updated = 1, lateInput = 0) ) } @@ -703,15 +704,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with CheckNewAnswer((1, 1, 5, 10)), AddData(rightInput, (1, 11)), CheckNewAnswer(), // no match as left time is too low - assertNumStateRows(total = 5, updated = 5), + assertNumStateRows(total = 5, updated = 5, lateInput = 0), // Increase event time watermark to 20s by adding data with time = 30s on both inputs AddData(leftInput, (1, 7), (1, 30)), CheckNewAnswer((1, 1, 7, 10), (1, 1, 7, 11)), - assertNumStateRows(total = 7, updated = 2), + assertNumStateRows(total = 7, updated = 2, lateInput = 0), AddData(rightInput, (0, 30)), // watermark = 30 - 10 = 20, no-data-batch computes nulls CheckNewAnswer(outerResult), - assertNumStateRows(total = 2, updated = 1) + assertNumStateRows(total = 2, updated = 1, lateInput = 0) ) } } @@ -736,40 +737,41 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with // leftValue <= 10 should generate outer join rows even though it matches right keys MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3), CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)), - assertNumStateRows(total = 3, updated = 3), // only right 1, 2, 3 added + assertNumStateRows(total = 3, updated = 3, lateInput = 0), // only right 1, 2, 3 added MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch cleared < 10 CheckNewAnswer(), - assertNumStateRows(total = 2, updated = 2), // only 20 and 21 left in state + assertNumStateRows(total = 2, updated = 2, lateInput = 0), // only 20 and 21 left in state AddData(rightInput, 20), CheckNewAnswer(Row(20, 30, 40, 60)), - assertNumStateRows(total = 3, updated = 1), + assertNumStateRows(total = 3, updated = 1, lateInput = 0), // leftValue and rightValue both satisfying condition should not generate outer join rows MultiAddData(leftInput, 40, 41)(rightInput, 40, 41), // watermark = 31 CheckNewAnswer((40, 50, 80, 120), (41, 50, 82, 123)), - assertNumStateRows(total = 4, updated = 4), // only left 40, 41 + right 40,41 left in state + // only left 40, 41 + right 40,41 left in state + assertNumStateRows(total = 4, updated = 4, lateInput = 0), MultiAddData(leftInput, 70)(rightInput, 71), // watermark = 60 CheckNewAnswer(), - assertNumStateRows(total = 2, updated = 2), // only 70, 71 left in state + assertNumStateRows(total = 2, updated = 2, lateInput = 0), // only 70, 71 left in 
state
       AddData(rightInput, 70),
       CheckNewAnswer((70, 80, 140, 210)),
-      assertNumStateRows(total = 3, updated = 1),
+      assertNumStateRows(total = 3, updated = 1, lateInput = 0),

       // rightValue between 300 and 1000 should generate outer join rows even though it matches left
       MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103), // watermark = 91
       CheckNewAnswer(),
-      assertNumStateRows(total = 6, updated = 3), // only 101 - 103 left in state
+      assertNumStateRows(total = 6, updated = 3, lateInput = 0), // only 101 - 103 left in state
       MultiAddData(leftInput, 1000)(rightInput, 1001),
       CheckNewAnswer(
         Row(101, 110, 202, null),
         Row(102, 110, 204, null),
         Row(103, 110, 206, null)),
-      assertNumStateRows(total = 2, updated = 2)
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0)
     )
   }

@@ -803,7 +805,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
       // right: (2, 2L), (4, 4L)
       CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
-      assertNumStateRows(7, 7),
+      assertNumStateRows(7, 7, 0),

       AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
       // batch 2 - global watermark = 5
@@ -817,7 +819,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // NOTE: look for evicted rows in right which are not evicted from left - they were
       // properly joined in batch 1
       CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
-      assertNumStateRows(13, 8),
+      assertNumStateRows(13, 8, 0),

       AddData(inputStream, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
       // batch 3
@@ -832,7 +834,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckNewAnswer(
         Row(12, 12L, 12, 12L), Row(14, 14L, 14, 14L),
         Row(1, 1L, null, null), Row(3, 3L, null, null)),
-      assertNumStateRows(15, 7)
+      assertNumStateRows(15, 7, 0)
     )
   }

@@ -866,17 +868,17 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(query)(
       AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
       CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
-      assertNumStateRows(7, 7),
+      assertNumStateRows(7, 7, 0),

       AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
       CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
-      assertNumStateRows(13, 8),
+      assertNumStateRows(13, 8, 0),

       AddData(inputStream, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
       CheckNewAnswer(
         Row(12, 12L, 12, 12L), Row(14, 14L, 14, 14L),
         Row(null, null, 1, 1L), Row(null, null, 3, 3L)),
-      assertNumStateRows(15, 7)
+      assertNumStateRows(15, 7, 0)
     )
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 08b3644745f9a..a3aaccc2e6c24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -63,6 +63,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
      |    "stateOperators" : [ {
      |      "numRowsTotal" : 0,
      |      "numRowsUpdated" : 1,
+     |      "numLateInputRows" : 0,
      |      "memoryUsedBytes" : 3,
      |      "customMetrics" : {
      |        "loadedMapCacheHitCount" : 1,
@@ -113,6 +114,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
      |    "stateOperators" : [ {
      |      "numRowsTotal" : 0,
      |      "numRowsUpdated" : 1,
+     |      "numLateInputRows" : 0,
      |      "memoryUsedBytes" : 2
      |    } ],
      |    "sources" : [ {
@@ -321,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      numRowsTotal = 0, numRowsUpdated = 1, numLateInputRows = 0, memoryUsedBytes = 3,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
@@ -353,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, numLateInputRows = 0, memoryUsedBytes = 2)),
     sources = Array(
       new SourceProgress(
         description = "source",

From 11560c9a388bcd68dfbeaae71bbcf79885113650 Mon Sep 17 00:00:00 2001
From: shashank
Date: Mon, 12 Oct 2020 13:36:54 +0530
Subject: [PATCH 3/3] OP-667: adding Jenkinsfile for build and change pom.xml
 to push artifacts into artifactory

---
 .VERSION          |   1 +
 Dockerfile        |  49 ++++++++++++++++++++++
 Jenkinsfile       | 105 ++++++++++++++++++++++++++++++++++++++++++++++
 pom.xml           |  53 +++++------------------
 sql/hive/pom.xml  |   2 +-
 streaming/pom.xml |   4 +-
 6 files changed, 168 insertions(+), 46 deletions(-)
 create mode 100644 .VERSION
 create mode 100644 Dockerfile
 create mode 100644 Jenkinsfile

diff --git a/.VERSION b/.VERSION
new file mode 100644
index 0000000000000..c7f9ef373b063
--- /dev/null
+++ b/.VERSION
@@ -0,0 +1 @@
+3.0.0-1,1.0
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000000000..15b42311f4f16
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,49 @@
+ARG spark_image_tag=3.0.0-1-hadoop3.2-1.0
+
+FROM artifacts.ggn.in.guavus.com:4244/spark:${spark_image_tag}
+
+ARG spark_uid=185
+
+USER root
+
+RUN apt-get update && \
+    apt-get -y install curl && \
+    curl -fSL http://artifacts.ggn.in.guavus.com:8081/artifactory/libs-release-local/org/elasticsearch/elasticsearch-hadoop-core/7.8.1_3.0.0/elasticsearch-hadoop-core-7.8.1_3.0.0.jar -o elasticsearch-hadoop-core-7.8.1_3.0.0.jar && \
+    curl -fSL http://artifacts.ggn.in.guavus.com:8081/artifactory/libs-release-local/org/elasticsearch/elasticsearch-hadoop-mr/7.8.1_3.0.0/elasticsearch-hadoop-mr-7.8.1_3.0.0.jar -o elasticsearch-hadoop-mr-7.8.1_3.0.0.jar && \
+    curl -fSL http://artifacts.ggn.in.guavus.com:8081/artifactory/libs-release-local/org/elasticsearch/elasticsearch-hadoop-sql/7.8.1_3.0.0/elasticsearch-hadoop-sql-7.8.1_3.0.0.jar -o elasticsearch-hadoop-sql-7.8.1_3.0.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.832/aws-java-sdk-s3-1.11.832.jar -o aws-java-sdk-s3-1.11.832.jar && \
+    curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.832/aws-java-sdk-1.11.832.jar -o aws-java-sdk-1.11.832.jar && \
+    curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.832/aws-java-sdk-core-1.11.832.jar -o aws-java-sdk-core-1.11.832.jar && \
+    curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.832/aws-java-sdk-dynamodb-1.11.832.jar -o aws-java-sdk-dynamodb-1.11.832.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar -o commons-pool2-2.8.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar -o hadoop-aws-3.2.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.0.0/spark-avro_2.12-3.0.0.jar -o spark-avro_2.12-3.0.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.0.0/spark-token-provider-kafka-0-10_2.12-3.0.0.jar -o spark-token-provider-kafka-0-10_2.12-3.0.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.0/jets3t-0.9.0.jar -o jets3t-0.9.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar -o spark-sql-kafka-0-10_2.12-3.0.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.2.0/kafka-clients-2.2.0.jar -o kafka-clients-2.2.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.0.0/spark-streaming-kafka-0-10_2.12-3.0.0.jar -o spark-streaming-kafka-0-10_2.12-3.0.0.jar && \
+    curl -fSL https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.13.0/jmx_prometheus_javaagent-0.13.0.jar -o jmx_prometheus_javaagent-0.13.0.jar && \
+    mv elasticsearch-hadoop-core-7.8.1_3.0.0.jar /opt/spark/jars/ && \
+    mv elasticsearch-hadoop-mr-7.8.1_3.0.0.jar /opt/spark/jars/ && \
+    mv elasticsearch-hadoop-sql-7.8.1_3.0.0.jar /opt/spark/jars/ && \
+    mv aws-java-sdk-s3-1.11.832.jar /opt/spark/jars/ && \
+    mv aws-java-sdk-1.11.832.jar /opt/spark/jars/ && \
+    mv aws-java-sdk-core-1.11.832.jar /opt/spark/jars/ && \
+    mv aws-java-sdk-dynamodb-1.11.832.jar /opt/spark/jars/ && \
+    mv commons-pool2-2.8.0.jar /opt/spark/jars/ && \
+    mv hadoop-aws-3.2.0.jar /opt/spark/jars/ && \
+    mv spark-avro_2.12-3.0.0.jar /opt/spark/jars/ && \
+    mv spark-token-provider-kafka-0-10_2.12-3.0.0.jar /opt/spark/jars/ && \
+    mv jets3t-0.9.0.jar /opt/spark/jars/ && \
+    mv spark-sql-kafka-0-10_2.12-3.0.0.jar /opt/spark/jars/ && \
+    mv kafka-clients-2.2.0.jar /opt/spark/jars/ && \
+    mv spark-streaming-kafka-0-10_2.12-3.0.0.jar /opt/spark/jars/ && \
+    mv jmx_prometheus_javaagent-0.13.0.jar /opt/spark/jars/
+
+
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
+
+# Specify the User that the actual main process will run as
+USER ${spark_uid}
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000000000..05854bc640346
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,105 @@
+@Library('jenkins_lib')_
+pipeline
+ {
+  agent {label 'slave'}
+
+  environment {
+
+    project = "apache-spark";
+    buildNum = currentBuild.getNumber() ;
+    //ex. like feat, release, fix
+    buildType = BRANCH_NAME.split("/").first();
+    //ex. like OP-
+    branchVersion = BRANCH_NAME.split("/").last().toUpperCase();
+    // Define global environment variables in this section
+  }
+
+  stages {
+    stage("Define Release version") {
+      steps {
+        script {
+          //Global Lib for Environment Versions Definition
+          versionDefine()
+          env.GUAVUS_SPARK_VERSION = "${VERSION}".split(",").first();
+          env.GUAVUS_DOCKER_VERSION = "${VERSION}".split(",").last();
+          env.dockerTag = "${GUAVUS_SPARK_VERSION}-hadoop3.2-${GUAVUS_DOCKER_VERSION}-${RELEASE}"
+          echo "GUAVUS_SPARK_VERSION : ${GUAVUS_SPARK_VERSION}"
+          echo "GUAVUS_DOCKER_VERSION : ${GUAVUS_DOCKER_VERSION}"
+          echo "DOCKER TAG : ${dockerTag}"
+        }
+      }
+    }
+
+    stage("Versioning") {
+      steps {
+        echo "GUAVUS_SPARK_VERSION : ${GUAVUS_SPARK_VERSION}"
+        echo "GUAVUS_DOCKER_VERSION : ${GUAVUS_DOCKER_VERSION}"
+        sh 'mvn versions:set -DnewVersion=${GUAVUS_SPARK_VERSION}'
+      }
+    }
+
+    stage("Initialize Variable") {
+      steps {
+        script {
+          PUSH_JAR = false;
+          PUSH_DOCKER = false;
+          DOCKER_IMAGE_NAME = "spark-opsiq";
+          longCommit = sh(returnStdout: true, script: "git rev-parse HEAD").trim()
+
+          if( env.buildType in ['release'] )
+          {
+            PUSH_JAR = true;
+          }
+          else if ( env.buildType ==~ /PR-.*/ ) {
+            PUSH_DOCKER = true
+          }
+
+        }
+
+      }
+    }
+
+    stage("Push JAR to Maven Artifactory") {
+      when {
+        expression { PUSH_JAR == true }
+      }
+      steps {
+        script {
+          echo "Pushing JAR to Maven Artifactory"
+          sh "mvn deploy -U -Dcheckstyle.skip=true -Denforcer.skip=true -DskipTests=true;"
+        }
+      }
+    }
+
+    stage("Build and Push Docker") {
+      when {
+        expression { PUSH_DOCKER == true }
+      }
+      stages {
+        stage("Create Docker Image") {
+          steps {
+            script {
+              echo "Creating docker build..."
+              sh "./dev/make-distribution.sh --name guavus_spark-${GUAVUS_SPARK_VERSION}-3.2.0 -Phive -Phive-thriftserver -Pkubernetes -Phadoop-3.2 -Dhadoop.version=3.2.0"
+              sh "./dist/bin/docker-image-tool.sh -r artifacts.ggn.in.guavus.com:4244 -t ${GUAVUS_SPARK_VERSION}-hadoop3.2-${GUAVUS_DOCKER_VERSION} build"
+              sh "./dist/bin/docker-image-tool.sh -r artifacts.ggn.in.guavus.com:4244 -t ${GUAVUS_SPARK_VERSION}-hadoop3.2-${GUAVUS_DOCKER_VERSION} push"
+              sh "docker build -t ${DOCKER_IMAGE_NAME} --build-arg GIT_HEAD=${longCommit} --build-arg GIT_BRANCH=${env.BRANCH_NAME} --build-arg VERSION=${dockerTag} --build-arg BUILD_NUMBER=${env.BUILD_NUMBER} ."
+            }
+          }
+        }
+
+        stage("PUSH Docker") {
+          steps {
+            script {
+              echo "Docker PUSH..."
+              docker_push( buildType, DOCKER_IMAGE_NAME )
+            }
+          }
+        }
+      }
+    }
+
+  }
+
+ }
+
diff --git a/pom.xml b/pom.xml
index c98dd03f61eaf..c485e1c95c2ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,49 +37,16 @@
 repo
-
- scm:git:git@github.com:apache/spark.git
- scm:git:https://gitbox.apache.org/repos/asf/spark.git
- scm:git:git@github.com:apache/spark.git
- HEAD
-
-
-
- matei
- Matei Zaharia
- matei.zaharia@gmail.com
- http://www.cs.berkeley.edu/~matei
- Apache Software Foundation
- http://spark.apache.org
-
-
-
- JIRA
- https://issues.apache.org/jira/browse/SPARK
-
-
-
-
- Dev Mailing List
- dev@spark.apache.org
- dev-subscribe@spark.apache.org
- dev-unsubscribe@spark.apache.org
-
-
-
- User Mailing List
- user@spark.apache.org
- user-subscribe@spark.apache.org
- user-unsubscribe@spark.apache.org
-
-
-
- Commits Mailing List
- commits@spark.apache.org
- commits-subscribe@spark.apache.org
- commits-unsubscribe@spark.apache.org
-
-
+
+
+ central
+ http://artifacts.ggn.in.guavus.com/libs-release-local
+
+
+ snapshots
+ http://artifacts.ggn.in.guavus.com/libs-snapshot-local
+
+
 common/sketch
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 15e31624404ec..e022a421d6554 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -198,7 +198,7 @@
 org.codehaus.mojo
 build-helper-maven-plugin
- 3.0.0-1
+ 3.0.0
 add-scala-test-sources
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 7a7c9d4fd3ccc..724e7eee73a41 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -118,7 +118,7 @@
 test
-
+