Skip to content

Commit

Permalink
Merge pull request #2033 from OneSignal/fix/op-repo-fix-keeping-cpu-a…
Browse files Browse the repository at this point in the history
…wake

[Improvement] Prevent OperationRepo from continuously pulling when empty
  • Loading branch information
jkasten2 authored Mar 28, 2024
2 parents 25924dc + fd60e70 commit 79a2fbd
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.onesignal.common.threading

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

/**
* An abstraction which allows for a suspending function to coordinate
Expand All @@ -18,7 +17,13 @@ class Waiter {
/**
* Wake the suspending function that has called [waitForWake].
*/
fun wake() = runBlocking { channel.send(null) }
fun wake() {
val result = channel.trySend(null)
if (result.isFailure) {
// Most likely only happens when the chanel is misconfigured or misused
throw Exception("Waiter.wait failed", result.exceptionOrNull())
}
}
}

/**
Expand All @@ -40,5 +45,11 @@ open class WaiterWithValue<TType> {
*
* @param value The data to be returned by the [waitForWake].
*/
fun wake(value: TType) = runBlocking { channel.send(value) }
fun wake(value: TType) {
val result = channel.trySend(value)
if (result.isFailure) {
// Most likely only happens when the chanel is misconfigured or misused
throw Exception("WaiterWithValue.wait failed", result.exceptionOrNull())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal class OperationRepo(
private val _configModelStore: ConfigModelStore,
private val _time: ITime,
) : IOperationRepo, IStartableService {
private class OperationQueueItem(
internal class OperationQueueItem(
val operation: Operation,
val waiter: WaiterWithValue<Boolean>? = null,
var retries: Int = 0,
Expand Down Expand Up @@ -97,61 +97,46 @@ internal class OperationRepo(
* dedicated thread.
*/
private suspend fun processQueueForever() {
var lastSyncTime = _time.currentTimeMillis
var force = false

// This runs forever, until the application is destroyed.
waitForNewOperationAndExecutionInterval()
while (true) {
if (paused) {
Logging.debug("OperationRepo is paused")
return
}
try {
var ops: List<OperationQueueItem>? = null

synchronized(queue) {
val startingOp = queue.firstOrNull { it.operation.canStartExecute }

if (startingOp != null) {
queue.remove(startingOp)
ops = getGroupableOperations(startingOp)
}
}

// if the queue is empty at this point, we are no longer in force flush mode. We
// check this now so if the execution is unsuccessful with retry, we don't find ourselves
// continuously retrying without delaying.
if (queue.isEmpty()) {
force = false
}

if (ops != null) {
executeOperations(ops!!)
}
val ops = getNextOps()
Logging.debug("processQueueForever:ops:$ops")

if (!force) {
// potentially delay to prevent this from constant IO if a bunch of
// operations are set sequentially.
val newTime = _time.currentTimeMillis

val delay = (lastSyncTime - newTime) + _configModelStore.model.opRepoExecutionInterval
lastSyncTime = newTime
if (delay > 0) {
withTimeoutOrNull(delay) {
// wait to be woken up for the next pass
force = waiter.waitForWake()
}

// This secondary delay allows for any subsequent operations (beyond the first one
// that woke us) to be enqueued before we pull from the queue.
delay(_configModelStore.model.opRepoPostWakeDelay)
if (ops != null) {
executeOperations(ops)
// Allows for any subsequent operations (beyond the first one
// that woke us) to be enqueued before we pull from the queue.
delay(_configModelStore.model.opRepoPostWakeDelay)
} else {
waitForNewOperationAndExecutionInterval()
}
}
}

lastSyncTime = _time.currentTimeMillis
}
}
} catch (e: Throwable) {
Logging.log(LogLevel.ERROR, "Error occurred with Operation work loop", e)
/**
* Waits until a new operation is enqueued, then wait an additional
* amount of time afterwards, so operations can be grouped/batched.
*/
private suspend fun waitForNewOperationAndExecutionInterval() {
// 1. Wait for an operation to be enqueued
var force = waiter.waitForWake()

// 2. Wait at least the time defined in opRepoExecutionInterval
// so operations can be grouped, unless one of them used
// flush=true (AKA force)
var lastTime = _time.currentTimeMillis
var remainingTime = _configModelStore.model.opRepoExecutionInterval
while (!force && remainingTime > 0) {
withTimeoutOrNull(remainingTime) {
force = waiter.waitForWake()
}
remainingTime -= _time.currentTimeMillis - lastTime
lastTime = _time.currentTimeMillis
}
}

Expand Down Expand Up @@ -249,10 +234,23 @@ internal class OperationRepo(
suspend fun delayBeforeRetry(retries: Int) {
val delayFor = retries * 15_000L
if (delayFor < 1) return
Logging.error("Operations being delay for: $delayFor")
Logging.error("Operations being delay for: $delayFor ms")
delay(delayFor)
}

internal fun getNextOps(): List<OperationQueueItem>? {
return synchronized(queue) {
val startingOp = queue.firstOrNull { it.operation.canStartExecute }

if (startingOp != null) {
queue.remove(startingOp)
getGroupableOperations(startingOp)
} else {
null
}
}
}

/**
* Given a starting operation, find and remove from the queue all other operations that
* can be executed along with the starting operation. The full list of operations, with
Expand Down
Loading

0 comments on commit 79a2fbd

Please sign in to comment.