Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize sync workers #2385

Merged
merged 11 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
Expand Down Expand Up @@ -79,18 +81,20 @@ internal class FhirSynchronizer(
}

suspend fun synchronize(): SyncJobStatus {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
mutex.withLock {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
}
}
}
}
}
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved

private suspend fun download(): SyncResult {
Expand Down Expand Up @@ -141,4 +145,8 @@ internal class FhirSynchronizer(
SyncResult.Error(exceptions)
}
}

companion object {
private val mutex = Mutex()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.testing.TestFhirEngineImpl
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
Expand Down Expand Up @@ -142,4 +143,65 @@ class FhirSynchronizerTest {
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}

/**
* If you encounter flakiness in this test, consider increasing the delay time in the downloader
* object.
*/
@Test
fun `synchronize multiple invocations should execute in order`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download()).thenReturn(flowOf(DownloadState.Success(listOf(), 0, 0)))
`when`(uploader.upload(any()))
.thenReturn(
UploadSyncResult.Success(
listOf(),
),
)
val fhirSynchronizerWithDelayInDownload =
FhirSynchronizer(
TestFhirEngineImpl,
UploadConfiguration(uploader),
DownloadConfiguration(
object : Downloader {
override suspend fun download(): Flow<DownloadState> {
delay(10)
return flowOf(DownloadState.Success(listOf(), 10, 10))
}
},
conflictResolver,
),
fhirDataStore,
)
val emittedValues = mutableListOf<SyncJobStatus>()
val jobs =
listOf(fhirSynchronizerWithDelayInDownload, fhirSynchronizer).map {
backgroundScope.launch { it.syncState.collect { emittedValues.add(it) } }
/**
* Invoke synchronize() in separate coroutines. First invoke the synchronizer with delay
* in download. Then the [fhirSynchronizer]
*/
backgroundScope.launch { it.synchronize() }
}

jobs.forEach { it.join() }

assertThat(emittedValues).hasSize(10)
assertThat(emittedValues[0]).isInstanceOf(SyncJobStatus.Started::class.java)
assertThat(emittedValues[1])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 10, completed = 10))
assertThat(emittedValues[2])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0))
assertThat(emittedValues[3])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1))
assertThat(emittedValues[4]).isInstanceOf(SyncJobStatus.Succeeded::class.java)
assertThat(emittedValues[5]).isInstanceOf(SyncJobStatus.Started::class.java)
assertThat(emittedValues[6])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, total = 0, completed = 0))
assertThat(emittedValues[7])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 0))
assertThat(emittedValues[8])
.isEqualTo(SyncJobStatus.InProgress(SyncOperation.UPLOAD, total = 1, completed = 1))
assertThat(emittedValues[9]).isInstanceOf(SyncJobStatus.Succeeded::class.java)
}
}
Loading