From 38e5e6cd9a772236aeba38c6fce8a315b22be09c Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Mon, 13 May 2024 20:41:56 -0400 Subject: [PATCH 1/7] test proving grouping issue Added test to prove that grouping logic is not waiting on opRepoPostCreateDelay for new ids. This has the side effect of operations being done out of order in some cases. A real world case is login operations can be done out of order, which also results in things like the push subscription being on the wrong users. There is also a smaller issue where we are simply not waiting the time limit so some 404 errors were not being avoided but would have still resulted in correct behavior in the end. --- .../internal/operations/OperationRepoTests.kt | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index aada5b40fd..168eecebc3 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -25,7 +25,6 @@ import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.time.withTimeout import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield @@ -575,6 +574,49 @@ class OperationRepoTests : FunSpec({ } } + // This tests the same logic as above, but makes sure the delay also + // applies to grouping operations. 
+ test("execution of an operation with translation IDs delays follow up operations, including grouping") { + // Given + val mocks = Mocks() + mocks.configModelStore.model.opRepoPostCreateDelay = 100 + val operation1 = mockOperation(groupComparisonType = GroupComparisonType.NONE) + val operation2 = mockOperation(groupComparisonType = GroupComparisonType.CREATE) + val operation3 = mockOperation(groupComparisonType = GroupComparisonType.CREATE, applyToRecordId = "id2") + coEvery { + mocks.executor.execute(listOf(operation1)) + } returns ExecutionResponse(ExecutionResult.SUCCESS, mapOf("local-id1" to "id2")) + + // When + mocks.operationRepo.start() + mocks.operationRepo.enqueue(operation1) + mocks.operationRepo.enqueue(operation2) + mocks.operationRepo.enqueueAndWait(operation3) + + // Then + coVerifyOrder { + mocks.executor.execute( + withArg { + it.count() shouldBe 1 + it[0] shouldBe operation1 + }, + ) + operation2.translateIds(mapOf("local-id1" to "id2")) + mocks.executor.execute( + withArg { + it.count() shouldBe 1 + it[0] shouldBe operation2 + }, + ) + mocks.executor.execute( + withArg { + it.count() shouldBe 1 + it[0] shouldBe operation3 + }, + ) + } + } + // We want to prevent a misbehaving app stuck in a loop from continuously // sending updates every opRepoExecutionInterval (5 seconds currently). // By waiting for the dust to settle we ensure the app is done making From ee6e690cd02dee396eaad6880f68f55dd2c54ce4 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Mon, 13 May 2024 20:58:58 -0400 Subject: [PATCH 2/7] OperationRepo.getGroupableOperations() was not respecting opRepoPostCreateDelay like OperationRepo.getNextOps(), this would lead to some operations being applied out of order. The only known case is when there is a login in the queue for an Anonymous User create and also User A and another for User B. The anonymous User create happens first with a push subscription which is correct. 
Then we have to wait for opRepoPostCreateDelay time after that create before we can attempt to identify it as User A; this is also ok. However, we skip to User B and try to create it and transfer the subscription; this isn't correct as we also should wait opRepoPostCreateDelay for the new subscription id. While this commit fixes the issue noted above there should be another mechanism in place to prevent such an issue. This should be done in a follow up commit. See PR #2087 for more details. --- .../onesignal/core/internal/operations/impl/OperationRepo.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 88c13b03da..68665b8eba 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -386,6 +386,10 @@ internal class OperationRepo( throw Exception("Both comparison keys can not be blank!") } + if (!_newRecordState.canAccess(item.operation.applyToRecordId)) { + continue + } + if (itemKey == startingKey) { queue.remove(item) ops.add(item) From cc67b11972949645556e93cfd74e5dd304086391 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Mon, 13 May 2024 21:38:50 -0400 Subject: [PATCH 3/7] simplify loadSavedOperations logic Reversing the list means we insert each item at index 0 to keep the order correct. Always inserting at index 0 means we no longer need to keep an index state, which both simplifies the code and means we don't depend on state which may be wrong sometimes, as noted in the warning comment that was removed in this commit. This reversed() logic for lists is already used in a few places in this file to solve this problem. 
--- .../internal/operations/impl/OperationRepo.kt | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 68665b8eba..79a0348ae0 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -134,12 +134,12 @@ internal class OperationRepo( flush: Boolean, addToStore: Boolean, index: Int? = null, - ): Boolean { + ) { synchronized(queue) { val hasExisting = queue.any { it.operation.id == queueItem.operation.id } if (hasExisting) { Logging.debug("OperationRepo: internalEnqueue - operation.id: ${queueItem.operation.id} already exists in the queue.") - return false + return } if (index != null) { @@ -153,7 +153,6 @@ internal class OperationRepo( } waiter.wake(LoopWaiterMessage(flush, 0)) - return true } /** @@ -402,25 +401,18 @@ internal class OperationRepo( /** * Load saved operations from preference service and add them into the queue - * WARNING: Make sure queue.remove is NEVER called while this method is - * running, as internalEnqueue will throw IndexOutOfBounds or put things - * out of order if what was removed was something added by this method. - * - This never happens now, but is a landmine to be aware of! - * NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk - * Any I/O implies executing time will vary greatly. + * NOTE: Sometimes the loading might take longer than expected due to I/O reads from disk, + * so always ensure this is called off the main thread. 
*/ internal fun loadSavedOperations() { _operationModelStore.loadOperations() - var successfulIndex = 0 - for (operation in _operationModelStore.list()) { - val successful = - internalEnqueue( - OperationQueueItem(operation, bucket = enqueueIntoBucket), - flush = false, - addToStore = false, - index = successfulIndex, - ) - if (successful) successfulIndex++ + for (operation in _operationModelStore.list().reversed()) { + internalEnqueue( + OperationQueueItem(operation, bucket = enqueueIntoBucket), + flush = false, + addToStore = false, + index = 0, + ) } initialized.complete(Unit) } From fa07f4fa863551f35c2be65f9e996bdf9c55c405 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Tue, 14 May 2024 17:46:48 -0400 Subject: [PATCH 4/7] clean up, removed unneeded isNotEmpty() check --- .../internal/operations/impl/OperationRepo.kt | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 79a0348ae0..89386b52d2 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -372,27 +372,25 @@ internal class OperationRepo( startingOp.operation.modifyComparisonKey } - if (queue.isNotEmpty()) { - for (item in queue.toList()) { - val itemKey = - if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) { - item.operation.createComparisonKey - } else { - item.operation.modifyComparisonKey - } - - if (itemKey == "" && startingKey == "") { - throw Exception("Both comparison keys can not be blank!") + for (item in queue.toList()) { + val itemKey = + if (startingOp.operation.groupComparisonType == GroupComparisonType.CREATE) { + item.operation.createComparisonKey + } 
else { + item.operation.modifyComparisonKey } - if (!_newRecordState.canAccess(item.operation.applyToRecordId)) { - continue - } + if (itemKey == "" && startingKey == "") { + throw Exception("Both comparison keys can not be blank!") + } - if (itemKey == startingKey) { - queue.remove(item) - ops.add(item) - } + if (!_newRecordState.canAccess(item.operation.applyToRecordId)) { + continue + } + + if (itemKey == startingKey) { + queue.remove(item) + ops.add(item) } } From 064db14362a707d94021c6176fdd25fb95f07d0b Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Tue, 14 May 2024 17:59:13 -0400 Subject: [PATCH 5/7] simplify mutableListOf --- .../onesignal/core/internal/operations/impl/OperationRepo.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 89386b52d2..0a9b7293b6 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -358,8 +358,7 @@ internal class OperationRepo( * THIS SHOULD BE CALLED WHILE THE QUEUE IS SYNCHRONIZED!! */ private fun getGroupableOperations(startingOp: OperationQueueItem): List { - val ops = mutableListOf() - ops.add(startingOp) + val ops = mutableListOf(startingOp) if (startingOp.operation.groupComparisonType == GroupComparisonType.NONE) { return ops From 5d5c8370a199bc431e524303b2bb815535f2590e Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Wed, 15 May 2024 19:37:06 -0400 Subject: [PATCH 6/7] canAccess should be inclusive Since opRepoPostCreateDelay is passed to delay() canAccess() should use inclusive logic. 
--- .../user/internal/operations/impl/states/NewRecordsState.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt index e884fc7f6d..cdc44a4c38 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/operations/impl/states/NewRecordsState.kt @@ -23,7 +23,7 @@ class NewRecordsState( fun canAccess(key: String): Boolean { val timeLastMovedOrCreated = records[key] ?: return true - return _time.currentTimeMillis - timeLastMovedOrCreated > _configModelStore.model.opRepoPostCreateDelay + return _time.currentTimeMillis - timeLastMovedOrCreated >= _configModelStore.model.opRepoPostCreateDelay } fun isInMissingRetryWindow(key: String): Boolean { From 9f8afc0fede8693923157892983dcfc5ea9a7427 Mon Sep 17 00:00:00 2001 From: Josh Kasten Date: Wed, 15 May 2024 20:47:50 -0400 Subject: [PATCH 7/7] stall OperationRepo by opRepoPostCreateDelay The OperationRepo can't juggle two different users correctly, so stall the whole queue by opRepoPostCreateDelay. We plan to address this limitation in a future PR. 
--- .../internal/operations/impl/OperationRepo.kt | 17 ++++--- .../internal/operations/OperationRepoTests.kt | 44 +++---------------- 2 files changed, 17 insertions(+), 44 deletions(-) diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index 0a9b7293b6..42adaae1da 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -235,12 +235,17 @@ internal class OperationRepo( queue.forEach { it.operation.translateIds(response.idTranslations) } } response.idTranslations.values.forEach { _newRecordState.add(it) } - coroutineScope.launch { - val waitTime = _configModelStore.model.opRepoPostCreateDelay - delay(waitTime) - synchronized(queue) { - if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime)) - } + // Stall processing the queue so the backend's DB has time to + // reflect the change before we do any other operations to it. + // NOTE: Future: We could run this logic in a + // coroutineScope.launch() block so other operations not + // affecting these ids can still be done in parallel, + // however other parts of the system don't currently account + // for this so this is not safe to do. 
+ val waitTime = _configModelStore.model.opRepoPostCreateDelay + delay(waitTime) + synchronized(queue) { + if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime)) } } diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index 168eecebc3..1094b06671 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -546,31 +546,15 @@ class OperationRepoTests : FunSpec({ mocks.operationRepo.start() mocks.operationRepo.enqueue(operation1) val job = launch { mocks.operationRepo.enqueueAndWait(operation2) }.also { yield() } - mocks.operationRepo.enqueue(operation3) + mocks.operationRepo.enqueueAndWait(operation3) job.join() // Then coVerifyOrder { - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation1 - }, - ) + mocks.executor.execute(listOf(operation1)) operation2.translateIds(mapOf("local-id1" to "id2")) - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation3 - }, - ) - // Ensure operation2 runs after operation3 as it has to wait for the create delay - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation2 - }, - ) + mocks.executor.execute(listOf(operation2)) + mocks.executor.execute(listOf(operation3)) } } @@ -595,25 +579,9 @@ class OperationRepoTests : FunSpec({ // Then coVerifyOrder { - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation1 - }, - ) + mocks.executor.execute(listOf(operation1)) operation2.translateIds(mapOf("local-id1" to "id2")) - mocks.executor.execute( - withArg { - it.count() shouldBe 1 - it[0] shouldBe operation2 - }, - ) - mocks.executor.execute( - withArg { - it.count() 
shouldBe 1 - it[0] shouldBe operation3 - }, - ) + mocks.executor.execute(listOf(operation2, operation3)) } }