From 1f0b1130de7c4502f1c17e9bba9695b6282dbe46 Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Sun, 24 Dec 2023 18:20:22 +0530 Subject: [PATCH 1/9] mutex is enough --- .../android/fhir/sync/FhirSynchronizer.kt | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 2959092d83..1fc9449893 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -27,6 +27,8 @@ import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.hl7.fhir.r4.model.ResourceType enum class SyncOperation { @@ -83,18 +85,20 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started) - - return listOf(download(), upload()) - .filterIsInstance() - .flatMap { it.exceptions } - .let { - if (it.isEmpty()) { - setSyncState(SyncResult.Success()) - } else { - setSyncState(SyncResult.Error(it)) + mutex.withLock { + setSyncState(SyncJobStatus.Started) + + return listOf(download(), upload()) + .filterIsInstance() + .flatMap { it.exceptions } + .let { + if (it.isEmpty()) { + setSyncState(SyncResult.Success()) + } else { + setSyncState(SyncResult.Error(it)) + } } - } + } } private suspend fun download(): SyncResult { @@ -145,4 +149,8 @@ internal class FhirSynchronizer( SyncResult.Error(exceptions) } } + + companion object { + private val mutex = Mutex() + } } From eabafa89eb91332213a4fc5ee58cc35f8e83b681 Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 3 Jan 2024 15:11:14 +0530 Subject: [PATCH 2/9] Mutex is actually good --- .../android/fhir/sync/FhirSyncWorker.kt | 106 ++++++++++-------- .../android/fhir/sync/FhirSynchronizer.kt | 2 +- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index a43e9d4172..67b8620d7c 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Google LLC + * Copyright 2023-2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import timber.log.Timber /** A WorkManager Worker that handles periodic sync. */ @@ -60,65 +62,67 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter internal open fun getDataSource() = FhirEngineProvider.getDataSource(applicationContext) override suspend fun doWork(): Result { - val dataSource = - getDataSource() - ?: return Result.failure( - buildWorkData( - IllegalStateException( - "FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.", + mutex.withLock { + val dataSource = + getDataSource() + ?: return Result.failure( + buildWorkData( + IllegalStateException( + "FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.", + ), ), + ) + + val synchronizer = + FhirSynchronizer( + applicationContext, + getFhirEngine(), + UploadConfiguration( + Uploader( + dataSource = dataSource, + patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), + requestGenerator = + UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), + ), + ), + DownloadConfiguration( + DownloaderImpl(dataSource, getDownloadWorkManager()), + getConflictResolver(), ), ) - val synchronizer = - FhirSynchronizer( - applicationContext, - getFhirEngine(), - UploadConfiguration( - Uploader( - dataSource = dataSource, - patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), - requestGenerator = - UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), - ), - ), - DownloadConfiguration( - DownloaderImpl(dataSource, getDownloadWorkManager()), - getConflictResolver(), - ), - ) - - val job = - CoroutineScope(Dispatchers.IO).launch { - synchronizer.syncState.collect { - // now send Progress to work manager so caller app can listen - setProgress(buildWorkData(it)) - - if (it is SyncJobStatus.Finished || it is SyncJobStatus.Failed) { - this@launch.cancel() + val job = + CoroutineScope(Dispatchers.IO).launch { + synchronizer.syncState.collect { + // now send Progress to work manager so caller app can listen + setProgress(buildWorkData(it)) + + if (it is SyncJobStatus.Finished || it is SyncJobStatus.Failed) { + this@launch.cancel() + } } } - } - val result = synchronizer.synchronize() - val output = buildWorkData(result) + val result = synchronizer.synchronize() + val output = buildWorkData(result) - // await/join is needed to collect states completely - kotlin.runCatching { job.join() }.onFailure(Timber::w) + // await/join is needed to collect states completely + kotlin.runCatching { job.join() }.onFailure(Timber::w) - setProgress(output) + setProgress(output) - Timber.d("Received result from worker $result and sending output $output") + Timber.d("Received result from worker $result and sending output $output") - /** - * In case of failure, we can check if its worth retrying and do retry based on - * [RetryConfiguration.maxRetries] set by user. - */ - val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) - return when (result) { - is SyncJobStatus.Finished -> Result.success(output) - else -> { - if (retries > runAttemptCount) Result.retry() else Result.failure(output) + /** + * In case of failure, we can check if its worth retrying and do retry based on + * [RetryConfiguration.maxRetries] set by user. + */ + val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) + return when (result) { + is SyncJobStatus.Finished -> Result.success(output) + else -> { + if (retries > runAttemptCount) Result.retry() else Result.failure(output) + } } } } @@ -148,4 +152,8 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter override fun shouldSkipClass(clazz: Class<*>?) = false } + + companion object { + private val mutex = Mutex() + } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 1fc9449893..e74c1293e7 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Google LLC + * Copyright 2023-2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From b63d3527f1c8d13a76cf5180b3388c337e70cf12 Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 3 Jan 2024 15:16:43 +0530 Subject: [PATCH 3/9] Remove extra mutex --- .../android/fhir/sync/FhirSynchronizer.kt | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index e74c1293e7..0437ec7c90 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -27,8 +27,6 @@ import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import org.hl7.fhir.r4.model.ResourceType enum class SyncOperation { @@ -85,20 +83,18 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - mutex.withLock { - setSyncState(SyncJobStatus.Started) - - return listOf(download(), upload()) - .filterIsInstance() - .flatMap { it.exceptions } - .let { - if (it.isEmpty()) { - setSyncState(SyncResult.Success()) - } else { - setSyncState(SyncResult.Error(it)) - } + setSyncState(SyncJobStatus.Started) + + return listOf(download(), upload()) + .filterIsInstance() + .flatMap { it.exceptions } + .let { + if (it.isEmpty()) { + setSyncState(SyncResult.Success()) + } else { + setSyncState(SyncResult.Error(it)) } - } + } } private suspend fun download(): SyncResult { @@ -149,8 +145,4 @@ internal class FhirSynchronizer( SyncResult.Error(exceptions) } } - - companion object { - private val mutex = Mutex() - } } From 468e6c7dfacc9c189c6ab7d9e59ed2aaacec264b Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 3 Jan 2024 15:17:31 +0530 Subject: [PATCH 4/9] remove extra spotless changes --- .../main/java/com/google/android/fhir/sync/FhirSynchronizer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 0437ec7c90..2959092d83 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 3bb616d4b6b12a01144afc76fadf2f961b209be8 Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 3 Jan 2024 16:08:58 +0530 Subject: [PATCH 5/9] spotless --- .../main/java/com/google/android/fhir/sync/FhirSyncWorker.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index 797f49461c..c38531d855 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -81,7 +81,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter dataSource = dataSource, patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), requestGenerator = - UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), + UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), ), ), DownloadConfiguration( From 77d9fe6b0eae9219a0cbadba6a6af59f24df728b Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 31 Jan 2024 12:19:49 +0530 Subject: [PATCH 6/9] mutex test added --- .../android/fhir/sync/FhirSyncWorker.kt | 120 ++++++++---------- .../android/fhir/sync/FhirSynchronizer.kt | 41 ++++-- .../android/fhir/sync/FhirSynchronizerTest.kt | 31 +++++ 3 files changed, 117 insertions(+), 75 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt index c38531d855..bc07ddc12a 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt @@ -37,8 +37,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.cancel import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import timber.log.Timber /** A WorkManager Worker that handles periodic sync. */ @@ -62,76 +60,74 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter internal open fun getDataSource() = FhirEngineProvider.getDataSource(applicationContext) override suspend fun doWork(): Result { - mutex.withLock { - val dataSource = - getDataSource() - ?: return Result.failure( - buildWorkData( - IllegalStateException( - "FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.", - ), + val dataSource = + getDataSource() + ?: return Result.failure( + buildWorkData( + IllegalStateException( + "FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.", ), - ) - - val synchronizer = - FhirSynchronizer( - getFhirEngine(), - UploadConfiguration( - Uploader( - dataSource = dataSource, - patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), - requestGenerator = - UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), - ), - ), - DownloadConfiguration( - DownloaderImpl(dataSource, getDownloadWorkManager()), - getConflictResolver(), ), - FhirEngineProvider.getFhirDataStore(applicationContext), ) - val job = - CoroutineScope(Dispatchers.IO).launch { - val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext) - synchronizer.syncState.collect { syncJobStatus -> - val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME) - when (syncJobStatus) { - is SyncJobStatus.Succeeded, - is SyncJobStatus.Failed, -> { - // While creating periodicSync request if - // putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present, - // then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null. - if (uniqueWorkerName != null) { - fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, syncJobStatus) - } - cancel() - } - else -> { - setProgress(buildWorkData(syncJobStatus)) + val synchronizer = + FhirSynchronizer( + getFhirEngine(), + UploadConfiguration( + Uploader( + dataSource = dataSource, + patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode), + requestGenerator = + UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode), + ), + ), + DownloadConfiguration( + DownloaderImpl(dataSource, getDownloadWorkManager()), + getConflictResolver(), + ), + FhirEngineProvider.getFhirDataStore(applicationContext), + ) + + val job = + CoroutineScope(Dispatchers.IO).launch { + val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext) + synchronizer.syncState.collect { syncJobStatus -> + val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME) + when (syncJobStatus) { + is SyncJobStatus.Succeeded, + is SyncJobStatus.Failed, -> { + // While creating periodicSync request if + // putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present, + // then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null. + if (uniqueWorkerName != null) { + fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, syncJobStatus) } + cancel() + } + else -> { + setProgress(buildWorkData(syncJobStatus)) } } } + } - val result = synchronizer.synchronize() - val output = buildWorkData(result) + val result = synchronizer.synchronize() + val output = buildWorkData(result) - // await/join is needed to collect states completely - kotlin.runCatching { job.join() }.onFailure(Timber::w) + // await/join is needed to collect states completely + kotlin.runCatching { job.join() }.onFailure(Timber::w) - Timber.d("Received result from worker $result and sending output $output") + Timber.d("Received result from worker $result and sending output $output") - /** - * In case of failure, we can check if its worth retrying and do retry based on - * [RetryConfiguration.maxRetries] set by user. - */ - val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) - return when (result) { - is SyncJobStatus.Succeeded -> Result.success(output) - else -> { - if (retries > runAttemptCount) Result.retry() else Result.failure(output) - } + /** + * In case of failure, we can check if its worth retrying and do retry based on + * [RetryConfiguration.maxRetries] set by user. + */ + val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0) + return when (result) { + is SyncJobStatus.Succeeded -> Result.success(output) + else -> { + if (retries > runAttemptCount) Result.retry() else Result.failure(output) } } } @@ -161,8 +157,4 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter override fun shouldSkipClass(clazz: Class<*>?) = false } - - companion object { - private val mutex = Mutex() - } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index e1b300884d..122d3f7d41 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -25,6 +25,8 @@ import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.hl7.fhir.r4.model.ResourceType enum class SyncOperation { @@ -79,18 +81,20 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started()) - - return listOf(download(), upload()) - .filterIsInstance() - .flatMap { it.exceptions } - .let { - if (it.isEmpty()) { - setSyncState(SyncResult.Success()) - } else { - setSyncState(SyncResult.Error(it)) + mutex.withLock(mutexOwner) { + setSyncState(SyncJobStatus.Started()) + + return listOf(download(), upload()) + .filterIsInstance() + .flatMap { it.exceptions } + .let { + if (it.isEmpty()) { + setSyncState(SyncResult.Success()) + } else { + setSyncState(SyncResult.Error(it)) + } } - } + } } private suspend fun download(): SyncResult { @@ -141,4 +145,19 @@ internal class FhirSynchronizer( SyncResult.Error(exceptions) } } + + companion object { + private val mutex = Mutex() + private var mutexOwner: Any? = null + + /** + * This is used in testing. In testing the [mutexOwner] should be non-null to catch an + * [IllegalStateException] thrown when trying to re-synchronize. In production the [mutexOwner] + * should be null so that successive synchronize requests are suspended and resumed when lock is + * released. + */ + fun setMutexOwner(owner: Any) { + mutexOwner = owner + } + } } diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index e978562320..8a4974a54a 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -23,6 +23,7 @@ import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.testing.TestFhirEngineImpl import com.google.common.truth.Truth.assertThat +import java.lang.IllegalStateException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch @@ -55,6 +56,7 @@ class FhirSynchronizerTest { @Before fun setUp() { + FhirSynchronizer.setMutexOwner(Object()) MockitoAnnotations.openMocks(this) fhirSynchronizer = FhirSynchronizer( @@ -142,4 +144,33 @@ class FhirSynchronizerTest { assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) } + + @Test + fun `re-synchronize should emit IllegalStateException on trying to acquire mutex lock`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()) + .thenReturn( + flowOf(DownloadState.Success(listOf(), 0, 0)), + ) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + listOf(), + ), + ) + + backgroundScope.launch { + fhirSynchronizer.syncState.collect { + if (it is SyncJobStatus.Started) { + try { + fhirSynchronizer.synchronize() + } catch (e: Exception) { + assertThat(e).isInstanceOf(IllegalStateException::class.java) + } + } + } + } + + val result = fhirSynchronizer.synchronize() + } } From 900238115f7c49e926b1d8249654eef7da8497e7 Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 31 Jan 2024 12:36:05 +0530 Subject: [PATCH 7/9] adding more asserts --- .../com/google/android/fhir/sync/FhirSynchronizerTest.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index 8a4974a54a..ed6a1b5bee 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -54,9 +54,11 @@ class FhirSynchronizerTest { private lateinit var fhirSynchronizer: FhirSynchronizer + private val mutexOwner = Object() + @Before fun setUp() { - FhirSynchronizer.setMutexOwner(Object()) + FhirSynchronizer.setMutexOwner(mutexOwner) MockitoAnnotations.openMocks(this) fhirSynchronizer = FhirSynchronizer( @@ -166,6 +168,9 @@ class FhirSynchronizerTest { fhirSynchronizer.synchronize() } catch (e: Exception) { assertThat(e).isInstanceOf(IllegalStateException::class.java) + assertThat(e.localizedMessage).isNotNull() + assertThat(e.localizedMessage) + .isEqualTo("This mutex is already locked by the specified owner: $mutexOwner") } } } From c5601a8198f9395eebb0aa698a5f3cddc0305f3d Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 31 Jan 2024 15:26:27 +0530 Subject: [PATCH 8/9] test modified --- .../android/fhir/sync/FhirSynchronizer.kt | 13 +--- .../android/fhir/sync/FhirSynchronizerTest.kt | 72 ++++++++++++------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 122d3f7d41..a0020adb4a 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -81,7 +81,7 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - mutex.withLock(mutexOwner) { + mutex.withLock { setSyncState(SyncJobStatus.Started()) return listOf(download(), upload()) @@ -148,16 +148,5 @@ internal class FhirSynchronizer( companion object { private val mutex = Mutex() - private var mutexOwner: Any? = null - - /** - * This is used in testing. In testing the [mutexOwner] should be non-null to catch an - * [IllegalStateException] thrown when trying to re-synchronize. In production the [mutexOwner] - * should be null so that successive synchronize requests are suspended and resumed when lock is - * released. - */ - fun setMutexOwner(owner: Any) { - mutexOwner = owner - } } } diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index ed6a1b5bee..d7aa810edc 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -23,13 +23,13 @@ import com.google.android.fhir.sync.upload.UploadSyncResult import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.testing.TestFhirEngineImpl import com.google.common.truth.Truth.assertThat -import java.lang.IllegalStateException import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest -import org.hl7.fhir.r4.model.Patient import org.hl7.fhir.r4.model.ResourceType import org.junit.Before import org.junit.Test @@ -54,11 +54,8 @@ class FhirSynchronizerTest { private lateinit var fhirSynchronizer: FhirSynchronizer - private val mutexOwner = Object() - @Before fun setUp() { - FhirSynchronizer.setMutexOwner(mutexOwner) MockitoAnnotations.openMocks(this) fhirSynchronizer = FhirSynchronizer( @@ -148,34 +145,59 @@ class FhirSynchronizerTest { } @Test - fun `re-synchronize should emit IllegalStateException on trying to acquire mutex lock`() = + fun `synchronize multiple invocations should execute in order`() = runTest(UnconfinedTestDispatcher()) { - `when`(downloader.download()) - .thenReturn( - flowOf(DownloadState.Success(listOf(), 0, 0)), - ) + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 0, 0))) `when`(uploader.upload(any())) .thenReturn( UploadSyncResult.Success( listOf(), ), ) - - backgroundScope.launch { - fhirSynchronizer.syncState.collect { - if (it is SyncJobStatus.Started) { - try { - fhirSynchronizer.synchronize() - } catch (e: Exception) { - assertThat(e).isInstanceOf(IllegalStateException::class.java) - assertThat(e.localizedMessage).isNotNull() - assertThat(e.localizedMessage) - .isEqualTo("This mutex is already locked by the specified owner: $mutexOwner") - } - } + val fhirSynchronizerWithDelayInDownload = + FhirSynchronizer( + TestFhirEngineImpl, + UploadConfiguration(uploader), + DownloadConfiguration( + object : Downloader { + override suspend fun download(): Flow { + delay(10) + return flowOf(DownloadState.Success(listOf(), 10, 10)) + } + }, + conflictResolver, + ), + fhirDataStore, + ) + val emittedValues = mutableListOf() + val jobs = + listOf(fhirSynchronizerWithDelayInDownload, fhirSynchronizer).map { + backgroundScope.launch { it.syncState.collect { emittedValues.add(it) } } + /** + * Invoke synchronize() in separate coroutines. First invoke the synchronizer with delay + * in download. Then the [fhirSynchronizer] + */ + backgroundScope.launch { it.synchronize() } } - } - val result = fhirSynchronizer.synchronize() + jobs.forEach { it.join() } + + assertThat(emittedValues).hasSize(10) + assertThat(emittedValues[0]).isInstanceOf(SyncJobStatus.Started::class.java) + assertThat(emittedValues[1]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10)) + assertThat(emittedValues[2]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0)) + assertThat(emittedValues[3]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1)) + assertThat(emittedValues[4]).isInstanceOf(SyncJobStatus.Succeeded::class.java) + assertThat(emittedValues[5]).isInstanceOf(SyncJobStatus.Started::class.java) + assertThat(emittedValues[6]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 0, completed = 0)) + assertThat(emittedValues[7]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0)) + assertThat(emittedValues[8]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1)) + assertThat(emittedValues[9]).isInstanceOf(SyncJobStatus.Succeeded::class.java) } } From 44f8aac0fcee6ebaf3e147d6c5a8702874b251fa Mon Sep 17 00:00:00 2001 From: Madhuram Jajoo Date: Wed, 31 Jan 2024 15:40:33 +0530 Subject: [PATCH 9/9] note for flaky test possibility --- .../java/com/google/android/fhir/sync/FhirSynchronizerTest.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index d7aa810edc..fe7b5f9116 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -144,6 +144,10 @@ class FhirSynchronizerTest { assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) } + /** + * If you encounter flakiness in this test, consider increasing the delay time in the downloader + * object. + */ @Test fun `synchronize multiple invocations should execute in order`() = runTest(UnconfinedTestDispatcher()) {