[SPARK-23886][SS] Update query status for ContinuousExecution #23095

Closed · wants to merge 3 commits
@@ -146,6 +146,12 @@ class MicroBatchExecution(
     logInfo(s"Query $prettyIdString was stopped")
   }
 
+  /** Begins recording statistics about query progress for a given trigger. */
+  override protected def startTrigger(): Unit = {
+    super.startTrigger()
+    currentStatus = currentStatus.copy(isTriggerActive = true)
+  }
+
   /**
    * Repeatedly attempts to run batches as data arrives.
    */
@@ -114,7 +114,6 @@ trait ProgressReporter extends Logging {
     logDebug("Starting Trigger Calculation")
     lastTriggerStartTimestamp = currentTriggerStartTimestamp
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-    currentStatus = currentStatus.copy(isTriggerActive = true)
     currentTriggerStartOffsets = null
     currentTriggerEndOffsets = null
     currentDurationsMs.clear()
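For orientation, here is a minimal sketch of how the two hunks above combine. These are simplified stand-in classes, not the real Spark sources: the base startTrigger() no longer touches isTriggerActive, so only MicroBatchExecution's override sets the flag, and ContinuousExecution, which has no override, leaves it false.

// Simplified stand-ins for illustration only; not the actual Spark classes.
trait ProgressReporterSketch {
  protected var isTriggerActive: Boolean = false

  // The base implementation now does only timing/offset bookkeeping;
  // the isTriggerActive flag is left untouched.
  protected def startTrigger(): Unit = ()
}

class MicroBatchExecutionSketch extends ProgressReporterSketch {
  // Micro-batch mode opts back in and marks the trigger active.
  override protected def startTrigger(): Unit = {
    super.startTrigger()
    isTriggerActive = true
  }
}

class ContinuousExecutionSketch extends ProgressReporterSketch {
  // No override: a continuous query's status never reports an active trigger.
}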
@@ -117,6 +117,8 @@ class ContinuousExecution(
     // For at least once, we can just ignore those reports and risk duplicates.
     commitLog.getLatest() match {
       case Some((latestEpochId, _)) =>
+        updateStatusMessage("Starting new streaming query " +
+          s"and getting offsets from latest epoch $latestEpochId")
         val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
           throw new IllegalStateException(
             s"Batch $latestEpochId was committed without end epoch offsets!")
@@ -128,6 +130,7 @@ class ContinuousExecution(
         nextOffsets
       case None =>
         // We are starting this stream for the first time. Offsets are all None.
+        updateStatusMessage("Starting new streaming query")
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
         OffsetSeq.fill(continuousSources.map(_ => null): _*)
@@ -260,6 +263,7 @@ class ContinuousExecution(
     epochUpdateThread.setDaemon(true)
     epochUpdateThread.start()
 
+    updateStatusMessage("Running")
     reportTimeTaken("runContinuous") {
       SQLExecution.withNewExecutionId(
         sparkSessionForQuery, lastExecution) {
@@ -319,6 +323,8 @@ class ContinuousExecution(
    * before this is called.
    */
   def commit(epoch: Long): Unit = {
+    updateStatusMessage(s"Committing epoch $epoch")
+
     assert(continuousSources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
 
@@ -28,9 +28,11 @@ import org.apache.spark.annotation.InterfaceStability
  * Reports information about the instantaneous status of a streaming query.
  *
  * @param message A human readable description of what the stream is currently doing.
- * @param isDataAvailable True when there is new data to be processed.
+ * @param isDataAvailable True when there is new data to be processed. Doesn't apply
+ *                        to ContinuousExecution where it is always false.
  * @param isTriggerActive True when the trigger is actively firing, false when waiting for the
- *                        next trigger time.
+ *                        next trigger time. Doesn't apply to ContinuousExecution where it is
+ *                        always false.
  *
  * @since 2.1.0
  */
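A usage sketch (not part of this diff) of how the documented fields surface to an application. It assumes a spark-shell style session where `spark` is the ambient SparkSession, and uses the built-in "rate" source and "console" sink purely for illustration.

// Usage sketch only (spark-shell style; `spark` is the ambient SparkSession).
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

// With this patch, `message` moves through "Starting new streaming query",
// "Running", and "Committing epoch <n>", while both flags stay false in
// continuous mode.
println(query.status.message)
println(query.status.isDataAvailable)   // always false for ContinuousExecution
println(query.status.isTriggerActive)   // always false for ContinuousExecution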
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.streaming.continuous

import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.streaming.Trigger

class ContinuousQueryStatusAndProgressSuite extends ContinuousSuiteBase {
  test("StreamingQueryStatus - ContinuousExecution isDataAvailable and isTriggerActive " +
    "should be false") {
    import testImplicits._

    val input = ContinuousMemoryStream[Int]

    def assertStatus(stream: StreamExecution): Unit = {
      assert(stream.status.isDataAvailable === false)
      assert(stream.status.isTriggerActive === false)
    }

    val trigger = Trigger.Continuous(100)
    testStream(input.toDF(), useV2Sink = true)(
      StartStream(trigger),
      Execute(assertStatus),
      AddData(input, 0, 1, 2),
      Execute(assertStatus),
      CheckAnswer(0, 1, 2),
      Execute(assertStatus),
      StopStream,
      Execute(assertStatus),
      AddData(input, 3, 4, 5),
      Execute(assertStatus),
      StartStream(trigger),
      Execute(assertStatus),
      CheckAnswer(0, 1, 2, 3, 4, 5),
      Execute(assertStatus),
      StopStream,
      Execute(assertStatus))
  }
}