diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepo.kt index fbdc3d165..5344b8562 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepo.kt @@ -39,7 +39,7 @@ interface IOperationRepo { */ fun containsInstanceOf(type: KClass): Boolean - fun addOperationLoadedListener(handler: IOperationRepoLoadedListener) + suspend fun awaitInitialized() } // Extension function so the syntax containsInstanceOf() can be used over diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepoLoadedListener.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepoLoadedListener.kt deleted file mode 100644 index a086541f7..000000000 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepoLoadedListener.kt +++ /dev/null @@ -1,5 +0,0 @@ -package com.onesignal.core.internal.operations - -interface IOperationRepoLoadedListener { - fun onOperationRepoLoaded() -} diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt index d2f46e6a3..88c13b03d 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt @@ -1,20 +1,18 @@ package com.onesignal.core.internal.operations.impl -import com.onesignal.common.events.EventProducer -import com.onesignal.common.events.IEventNotifier import com.onesignal.common.threading.WaiterWithValue import com.onesignal.core.internal.config.ConfigModelStore import com.onesignal.core.internal.operations.ExecutionResult import com.onesignal.core.internal.operations.GroupComparisonType import com.onesignal.core.internal.operations.IOperationExecutor import com.onesignal.core.internal.operations.IOperationRepo -import com.onesignal.core.internal.operations.IOperationRepoLoadedListener import com.onesignal.core.internal.operations.Operation import com.onesignal.core.internal.startup.IStartableService import com.onesignal.core.internal.time.ITime import com.onesignal.debug.LogLevel import com.onesignal.debug.internal.logging.Logging import com.onesignal.user.internal.operations.impl.states.NewRecordsState +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -30,7 +28,7 @@ internal class OperationRepo( private val _configModelStore: ConfigModelStore, private val _time: ITime, private val _newRecordState: NewRecordsState, -) : IOperationRepo, IStartableService, IEventNotifier { +) : IOperationRepo, IStartableService { internal class OperationQueueItem( val operation: Operation, val waiter: WaiterWithValue? = null, @@ -52,17 +50,10 @@ internal class OperationRepo( private val waiter = WaiterWithValue() private var paused = false private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo")) - private val loadedSubscription: EventProducer = EventProducer() + private val initialized = CompletableDeferred() - override val hasSubscribers: Boolean - get() = loadedSubscription.hasSubscribers - - override fun unsubscribe(handler: IOperationRepoLoadedListener) { - loadedSubscription.unsubscribe(handler) - } - - override fun subscribe(handler: IOperationRepoLoadedListener) { - loadedSubscription.subscribe(handler) + override suspend fun awaitInitialized() { + initialized.await() } /** *** Buckets *** @@ -101,10 +92,6 @@ internal class OperationRepo( } } - override fun addOperationLoadedListener(handler: IOperationRepoLoadedListener) { - subscribe(handler) - } - override fun start() { paused = false coroutineScope.launch { @@ -431,6 +418,6 @@ internal class OperationRepo( ) if (successful) successfulIndex++ } - loadedSubscription.fire { it.onOperationRepoLoaded() } + initialized.complete(Unit) } } diff --git a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBug.kt b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBug.kt index ff50f6b60..b763a0d28 100644 --- a/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBug.kt +++ b/OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBug.kt @@ -3,12 +3,14 @@ package com.onesignal.user.internal.migrations import com.onesignal.common.IDManager import com.onesignal.core.internal.config.ConfigModelStore import com.onesignal.core.internal.operations.IOperationRepo -import com.onesignal.core.internal.operations.IOperationRepoLoadedListener import com.onesignal.core.internal.operations.containsInstanceOf import com.onesignal.core.internal.startup.IStartableService import com.onesignal.debug.internal.logging.Logging import com.onesignal.user.internal.identity.IdentityModelStore import com.onesignal.user.internal.operations.LoginUserOperation +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch /** * Purpose: Automatically recovers a stalled User in the OperationRepo due @@ -31,21 +33,20 @@ class RecoverFromDroppedLoginBug( private val _operationRepo: IOperationRepo, private val _identityModelStore: IdentityModelStore, private val _configModelStore: ConfigModelStore, -) : IStartableService, IOperationRepoLoadedListener { +) : IStartableService { override fun start() { - _operationRepo.addOperationLoadedListener(this) - } - - override fun onOperationRepoLoaded() { - if (isInBadState()) { - Logging.warn( - "User with externalId:" + - "${_identityModelStore.model.externalId} " + - "was in a bad state, causing it to not update on OneSignal's " + - "backend! We are recovering and replaying all unsent " + - "operations now.", - ) - recoverByAddingBackDroppedLoginOperation() + GlobalScope.launch(Dispatchers.IO) { + _operationRepo.awaitInitialized() + if (isInBadState()) { + Logging.warn( + "User with externalId:" + + "${_identityModelStore.model.externalId} " + + "was in a bad state, causing it to not update on OneSignal's " + + "backend! We are recovering and replaying all unsent " + + "operations now.", + ) + recoverByAddingBackDroppedLoginOperation() + } } } diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt index 07c634e89..aada5b40f 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt @@ -25,6 +25,8 @@ import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.coroutines.time.withTimeout +import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield import java.util.UUID @@ -599,24 +601,17 @@ class OperationRepoTests : FunSpec({ result shouldBe null } - test("ensure onOperationRepoLoaded is called once loading is completed") { + test("ensure awaitInitialized() unsuspends") { // Given val mocks = Mocks() - val spyListener = spyk() // When - mocks.operationRepo.addOperationLoadedListener(spyListener) mocks.operationRepo.start() // enqueueAndWait used to know we are fully loaded. mocks.operationRepo.enqueueAndWait(mockOperation()) // Then - mocks.operationRepo.hasSubscribers shouldBe true - coVerifyOrder { - mocks.operationRepo.subscribe(any()) - mocks.operationModelStore.loadOperations() - spyListener.onOperationRepoLoaded() - } + withTimeout(1_000) { mocks.operationRepo.awaitInitialized() } } test("ensure loadSavedOperations doesn't duplicate existing OperationItems") { diff --git a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBugTests.kt b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBugTests.kt index bd5082b80..554c09ac9 100644 --- a/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBugTests.kt +++ b/OneSignalSDK/onesignal/core/src/test/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBugTests.kt @@ -1,58 +1,111 @@ package com.onesignal.user.internal.migrations -import com.onesignal.common.threading.Waiter -import com.onesignal.core.internal.config.ConfigModelStore -import com.onesignal.core.internal.operations.IOperationRepoLoadedListener import com.onesignal.core.internal.operations.impl.OperationModelStore import com.onesignal.core.internal.operations.impl.OperationRepo import com.onesignal.core.internal.time.impl.Time +import com.onesignal.debug.LogLevel +import com.onesignal.debug.internal.logging.Logging import com.onesignal.mocks.MockHelper import com.onesignal.user.internal.operations.ExecutorMocks +import com.onesignal.user.internal.operations.LoginUserOperation import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe import io.mockk.every import io.mockk.just import io.mockk.mockk import io.mockk.runs import io.mockk.spyk import io.mockk.verify +import kotlinx.coroutines.delay +import kotlinx.coroutines.withTimeout + +private class Mocks { + val operationModelStore: OperationModelStore = + run { + val mockOperationModelStore = mockk() + every { mockOperationModelStore.loadOperations() } just runs + every { mockOperationModelStore.list() } returns listOf() + every { mockOperationModelStore.add(any()) } just runs + every { mockOperationModelStore.remove(any()) } just runs + mockOperationModelStore + } + val configModelStore = MockHelper.configModelStore() + val operationRepo = + spyk( + OperationRepo( + listOf(), + operationModelStore, + configModelStore, + Time(), + ExecutorMocks.getNewRecordState(configModelStore), + ), + ) + + var oneSignalId = "local-id" + val identityModelStore by lazy { + MockHelper.identityModelStore { + it.onesignalId = oneSignalId + it.externalId = "myExtId" + } + } + val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, identityModelStore, configModelStore)) + + val expectedOperation by lazy { + LoginUserOperation( + configModelStore.model.appId, + identityModelStore.model.onesignalId, + identityModelStore.model.externalId, + null, + ) + } + + fun verifyExpectedLoginOperation(expectedOp: LoginUserOperation = expectedOperation) { + verify(exactly = 1) { + operationRepo.enqueue( + withArg { + (it is LoginUserOperation) shouldBe true + val op = it as LoginUserOperation + op.appId shouldBe expectedOp.appId + op.externalId shouldBe expectedOp.externalId + op.existingOnesignalId shouldBe expectedOp.existingOnesignalId + op.onesignalId shouldBe expectedOp.onesignalId + }, + ) + } + } +} class RecoverFromDroppedLoginBugTests : FunSpec({ - test("ensure RecoverFromDroppedLoginBug receive onOperationRepoLoaded callback from operationRepo") { + beforeAny { + Logging.logLevel = LogLevel.NONE + } + + test("ensure it adds missing operation") { // Given - val mockOperationModelStore = mockk() - val mockConfigModelStore = mockk() - val operationRepo = - spyk( - OperationRepo( - listOf(), - mockOperationModelStore, - mockConfigModelStore, - Time(), - ExecutorMocks.getNewRecordState(mockConfigModelStore), - ), - ) - every { mockOperationModelStore.loadOperations() } just runs - every { mockOperationModelStore.list() } returns listOf() - val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, MockHelper.identityModelStore(), mockConfigModelStore)) + val mocks = Mocks() // When - recovery.start() - val waiter = Waiter() - operationRepo.addOperationLoadedListener( - object : IOperationRepoLoadedListener { - override fun onOperationRepoLoaded() { - waiter.wake() - } - }, - ) - operationRepo.start() - // Waiting here ensures recovery.onOperationRepoLoaded() is called consistently - waiter.waitForWake() + mocks.recovery.start() + mocks.operationRepo.start() + mocks.operationRepo.awaitInitialized() // Then - verify(exactly = 1) { - operationRepo.subscribe(recovery) - recovery.onOperationRepoLoaded() - } + mocks.verifyExpectedLoginOperation() + } + + test("ensure it adds missing operation, even if operationRepo is already initialized") { + // Given + val mocks = Mocks() + + // When + mocks.operationRepo.start() + // give operation repo some time to fully initialize + delay(200) + + mocks.recovery.start() + withTimeout(1_000) { mocks.operationRepo.awaitInitialized() } + + // Then + mocks.verifyExpectedLoginOperation() } })