diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 7472d16bd0b09..2c1d6c509d21b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -282,18 +282,19 @@ class ContinuousExecution( epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = { assert(continuousSources.length == 1, "only one continuous source supported currently") + val globalOffset = reader.mergeOffsets(partitionOffsets.toArray) + val oldOffset = synchronized { + offsetLog.add(epoch, OffsetSeq.fill(globalOffset)) + offsetLog.get(epoch - 1) + } + + // If offset hasn't changed since last epoch, there's been no new data. + if (oldOffset.contains(OffsetSeq.fill(globalOffset))) { + noNewData = true + } + awaitProgressLock.lock() try { - val globalOffset = reader.mergeOffsets(partitionOffsets.toArray) - val oldOffset = { - offsetLog.add(epoch, OffsetSeq.fill(globalOffset)) - offsetLog.get(epoch - 1) - } - - // If offset hasn't changed since last epoch, there's been no new data. - if (oldOffset.contains(OffsetSeq.fill(globalOffset))) { - noNewData = true - } awaitProgressLockCondition.signalAll() } finally { awaitProgressLock.unlock() @@ -307,9 +308,7 @@ class ContinuousExecution( def commit(epoch: Long): Unit = { assert(continuousSources.length == 1, "only one continuous source supported currently") assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit") - - awaitProgressLock.lock() - try { + synchronized { if (queryExecutionThread.isAlive) { commitLog.add(epoch) val offset = offsetLog.get(epoch).get.offsets(0).get @@ -317,12 +316,15 @@ class ContinuousExecution( } else { return } + } - if (minLogEntriesToMaintain < currentBatchId) { - offsetLog.purge(currentBatchId - minLogEntriesToMaintain) - commitLog.purge(currentBatchId - minLogEntriesToMaintain) - } + if (minLogEntriesToMaintain < currentBatchId) { + offsetLog.purge(currentBatchId - minLogEntriesToMaintain) + commitLog.purge(currentBatchId - minLogEntriesToMaintain) + } + awaitProgressLock.lock() + try { awaitProgressLockCondition.signalAll() } finally { awaitProgressLock.unlock()