Skip to content

Commit

Permalink
mutex test added
Browse files Browse the repository at this point in the history
  • Loading branch information
MJ1998 committed Jan 31, 2024
1 parent 3bb616d commit 77d9fe6
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 75 deletions.
120 changes: 56 additions & 64 deletions engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import timber.log.Timber

/** A WorkManager Worker that handles periodic sync. */
Expand All @@ -62,76 +60,74 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
internal open fun getDataSource() = FhirEngineProvider.getDataSource(applicationContext)

override suspend fun doWork(): Result {
mutex.withLock {
val dataSource =
getDataSource()
?: return Result.failure(
buildWorkData(
IllegalStateException(
"FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.",
),
val dataSource =
getDataSource()
?: return Result.failure(
buildWorkData(
IllegalStateException(
"FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.",
),
)

val synchronizer =
FhirSynchronizer(
getFhirEngine(),
UploadConfiguration(
Uploader(
dataSource = dataSource,
patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode),
requestGenerator =
UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode),
),
),
DownloadConfiguration(
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
),
FhirEngineProvider.getFhirDataStore(applicationContext),
)

val job =
CoroutineScope(Dispatchers.IO).launch {
val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext)
synchronizer.syncState.collect { syncJobStatus ->
val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME)
when (syncJobStatus) {
is SyncJobStatus.Succeeded,
is SyncJobStatus.Failed, -> {
// While creating periodicSync request if
// putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present,
// then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null.
if (uniqueWorkerName != null) {
fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, syncJobStatus)
}
cancel()
}
else -> {
setProgress(buildWorkData(syncJobStatus))
val synchronizer =
FhirSynchronizer(
getFhirEngine(),
UploadConfiguration(
Uploader(
dataSource = dataSource,
patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode),
requestGenerator =
UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode),
),
),
DownloadConfiguration(
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
),
FhirEngineProvider.getFhirDataStore(applicationContext),
)

val job =
CoroutineScope(Dispatchers.IO).launch {
val fhirDataStore = FhirEngineProvider.getFhirDataStore(applicationContext)
synchronizer.syncState.collect { syncJobStatus ->
val uniqueWorkerName = inputData.getString(UNIQUE_WORK_NAME)
when (syncJobStatus) {
is SyncJobStatus.Succeeded,
is SyncJobStatus.Failed, -> {
// While creating periodicSync request if
// putString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY, uniqueWorkName) is not present,
// then inputData.getString(SYNC_STATUS_PREFERENCES_DATASTORE_KEY) can be null.
if (uniqueWorkerName != null) {
fhirDataStore.writeTerminalSyncJobStatus(uniqueWorkerName, syncJobStatus)
}
cancel()
}
else -> {
setProgress(buildWorkData(syncJobStatus))
}
}
}
}

val result = synchronizer.synchronize()
val output = buildWorkData(result)
val result = synchronizer.synchronize()
val output = buildWorkData(result)

// await/join is needed to collect states completely
kotlin.runCatching { job.join() }.onFailure(Timber::w)
// await/join is needed to collect states completely
kotlin.runCatching { job.join() }.onFailure(Timber::w)

Timber.d("Received result from worker $result and sending output $output")
Timber.d("Received result from worker $result and sending output $output")

/**
* In case of failure, we can check if its worth retrying and do retry based on
* [RetryConfiguration.maxRetries] set by user.
*/
val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0)
return when (result) {
is SyncJobStatus.Succeeded -> Result.success(output)
else -> {
if (retries > runAttemptCount) Result.retry() else Result.failure(output)
}
/**
* In case of failure, we can check if its worth retrying and do retry based on
* [RetryConfiguration.maxRetries] set by user.
*/
val retries = inputData.getInt(MAX_RETRIES_ALLOWED, 0)
return when (result) {
is SyncJobStatus.Succeeded -> Result.success(output)
else -> {
if (retries > runAttemptCount) Result.retry() else Result.failure(output)
}
}
}
Expand Down Expand Up @@ -161,8 +157,4 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter

override fun shouldSkipClass(clazz: Class<*>?) = false
}

companion object {
private val mutex = Mutex()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
Expand Down Expand Up @@ -79,18 +81,20 @@ internal class FhirSynchronizer(
}

suspend fun synchronize(): SyncJobStatus {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
mutex.withLock(mutexOwner) {
setSyncState(SyncJobStatus.Started())

return listOf(download(), upload())
.filterIsInstance<SyncResult.Error>()
.flatMap { it.exceptions }
.let {
if (it.isEmpty()) {
setSyncState(SyncResult.Success())
} else {
setSyncState(SyncResult.Error(it))
}
}
}
}
}

private suspend fun download(): SyncResult {
Expand Down Expand Up @@ -141,4 +145,19 @@ internal class FhirSynchronizer(
SyncResult.Error(exceptions)
}
}

companion object {
private val mutex = Mutex()
private var mutexOwner: Any? = null

/**
* This is used in testing. In testing the [mutexOwner] should be non-null to catch an
* [IllegalStateException] thrown when trying to re-synchronize. In production the [mutexOwner]
* should be null so that successive synchronize requests are suspended and resumed when lock is
* released.
*/
fun setMutexOwner(owner: Any) {
mutexOwner = owner
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.testing.TestFhirEngineImpl
import com.google.common.truth.Truth.assertThat
import java.lang.IllegalStateException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -55,6 +56,7 @@ class FhirSynchronizerTest {

@Before
fun setUp() {
FhirSynchronizer.setMutexOwner(Object())
MockitoAnnotations.openMocks(this)
fhirSynchronizer =
FhirSynchronizer(
Expand Down Expand Up @@ -142,4 +144,33 @@ class FhirSynchronizerTest {
assertThat(result).isInstanceOf(SyncJobStatus.Failed::class.java)
assertThat(listOf(error)).isEqualTo((result as SyncJobStatus.Failed).exceptions)
}

@Test
fun `re-synchronize should emit IllegalStateException on trying to acquire mutex lock`() =
runTest(UnconfinedTestDispatcher()) {
`when`(downloader.download())
.thenReturn(
flowOf(DownloadState.Success(listOf(), 0, 0)),
)
`when`(uploader.upload(any()))
.thenReturn(
UploadSyncResult.Success(
listOf(),
),
)

backgroundScope.launch {
fhirSynchronizer.syncState.collect {
if (it is SyncJobStatus.Started) {
try {
fhirSynchronizer.synchronize()
} catch (e: Exception) {
assertThat(e).isInstanceOf(IllegalStateException::class.java)
}
}
}
}

val result = fhirSynchronizer.synchronize()
}
}

0 comments on commit 77d9fe6

Please sign in to comment.