Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch and emit exceptions from downloading invalid resources #1917

Closed
wants to merge 8 commits into from
5 changes: 3 additions & 2 deletions engine/src/main/java/com/google/android/fhir/FhirEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.google.android.fhir.db.ResourceNotFoundException
import com.google.android.fhir.db.impl.dao.LocalChangeToken
import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.DownloadState
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
Expand Down Expand Up @@ -63,8 +64,8 @@ interface FhirEngine {
*/
suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
)
download: suspend () -> Flow<DownloadState>
): Flow<DownloadState>

/**
* Returns the total count of entities available for given search.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import com.google.android.fhir.search.Search
import com.google.android.fhir.search.count
import com.google.android.fhir.search.execute
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.DownloadState
import com.google.android.fhir.sync.Resolved
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand Down Expand Up @@ -82,21 +84,22 @@ internal class FhirEngineImpl(private val database: Database, private val contex

override suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
) {
download().collect { resources ->
database.withTransaction {
val resolved =
resolveConflictingResources(
resources,
getConflictingResourceIds(resources),
conflictResolver
)
database.insertSyncedResources(resources)
saveResolvedResourcesToDatabase(resolved)
download: suspend () -> Flow<DownloadState>
): Flow<DownloadState> =
download().onEach {
if (it is DownloadState.Success) {
database.withTransaction {
val resolved =
resolveConflictingResources(
it.resources,
getConflictingResourceIds(it.resources),
conflictResolver
)
database.insertSyncedResources(it.resources)
saveResolvedResourcesToDatabase(resolved)
}
}
}
}

private suspend fun saveResolvedResourcesToDatabase(resolved: List<Resource>?) {
resolved?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal interface Downloader {
}

/* TODO: Generalize the Downloader API to not sequentially download resource by type (https://github.com/google/android-fhir/issues/1884) */
internal sealed class DownloadState {
sealed class DownloadState {

data class Started(val type: ResourceType, val total: Int) : DownloadState()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import com.google.android.fhir.DatastoreUtil
import com.google.android.fhir.FhirEngine
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
Expand Down Expand Up @@ -97,24 +99,24 @@ internal class FhirSynchronizer(

private suspend fun download(): SyncResult {
val exceptions = mutableListOf<ResourceSyncException>()
fhirEngine.syncDownload(conflictResolver) {
flow {
downloader.download().collect {
fhirEngine
.syncDownload(conflictResolver) {
downloader.download().onEach {
when (it) {
is DownloadState.Started -> {
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total))
}
is DownloadState.Success -> {
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, it.completed))
emit(it.resources)
}
is DownloadState.Failure -> {
exceptions.add(it.syncError)
}
}
}
}
}
.catch { exceptions.add(ResourceSyncException(ResourceType.Bundle, Exception(it))) }
.collect()
return if (exceptions.isEmpty()) {
SyncResult.Success()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import com.google.android.fhir.db.impl.dao.LocalChangeToken
import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.DataSource
import com.google.android.fhir.sync.DownloadState
import com.google.android.fhir.sync.DownloadWorkManager
import com.google.common.truth.Truth.assertThat
import java.time.OffsetDateTime
import java.util.Date
import java.util.LinkedList
import kotlin.streams.toList
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Meta
import org.hl7.fhir.r4.model.Patient
Expand Down Expand Up @@ -142,10 +143,9 @@ object TestFhirEngineImpl : FhirEngine {

override suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
) {
download().collect()
}
download: suspend () -> Flow<DownloadState>
): Flow<DownloadState> = download().onEach {}

override suspend fun count(search: Search): Long {
return 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ import com.google.android.fhir.logicalId
import com.google.android.fhir.search.search
import com.google.android.fhir.sync.AcceptLocalConflictResolver
import com.google.android.fhir.sync.AcceptRemoteConflictResolver
import com.google.android.fhir.sync.DownloadState
import com.google.android.fhir.testing.assertResourceEquals
import com.google.android.fhir.testing.assertResourceNotEquals
import com.google.android.fhir.testing.readFromFile
import com.google.common.truth.Truth.assertThat
import java.util.Date
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.exceptions.FHIRException
import org.hl7.fhir.r4.model.Address
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Encounter
import org.hl7.fhir.r4.model.Enumerations
import org.hl7.fhir.r4.model.HumanName
import org.hl7.fhir.r4.model.Meta
Expand Down Expand Up @@ -262,9 +267,36 @@ class FhirEngineImplTest {
}
}

@Test
fun `syncDownload catch invalid resource downloaded`() = runBlocking {
val exception = mutableListOf<Exception>()

val testEncounterId = "encounter-1"
val invalidEncounter =
Encounter().apply {
id = testEncounterId
class_ = Coding()
meta = Meta().apply { lastUpdated = Date() }
}

fhirEngine
.syncDownload(AcceptLocalConflictResolver) {
flow { emit(DownloadState.Success((listOf((invalidEncounter))), 1, 1)) }
}
.catch { exception.add(Exception(it)) }
.collect()

assertThat(exception.first().localizedMessage)
.isEqualTo("java.lang.NullPointerException: coding.code must not be null")
}

@Test
fun syncDownload_downloadResources() = runBlocking {
fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf((TEST_PATIENT_2)))) }
fhirEngine
.syncDownload(AcceptLocalConflictResolver) {
flow { emit(DownloadState.Success((listOf((TEST_PATIENT_2))), 1, 1)) }
}
.collect()

