Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch and emit exceptions from downloading invalid resources #1917

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import com.google.android.fhir.DatastoreUtil
import com.google.android.fhir.FhirEngine
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
Expand Down Expand Up @@ -99,20 +101,27 @@ internal class FhirSynchronizer(
val exceptions = mutableListOf<ResourceSyncException>()
fhirEngine.syncDownload(conflictResolver) {
flow {
downloader.download(it).collect {
when (it) {
is DownloadState.Started -> {
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total))
}
is DownloadState.Success -> {
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, it.completed))
emit(it.resources)
}
is DownloadState.Failure -> {
exceptions.add(it.syncError)
downloader
.download(it)
.onEach {
when (it) {
is DownloadState.Started -> {
datastoreUtil.readLastSyncTimestamp()
omarismail94 marked this conversation as resolved.
Show resolved Hide resolved
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, 0))
}
is DownloadState.Success -> {
setSyncState(
SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total, it.completed)
)
emit(it.resources)
}
is DownloadState.Failure -> {
exceptions.add(it.syncError)
}
}
}
}
.catch { exceptions.add(ResourceSyncException(ResourceType.Bundle, Exception(it))) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed - if the exception is actually raised in the code where the already downloaded resources are being saved to the db, shouldn't the exception handling code be there instead of here where the resources are being downloaded?

perhaps - and only just perhaps - fhir engine's syncdownload function should return a flow... and you handle the exceptions here... but i'm not entirely sure that's the right solution here.

.collect()
}
}
return if (exceptions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync

import android.content.Context
import androidx.test.core.app.ApplicationProvider
import com.google.android.fhir.FhirServices
import com.google.android.fhir.SyncDownloadContext
import com.google.android.fhir.resource.TestingUtils
import com.google.android.fhir.sync.upload.BundleUploader
import com.google.android.fhir.sync.upload.LocalChangesPaginator
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
import com.google.common.truth.Truth.assertThat
import java.util.Date
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Encounter
import org.hl7.fhir.r4.model.Meta
import org.hl7.fhir.r4.model.ResourceType
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner

@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(RobolectricTestRunner::class)
class FhirSynchronizerTest {
private val services =
FhirServices.builder(ApplicationProvider.getApplicationContext()).inMemory().build()
private val fhirEngine = services.fhirEngine
private val bundleUploader =
BundleUploader(
dataSource = TestingUtils.TestDataSourceImpl,
bundleGenerator = TransactionBundleGenerator.getDefault(),
localChangesPaginator = LocalChangesPaginator.DEFAULT
)

private lateinit var context: Context

@Before
fun setUp() {
context = ApplicationProvider.getApplicationContext()
}

@Test
fun `synchronize() should catch invalid resource downloaded`() = runTest {
val flow = MutableSharedFlow<SyncJobStatus>()

val testEncounterId = "encounter-1"
val invalidEncounter =
Encounter().apply {
id = testEncounterId
class_ = Coding()
meta = Meta().apply { lastUpdated = Date() }
}
val fhirSynchronizer =
FhirSynchronizer(
context = context,
fhirEngine = fhirEngine,
uploader = bundleUploader,
downloader =
object : Downloader {
override suspend fun download(context: SyncDownloadContext): Flow<DownloadState> =
flow {
emit(DownloadState.Success(listOf(invalidEncounter), 1, 1))
}
},
conflictResolver = AcceptLocalConflictResolver
)

val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize()
val exception = (result as SyncJobStatus.Failed).exceptions.firstOrNull()
assertThat(exception!!.exception.localizedMessage)
.isEqualTo("java.lang.NullPointerException: coding.code must not be null")
assertThat(exception.resourceType).isEqualTo(ResourceType.Bundle)
}

@Test
fun `synchronize() should sync successfully`() = runTest {
val flow = MutableSharedFlow<SyncJobStatus>()

val codeString = "1427AAAAA"
val systemString = "http://openmrs.org/concepts"
val encounter =
Encounter().apply {
id = "non-null-ID"
class_ =
Coding().apply {
system = systemString
code = codeString
display = "Display"
}
meta = Meta().apply { lastUpdated = Date() }
}
val fhirSynchronizer =
FhirSynchronizer(
context = context,
fhirEngine = fhirEngine,
uploader = bundleUploader,
downloader =
object : Downloader {
override suspend fun download(context: SyncDownloadContext): Flow<DownloadState> =
flow {
emit(DownloadState.Success(listOf(encounter), 1, 1))
}
},
conflictResolver = AcceptLocalConflictResolver
)

val result = fhirSynchronizer.apply { subscribe(flow) }.synchronize()
assertThat(result).isInstanceOf(SyncJobStatus.Finished::class.java)
}
}