diff --git a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt index 7d96f52c6e..72c1cd2e86 100644 --- a/engine/src/main/java/com/google/android/fhir/FhirEngine.kt +++ b/engine/src/main/java/com/google/android/fhir/FhirEngine.kt @@ -20,6 +20,7 @@ import com.google.android.fhir.db.ResourceNotFoundException import com.google.android.fhir.db.impl.dao.LocalChangeToken import com.google.android.fhir.search.Search import com.google.android.fhir.sync.ConflictResolver +import com.google.android.fhir.sync.DownloadState import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow import org.hl7.fhir.r4.model.Resource @@ -63,8 +64,8 @@ interface FhirEngine { */ suspend fun syncDownload( conflictResolver: ConflictResolver, - download: suspend () -> Flow> - ) + download: suspend () -> Flow + ): Flow /** * Returns the total count of entities available for given search. diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index 93c9c800cd..c94876750a 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -28,10 +28,12 @@ import com.google.android.fhir.search.Search import com.google.android.fhir.search.count import com.google.android.fhir.search.execute import com.google.android.fhir.sync.ConflictResolver +import com.google.android.fhir.sync.DownloadState import com.google.android.fhir.sync.Resolved import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -82,21 +84,22 @@ internal class FhirEngineImpl(private val database: Database, private val contex override suspend fun syncDownload( conflictResolver: ConflictResolver, - download: suspend () -> Flow> - ) { - download().collect { resources -> - database.withTransaction { - val resolved = - resolveConflictingResources( - resources, - getConflictingResourceIds(resources), - conflictResolver - ) - database.insertSyncedResources(resources) - saveResolvedResourcesToDatabase(resolved) + download: suspend () -> Flow + ): Flow = + download().onEach { + if (it is DownloadState.Success) { + database.withTransaction { + val resolved = + resolveConflictingResources( + it.resources, + getConflictingResourceIds(it.resources), + conflictResolver + ) + database.insertSyncedResources(it.resources) + saveResolvedResourcesToDatabase(resolved) + } } } - } private suspend fun saveResolvedResourcesToDatabase(resolved: List?) { resolved?.let { diff --git a/engine/src/main/java/com/google/android/fhir/sync/Downloader.kt b/engine/src/main/java/com/google/android/fhir/sync/Downloader.kt index 9be709402c..18d7d67679 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/Downloader.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/Downloader.kt @@ -30,7 +30,7 @@ internal interface Downloader { } /* TODO: Generalize the Downloader API to not sequentially download resource by type (https://github.com/google/android-fhir/issues/1884) */ -internal sealed class DownloadState { +sealed class DownloadState { data class Started(val type: ResourceType, val total: Int) : DownloadState() diff --git a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt index 978e37cd51..bebaf28762 100644 --- a/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt +++ b/engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt @@ -21,8 +21,10 @@ import com.google.android.fhir.DatastoreUtil import com.google.android.fhir.FhirEngine import java.time.OffsetDateTime import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach import org.hl7.fhir.r4.model.ResourceType enum class SyncOperation { @@ -97,16 +99,15 @@ internal class FhirSynchronizer( private suspend fun download(): SyncResult { val exceptions = mutableListOf() - fhirEngine.syncDownload(conflictResolver) { - flow { - downloader.download().collect { + fhirEngine + .syncDownload(conflictResolver) { + downloader.download().onEach { when (it) { is DownloadState.Started -> { setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total)) } is DownloadState.Success -> { setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, it.completed)) - emit(it.resources) } is DownloadState.Failure -> { exceptions.add(it.syncError) @@ -114,7 +115,8 @@ internal class FhirSynchronizer( } } } - } + .catch { exceptions.add(ResourceSyncException(ResourceType.Bundle, Exception(it))) } + .collect() return if (exceptions.isEmpty()) { SyncResult.Success() } else { diff --git a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt index 6f24fae0f5..8bbd80bd74 100644 --- a/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt +++ b/engine/src/main/java/com/google/android/fhir/testing/Utilities.kt @@ -26,6 +26,7 @@ import com.google.android.fhir.db.impl.dao.LocalChangeToken import com.google.android.fhir.search.Search import com.google.android.fhir.sync.ConflictResolver import com.google.android.fhir.sync.DataSource +import com.google.android.fhir.sync.DownloadState import com.google.android.fhir.sync.DownloadWorkManager import com.google.common.truth.Truth.assertThat import java.time.OffsetDateTime @@ -33,7 +34,7 @@ import java.util.Date import java.util.LinkedList import kotlin.streams.toList import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Meta import org.hl7.fhir.r4.model.Patient @@ -142,10 +143,9 @@ object TestFhirEngineImpl : FhirEngine { override suspend fun syncDownload( conflictResolver: ConflictResolver, - download: suspend () -> Flow> - ) { - download().collect() - } + download: suspend () -> Flow + ): Flow = download().onEach {} + override suspend fun count(search: Search): Long { return 0 } diff --git a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt index fb887fca5b..edea63d3c7 100644 --- a/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt +++ b/engine/src/test/java/com/google/android/fhir/impl/FhirEngineImplTest.kt @@ -27,17 +27,22 @@ import com.google.android.fhir.logicalId import com.google.android.fhir.search.search import com.google.android.fhir.sync.AcceptLocalConflictResolver import com.google.android.fhir.sync.AcceptRemoteConflictResolver +import com.google.android.fhir.sync.DownloadState import com.google.android.fhir.testing.assertResourceEquals import com.google.android.fhir.testing.assertResourceNotEquals import com.google.android.fhir.testing.readFromFile import com.google.common.truth.Truth.assertThat import java.util.Date import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking import org.hl7.fhir.exceptions.FHIRException import org.hl7.fhir.r4.model.Address +import org.hl7.fhir.r4.model.Coding +import org.hl7.fhir.r4.model.Encounter import org.hl7.fhir.r4.model.Enumerations import org.hl7.fhir.r4.model.HumanName import org.hl7.fhir.r4.model.Meta @@ -262,9 +267,36 @@ class FhirEngineImplTest { } } + @Test + fun `syncDownload catch invalid resource downloaded`() = runBlocking { + val exception = mutableListOf() + + val testEncounterId = "encounter-1" + val invalidEncounter = + Encounter().apply { + id = testEncounterId + class_ = Coding() + meta = Meta().apply { lastUpdated = Date() } + } + + fhirEngine + .syncDownload(AcceptLocalConflictResolver) { + flow { emit(DownloadState.Success((listOf((invalidEncounter))), 1, 1)) } + } + .catch { exception.add(Exception(it)) } + .collect() + + assertThat(exception.first().localizedMessage) + .isEqualTo("java.lang.NullPointerException: coding.code must not be null") + } + @Test fun syncDownload_downloadResources() = runBlocking { - fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf((TEST_PATIENT_2)))) } + fhirEngine + .syncDownload(AcceptLocalConflictResolver) { + flow { emit(DownloadState.Success((listOf((TEST_PATIENT_2))), 1, 1)) } + } + .collect() assertResourceEquals(TEST_PATIENT_2, fhirEngine.get(TEST_PATIENT_2_ID)) } @@ -410,7 +442,9 @@ class FhirEngineImplTest { } ) } - fhirEngine.syncDownload(AcceptRemoteConflictResolver) { flowOf((listOf((originalPatient)))) } + fhirEngine.syncDownload(AcceptRemoteConflictResolver) { + flowOf(DownloadState.Success(listOf((originalPatient)), 1, 1)) + } val localChange = originalPatient.copy().apply { addAddress(Address().apply { city = "Malibu" }) } @@ -426,7 +460,9 @@ class FhirEngineImplTest { addAddress(Address().apply { country = "USA" }) } - fhirEngine.syncDownload(AcceptRemoteConflictResolver) { flowOf((listOf(remoteChange))) } + fhirEngine.syncDownload(AcceptRemoteConflictResolver) { + flowOf(DownloadState.Success(listOf(remoteChange), 1, 1)) + } assertThat( services.database.getAllLocalChanges().filter { @@ -455,11 +491,14 @@ class FhirEngineImplTest { } ) } - fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf((originalPatient)))) } + fhirEngine + .syncDownload(AcceptLocalConflictResolver) { + flowOf(DownloadState.Success(listOf((originalPatient)), 1, 1)) + } + .collect() var localChange = originalPatient.copy().apply { addAddress(Address().apply { city = "Malibu" }) } fhirEngine.update(localChange) - localChange = localChange.copy().apply { addAddress( @@ -481,7 +520,11 @@ class FhirEngineImplTest { addAddress(Address().apply { country = "USA" }) } - fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf(remoteChange))) } + fhirEngine + .syncDownload(AcceptLocalConflictResolver) { + flowOf(DownloadState.Success(listOf(remoteChange), 1, 1)) + } + .collect() val localChangeDiff = """[{"op":"remove","path":"\/address\/0\/country"},{"op":"add","path":"\/address\/0\/city","value":"Malibu"},{"op":"add","path":"\/address\/-","value":{"city":"Malibu","state":"California"}}]""" diff --git a/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt new file mode 100644 index 0000000000..8e0eae125b --- /dev/null +++ b/engine/src/test/java/com/google/android/fhir/sync/FhirSynchronizerTest.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir.sync + +import android.content.Context +import androidx.test.core.app.ApplicationProvider +import com.google.android.fhir.FhirServices +import com.google.android.fhir.sync.upload.BundleUploader +import com.google.android.fhir.sync.upload.LocalChangesPaginator +import com.google.android.fhir.sync.upload.TransactionBundleGenerator +import com.google.android.fhir.testing.TestDataSourceImpl +import com.google.common.truth.Truth.assertThat +import java.util.Date +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.test.runTest +import org.hl7.fhir.r4.model.Coding +import org.hl7.fhir.r4.model.Encounter +import org.hl7.fhir.r4.model.Meta +import org.hl7.fhir.r4.model.ResourceType +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner + +@OptIn(ExperimentalCoroutinesApi::class) +@RunWith(RobolectricTestRunner::class) +class FhirSynchronizerTest { + private val services = + FhirServices.builder(ApplicationProvider.getApplicationContext()).inMemory().build() + private val fhirEngine = services.fhirEngine + private val bundleUploader = + BundleUploader( + dataSource = TestDataSourceImpl, + bundleGenerator = TransactionBundleGenerator.getDefault(), + localChangesPaginator = LocalChangesPaginator.DEFAULT + ) + + private lateinit var context: Context + + @Before + fun setUp() { + context = ApplicationProvider.getApplicationContext() + } + + @Test + fun `synchronize() should catch invalid resource downloaded`() = runTest { + val flow = MutableSharedFlow() + + val testEncounterId = "encounter-1" + val invalidEncounter = + Encounter().apply { + id = testEncounterId + class_ = Coding() + meta = Meta().apply { lastUpdated = Date() } + } + val fhirSynchronizer = + FhirSynchronizer( + context = context, + fhirEngine = fhirEngine, + uploader = bundleUploader, + downloader = + object : Downloader { + override suspend fun download(): Flow = flow { + emit(DownloadState.Success(listOf(invalidEncounter), 1, 1)) + } + }, + conflictResolver = AcceptLocalConflictResolver + ) + + val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize() + val exception = (result as SyncJobStatus.Failed).exceptions.firstOrNull() + assertThat(exception!!.exception.localizedMessage) + .isEqualTo("java.lang.NullPointerException: coding.code must not be null") + assertThat(exception.resourceType).isEqualTo(ResourceType.Bundle) + } + + @Test + fun `synchronize() should sync successfully`() = runTest { + val flow = MutableSharedFlow() + + val codeString = "1427AAAAA" + val systemString = "http://openmrs.org/concepts" + val encounter = + Encounter().apply { + id = "non-null-ID" + class_ = + Coding().apply { + system = systemString + code = codeString + display = "Display" + } + meta = Meta().apply { lastUpdated = Date() } + } + val fhirSynchronizer = + FhirSynchronizer( + context = context, + fhirEngine = fhirEngine, + uploader = bundleUploader, + downloader = + object : Downloader { + override suspend fun download(): Flow = flow { + emit(DownloadState.Success(listOf(encounter), 1, 1)) + } + }, + conflictResolver = AcceptLocalConflictResolver + ) + + val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize() + assertThat(result).isInstanceOf(SyncJobStatus.Finished::class.java) + } +}