assertResourceEquals(TEST_PATIENT_2, fhirEngine.get<Patient>(TEST_PATIENT_2_ID))
}
Expand Down Expand Up @@ -410,7 +442,9 @@ class FhirEngineImplTest {
}
)
}
fhirEngine.syncDownload(AcceptRemoteConflictResolver) { flowOf((listOf((originalPatient)))) }
fhirEngine.syncDownload(AcceptRemoteConflictResolver) {
flowOf(DownloadState.Success(listOf((originalPatient)), 1, 1))
}

val localChange =
originalPatient.copy().apply { addAddress(Address().apply { city = "Malibu" }) }
Expand All @@ -426,7 +460,9 @@ class FhirEngineImplTest {
addAddress(Address().apply { country = "USA" })
}

fhirEngine.syncDownload(AcceptRemoteConflictResolver) { flowOf((listOf(remoteChange))) }
fhirEngine.syncDownload(AcceptRemoteConflictResolver) {
flowOf(DownloadState.Success(listOf(remoteChange), 1, 1))
}

assertThat(
services.database.getAllLocalChanges().filter {
Expand Down Expand Up @@ -455,11 +491,14 @@ class FhirEngineImplTest {
}
)
}
fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf((originalPatient)))) }
fhirEngine
.syncDownload(AcceptLocalConflictResolver) {
flowOf(DownloadState.Success(listOf((originalPatient)), 1, 1))
}
.collect()
var localChange =
originalPatient.copy().apply { addAddress(Address().apply { city = "Malibu" }) }
fhirEngine.update(localChange)

localChange =
localChange.copy().apply {
addAddress(
Expand All @@ -481,7 +520,11 @@ class FhirEngineImplTest {
addAddress(Address().apply { country = "USA" })
}

fhirEngine.syncDownload(AcceptLocalConflictResolver) { flowOf((listOf(remoteChange))) }
fhirEngine
.syncDownload(AcceptLocalConflictResolver) {
flowOf(DownloadState.Success(listOf(remoteChange), 1, 1))
}
.collect()

val localChangeDiff =
"""[{"op":"remove","path":"\/address\/0\/country"},{"op":"add","path":"\/address\/0\/city","value":"Malibu"},{"op":"add","path":"\/address\/-","value":{"city":"Malibu","state":"California"}}]"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync

import android.content.Context
import androidx.test.core.app.ApplicationProvider
import com.google.android.fhir.FhirServices
import com.google.android.fhir.sync.upload.BundleUploader
import com.google.android.fhir.sync.upload.LocalChangesPaginator
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
import com.google.android.fhir.testing.TestDataSourceImpl
import com.google.common.truth.Truth.assertThat
import java.util.Date
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Encounter
import org.hl7.fhir.r4.model.Meta
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner

@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(RobolectricTestRunner::class)
class FhirSynchronizerTest {
private val services =
FhirServices.builder(ApplicationProvider.getApplicationContext()).inMemory().build()
private val fhirEngine = services.fhirEngine
private val bundleUploader =
BundleUploader(
dataSource = TestDataSourceImpl,
bundleGenerator = TransactionBundleGenerator.getDefault(),
localChangesPaginator = LocalChangesPaginator.DEFAULT
)

private lateinit var context: Context

@Before
fun setUp() {
context = ApplicationProvider.getApplicationContext()
}

@Test
fun `synchronize() should catch invalid resource downloaded`() = runTest {
val flow = MutableSharedFlow<SyncJobStatus>()

val testEncounterId = "encounter-1"
val invalidEncounter =
Encounter().apply {
id = testEncounterId
class_ = Coding()
meta = Meta().apply { lastUpdated = Date() }
}
val fhirSynchronizer =
FhirSynchronizer(
context = context,
fhirEngine = fhirEngine,
uploader = bundleUploader,
downloader =
object : Downloader {
override suspend fun download(): Flow<DownloadState> = flow {
emit(DownloadState.Success(listOf(invalidEncounter), 1, 1))
}
},
conflictResolver = AcceptLocalConflictResolver
)

val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize()
val exception = (result as SyncJobStatus.Failed).exceptions.firstOrNull()
assertThat(exception!!.exception.localizedMessage)
.isEqualTo("java.lang.NullPointerException: coding.code must not be null")
assertThat(exception.resourceType).isEqualTo(ResourceType.Bundle)
}

@Test
fun `synchronize() should sync successfully`() = runTest {
val flow = MutableSharedFlow<SyncJobStatus>()

val codeString = "1427AAAAA"
val systemString = "http://openmrs.org/concepts"
val encounter =
Encounter().apply {
id = "non-null-ID"
class_ =
Coding().apply {
system = systemString
code = codeString
display = "Display"
}
meta = Meta().apply { lastUpdated = Date() }
}
val fhirSynchronizer =
FhirSynchronizer(
context = context,
fhirEngine = fhirEngine,
uploader = bundleUploader,
downloader =
object : Downloader {
override suspend fun download(): Flow<DownloadState> = flow {
emit(DownloadState.Success(listOf(encounter), 1, 1))
}
},
conflictResolver = AcceptLocalConflictResolver
)

val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize()
assertThat(result).isInstanceOf(SyncJobStatus.Finished::class.java)
}
}