Skip to content

Commit

Permalink
fix multiple enqueues skipping waitForWake
Browse files Browse the repository at this point in the history
We renamed waitForWake to waitForNewOperationAndExecutionInterval to
describe that it is doing two things.

Fixed the logic where enqueuing two or more operations would skip our
executionInterval logic. Added test coverage for this logic, as well
as a 2nd test to ensure flush can still skip waiting for it.

Lastly cleaned up the force state in the main processQueueForever loop,
we were able to fully encapsulate it into
waitForNewOperationAndExecutionInterval. The other part of the logic is
once we start processing the queue we want to do so until it's empty
again, which ops != null is the only checked needed in
processQueueForever to cover that.
  • Loading branch information
jkasten2 committed Mar 28, 2024
1 parent 1c2fd11 commit bf2cfda
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,47 @@ internal class OperationRepo(
* dedicated thread.
*/
private suspend fun processQueueForever() {
var force = waitForWake()
waitForNewOperationAndExecutionInterval()
while (true) {
if (paused) {
Logging.debug("OperationRepo is paused")
return
}

val ops = getNextOps()
Logging.debug("processQueueForever:force:$force, ops:$ops")
Logging.debug("processQueueForever:ops:$ops")

if (ops != null) {
executeOperations(ops)
// Allows for any subsequent operations (beyond the first one
// that woke us) to be enqueued before we pull from the queue.
delay(_configModelStore.model.opRepoPostWakeDelay)
} else if (!force) {
force = waitForWake()
} else {
force = false
waitForNewOperationAndExecutionInterval()
}
}
}

private suspend fun waitForWake(): Boolean {
/**
* Waits until a new operation is enqueued, then wait an additional
* amount of time afterwards, so operations can be grouped/batched.
*/
private suspend fun waitForNewOperationAndExecutionInterval() {
// 1. Wait for an operation to be enqueued
var force = waiter.waitForWake()
if (!force) {
withTimeoutOrNull(_configModelStore.model.opRepoExecutionInterval) {

// 2. Wait at least the time defined in opRepoExecutionInterval
// so operations can be grouped, unless one of them used
// flush=true (AKA force)
var lastTime = _time.currentTimeMillis
var remainingTime = _configModelStore.model.opRepoExecutionInterval
while (!force && remainingTime > 0) {
withTimeoutOrNull(remainingTime) {
force = waiter.waitForWake()
}
remainingTime -= _time.currentTimeMillis - lastTime
lastTime = _time.currentTimeMillis
}
return force
}

private suspend fun executeOperations(ops: List<OperationQueueItem>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.onesignal.core.internal.operations
import com.onesignal.common.threading.Waiter
import com.onesignal.core.internal.operations.impl.OperationModelStore
import com.onesignal.core.internal.operations.impl.OperationRepo
import com.onesignal.core.internal.time.impl.Time
import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.mocks.MockHelper
Expand All @@ -19,9 +20,12 @@ import io.mockk.slot
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull

// Mocks used by every test in this file
private class Mocks {
val configModelStore = MockHelper.configModelStore()

val operationModelStore: OperationModelStore =
run {
val mockOperationModelStore = mockk<OperationModelStore>()
Expand All @@ -45,8 +49,8 @@ private class Mocks {
OperationRepo(
listOf(executor),
operationModelStore,
MockHelper.configModelStore(),
MockHelper.time(1000),
configModelStore,
Time(),
),
)
}
Expand Down Expand Up @@ -312,6 +316,38 @@ class OperationRepoTests : FunSpec({
mocks.operationModelStore.remove("operationId2")
}
}

test("enqueuing normal operations should not skip minimum wait time") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 1_000

// When
mocks.operationRepo.start()
mocks.operationRepo.enqueue(mockOperation())
val response =
withTimeoutOrNull(100) {
val value = mocks.operationRepo.enqueueAndWait(mockOperation())
value
}
response shouldBe null
}

test("enqueuing with flush = true should skip minimum wait time") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoExecutionInterval = 1_000

// When
mocks.operationRepo.start()
mocks.operationRepo.enqueue(mockOperation())
val response =
withTimeoutOrNull(100) {
val value = mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
value
}
response shouldBe true
}
}) {
companion object {
private fun mockOperation(
Expand Down

0 comments on commit bf2cfda

Please sign in to comment.