Skip to content

Commit

Permalink
Synchronize sync workers (#2385)
Browse files Browse the repository at this point in the history
* mutex is enough

* Mutex is actually good

* Remove extra mutex

* remove extra spotless changes

* spotless

* mutex test added

* adding more asserts

* test modified

* note for flaky test possibility
  • Loading branch information
MJ1998 authored Jan 31, 2024
1 parent 99c5b9c commit d601844
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
Expand Down Expand Up @@ -79,18 +81,20 @@ internal class FhirSynchronizer(
}

suspend fun synchronize(): SyncJobStatus {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
mutex.withLock {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
}
}
}
}
}

private suspend fun download(): SyncResult {
Expand Down Expand Up @@ -141,4 +145,8 @@ internal class FhirSynchronizer(
SyncResult.Error(exceptions)
}
}

companion object {
private val mutex = Mutex()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.testing.TestFhirEngineImpl
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
Expand Down Expand Up @@ -142,4 +143,65 @@ class FhirSynchronizerTest {
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}

/**
* If you encounter flakiness in this test, consider increasing the delay time in the downloader
* object.
*/
@Test
fun `synchronize multiple invocations should execute in order`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 0, 0)))
`when`(uploader.upload(any()))
.thenReturn(
UploadSyncResult.Success(
listOf(),
),
)
val fhirSynchronizerWithDelayInDownload =
FhirSynchronizer(
TestFhirEngineImpl,
UploadConfiguration(uploader),
DownloadConfiguration(
object : Downloader {
override suspend fun download(): Flow<DownloadState> {
delay(10)
return flowOf(DownloadState.Success(listOf(), 10, 10))
}
},
conflictResolver,
),
fhirDataStore,
)
val emittedValues = mutableListOf<SyncJobStatus>()
val jobs =
listOf(fhirSynchronizerWithDelayInDownload, fhirSynchronizer).map {
backgroundScope.launch { it.syncState.collect { emittedValues.add(it) } }
/**
* Invoke synchronize() in separate coroutines. First invoke the synchronizer with delay
* in download. Then the [fhirSynchronizer]
*/
backgroundScope.launch { it.synchronize() }
}

jobs.forEach { it.join() }

assertThat(emittedValues).hasSize(10)
assertThat(emittedValues[0]).isInstanceOf(SyncJobStatus.Started::class.java)
assertThat(emittedValues[1])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10))
assertThat(emittedValues[2])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0))
assertThat(emittedValues[3])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1))
assertThat(emittedValues[4]).isInstanceOf(SyncJobStatus.Succeeded::class.java)
assertThat(emittedValues[5]).isInstanceOf(SyncJobStatus.Started::class.java)
assertThat(emittedValues[6])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 0, completed = 0))
assertThat(emittedValues[7])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0))
assertThat(emittedValues[8])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1))
assertThat(emittedValues[9]).isInstanceOf(SyncJobStatus.Succeeded::class.java)
}
}

0 comments on commit d601844

Please sign in to comment.