Skip to content

Commit

Permalink
Undo ContinuousExecution changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Feb 21, 2018
1 parent 7b78fa1 commit fdcf716
Showing 1 changed file with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,19 @@ class ContinuousExecution(
epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = {
assert(continuousSources.length == 1, "only one continuous source supported currently")

val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
val oldOffset = synchronized {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
}

// If offset hasn't changed since last epoch, there's been no new data.
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
noNewData = true
}

awaitProgressLock.lock()
try {
val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
val oldOffset = {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
}

// If offset hasn't changed since last epoch, there's been no new data.
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
noNewData = true
}
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
Expand All @@ -307,22 +308,23 @@ class ContinuousExecution(
def commit(epoch: Long): Unit = {
assert(continuousSources.length == 1, "only one continuous source supported currently")
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")

awaitProgressLock.lock()
try {
synchronized {
if (queryExecutionThread.isAlive) {
commitLog.add(epoch)
val offset = offsetLog.get(epoch).get.offsets(0).get
committedOffsets ++= Seq(continuousSources(0) -> offset)
} else {
return
}
}

if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
if (minLogEntriesToMaintain < currentBatchId) {
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}

awaitProgressLock.lock()
try {
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
Expand Down

0 comments on commit fdcf716

Please sign in to comment.