diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index e1b300884d..a0020adb4a 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -25,6 +25,8 @@ import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.hl7.fhir.r4.model.ResourceType enum class SyncOperation { @@ -79,18 +81,20 @@ internal class FhirSynchronizer( } suspend fun synchronize(): SyncJobStatus { - setSyncState(SyncJobStatus.Started()) - - return listOf(download(), upload()) - .filterIsInstance() - .flatMap { it.exceptions } - .let { - if (it.isEmpty()) { - setSyncState(SyncResult.Success()) - } else { - setSyncState(SyncResult.Error(it)) + mutex.withLock { + setSyncState(SyncJobStatus.Started()) + + return listOf(download(), upload()) + .filterIsInstance() + .flatMap { it.exceptions } + .let { + if (it.isEmpty()) { + setSyncState(SyncResult.Success()) + } else { + setSyncState(SyncResult.Error(it)) + } } - } + } } private suspend fun download(): SyncResult { @@ -141,4 +145,8 @@ internal class FhirSynchronizer( SyncResult.Error(exceptions) } } + + companion object { + private val mutex = Mutex() + } } diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt index e978562320..fe7b5f9116 100644 --- a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -24,11 +24,12 @@ import com.google.android.fhir.sync.upload.Uploader import com.google.android.fhir.testing.TestFhirEngineImpl import com.google.common.truth.Truth.assertThat import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest -import org.hl7.fhir.r4.model.Patient import org.hl7.fhir.r4.model.ResourceType import org.junit.Before import org.junit.Test @@ -142,4 +143,65 @@ class FhirSynchronizerTest { assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java) assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions) } + + /** + * If you encounter flakiness in this test, consider increasing the delay time in the downloader + * object. + */ + @Test + fun `synchronize multiple invocations should execute in order`() = + runTest(UnconfinedTestDispatcher()) { + `when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 0, 0))) + `when`(uploader.upload(any())) + .thenReturn( + UploadSyncResult.Success( + listOf(), + ), + ) + val fhirSynchronizerWithDelayInDownload = + FhirSynchronizer( + TestFhirEngineImpl, + UploadConfiguration(uploader), + DownloadConfiguration( + object : Downloader { + override suspend fun download(): Flow { + delay(10) + return flowOf(DownloadState.Success(listOf(), 10, 10)) + } + }, + conflictResolver, + ), + fhirDataStore, + ) + val emittedValues = mutableListOf() + val jobs = + listOf(fhirSynchronizerWithDelayInDownload, fhirSynchronizer).map { + backgroundScope.launch { it.syncState.collect { emittedValues.add(it) } } + /** + * Invoke synchronize() in separate coroutines. First invoke the synchronizer with delay + * in download. Then the [fhirSynchronizer] + */ + backgroundScope.launch { it.synchronize() } + } + + jobs.forEach { it.join() } + + assertThat(emittedValues).hasSize(10) + assertThat(emittedValues[0]).isInstanceOf(SyncJobStatus.Started::class.java) + assertThat(emittedValues[1]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10)) + assertThat(emittedValues[2]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0)) + assertThat(emittedValues[3]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1)) + assertThat(emittedValues[4]).isInstanceOf(SyncJobStatus.Succeeded::class.java) + assertThat(emittedValues[5]).isInstanceOf(SyncJobStatus.Started::class.java) + assertThat(emittedValues[6]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 0, completed = 0)) + assertThat(emittedValues[7]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0)) + assertThat(emittedValues[8]) + .isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1)) + assertThat(emittedValues[9]).isInstanceOf(SyncJobStatus.Succeeded::class.java) + } }