Skip to content

Commit

Permalink
Merge pull request #2087 from OneSignal/fix/canaccess_breaking_order_…
Browse files Browse the repository at this point in the history
…with_grouping

[Fix] grouping skipping opRepoPostCreateDelay, causing operations being applied out of order when multiple login operations are pending. (fixes issue since 5.1.10)
  • Loading branch information
jkasten2 authored May 16, 2024
2 parents a6e9f80 + 9f8afc0 commit 91c9f50
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ internal class OperationRepo(
flush: Boolean,
addToStore: Boolean,
index: Int? = null,
): Boolean {
) {
synchronized(queue) {
val hasExisting = queue.any { it.operation.id == queueItem.operation.id }
if (hasExisting) {
Logging.debug("OperationRepo: internalEnqueue - operation.id: ${queueItem.operation.id} already exists in the queue.")
return false
return
}

if (index != null) {
Expand All @@ -153,7 +153,6 @@ internal class OperationRepo(
}

waiter.wake(LoopWaiterMessage(flush, 0))
return true
}

/**
Expand Down Expand Up @@ -236,12 +235,17 @@ internal class OperationRepo(
queue.forEach { it.operation.translateIds(response.idTranslations) }
}
response.idTranslations.values.forEach { _newRecordState.add(it) }
coroutineScope.launch {
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
// Stall processing the queue so the backend's DB has to time
// reflect the change before we do any other operations to it.
// NOTE: Future: We could run this logic in a
// coroutineScope.launch() block so other operations not
// effecting this these id's can still be done in parallel,
// however other parts of the system don't currently account
// for this so this is not safe to do.
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
}

Expand Down Expand Up @@ -359,8 +363,7 @@ internal class OperationRepo(
* THIS SHOULD BE CALLED WHILE THE QUEUE IS SYNCHRONIZED!!
*/
private fun getGroupableOperations(startingOp: OperationQueueItem): List<OperationQueueItem> {
val ops = mutableListOf<OperationQueueItem>()
ops.add(startingOp)
val ops = mutableListOf(startingOp)

if (startingOp.operation.groupComparisonType == GroupComparisonType.NONE) {
return ops
Expand All @@ -373,23 +376,25 @@ internal class OperationRepo(
startingOp.operation.modifyComparisonKey
}

if (queue.isNotEmpty()) {
for (item in queue.toList()) {
val itemKey =
if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) {
item.operation.createComparisonKey
} else {
item.operation.modifyComparisonKey
}

if (itemKey == "" && startingKey == "") {
throw Exception("Both comparison keys can not be blank!")
for (item in queue.toList()) {
val itemKey =
if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) {
item.operation.createComparisonKey
} else {
item.operation.modifyComparisonKey
}

if (itemKey == startingKey) {
queue.remove(item)
ops.add(item)
}
if (itemKey == "" && startingKey == "") {
throw Exception("Both comparison keys can not be blank!")
}

if (!_newRecordState.canAccess(item.operation.applyToRecordId)) {
continue
}

if (itemKey == startingKey) {
queue.remove(item)
ops.add(item)
}
}

Expand All @@ -398,25 +403,18 @@ internal class OperationRepo(

/**
* Load saved operations from preference service and add them into the queue
* WARNING: Make sure queue.remove is NEVER called while this method is
* running, as internalEnqueue will throw IndexOutOfBounds or put things
* out of order if what was removed was something added by this method.
* - This never happens now, but is a landmine to be aware of!
* NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk
* Any I/O implies executing time will vary greatly.
* NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk,
* so always ensure this is call off the main thread.
*/
internal fun loadSavedOperations() {
_operationModelStore.loadOperations()
var successfulIndex = 0
for (operation in _operationModelStore.list()) {
val successful =
internalEnqueue(
OperationQueueItem(operation, bucket = enqueueIntoBucket),
flush = false,
addToStore = false,
index = successfulIndex,
)
if (successful) successfulIndex++
for (operation in _operationModelStore.list().reversed()) {
internalEnqueue(
OperationQueueItem(operation, bucket = enqueueIntoBucket),
flush = false,
addToStore = false,
index = 0,
)
}
initialized.complete(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NewRecordsState(

fun canAccess(key: String): Boolean {
val timeLastMovedOrCreated = records[key] ?: return true
return _time.currentTimeMillis - timeLastMovedOrCreated > _configModelStore.model.opRepoPostCreateDelay
return _time.currentTimeMillis - timeLastMovedOrCreated >= _configModelStore.model.opRepoPostCreateDelay
}

fun isInMissingRetryWindow(key: String): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.time.withTimeout
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield
Expand Down Expand Up @@ -547,31 +546,42 @@ class OperationRepoTests : FunSpec({
mocks.operationRepo.start()
mocks.operationRepo.enqueue(operation1)
val job = launch { mocks.operationRepo.enqueueAndWait(operation2) }.also { yield() }
mocks.operationRepo.enqueue(operation3)
mocks.operationRepo.enqueueAndWait(operation3)
job.join()

// Then
coVerifyOrder {
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation1
},
)
mocks.executor.execute(listOf(operation1))
operation2.translateIds(mapOf("local-id1" to "id2"))
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation3
},
)
// Ensure operation2 runs after operation3 as it has to wait for the create delay
mocks.executor.execute(
withArg {
it.count() shouldBe 1
it[0] shouldBe operation2
},
)
mocks.executor.execute(listOf(operation2))
mocks.executor.execute(listOf(operation3))
}
}

// This tests the same logic as above, but makes sure the delay also
// applies to grouping operations.
test("execution of an operation with translation IDs delays follow up operations, including grouping") {
// Given
val mocks = Mocks()
mocks.configModelStore.model.opRepoPostCreateDelay = 100
val operation1 = mockOperation(groupComparisonType = GroupComparisonType.NONE)
val operation2 = mockOperation(groupComparisonType = GroupComparisonType.CREATE)
val operation3 = mockOperation(groupComparisonType = GroupComparisonType.CREATE, applyToRecordId = "id2")
coEvery {
mocks.executor.execute(listOf(operation1))
} returns ExecutionResponse(ExecutionResult.SUCCESS, mapOf("local-id1" to "id2"))

// When
mocks.operationRepo.start()
mocks.operationRepo.enqueue(operation1)
mocks.operationRepo.enqueue(operation2)
mocks.operationRepo.enqueueAndWait(operation3)

// Then
coVerifyOrder {
mocks.executor.execute(listOf(operation1))
operation2.translateIds(mapOf("local-id1" to "id2"))
mocks.executor.execute(listOf(operation2, operation3))
}
}

Expand Down

0 comments on commit 91c9f50

Please sign in to comment.