Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore Granular Upload Progress Updates and Database Consolidation #2392

Merged
merged 12 commits into from
Feb 1, 2024
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import com.google.android.fhir.search.include
import com.google.android.fhir.search.revInclude
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.ResourceUploadResponseMapping
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.sync.upload.UploadRequestResult
import com.google.android.fhir.testing.assertJsonArrayEqualsIgnoringOrder
import com.google.android.fhir.testing.assertResourceEquals
import com.google.android.fhir.testing.readFromFile
Expand All @@ -54,6 +54,7 @@ import java.math.BigDecimal
import java.time.Instant
import java.util.Date
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.r4.model.Address
import org.hl7.fhir.r4.model.CarePlan
Expand Down Expand Up @@ -555,14 +556,16 @@ class DatabaseImplTest {
it
.first { it.resourceId == "remote-patient-3" }
.let {
UploadSyncResult.Success(
listOf(
ResourceUploadResponseMapping(
listOf(it),
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
flowOf(
UploadRequestResult.Success(
listOf(
ResourceUploadResponseMapping(
listOf(it),
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
),
),
),
)
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/java/com/google/android/fhir/FhirEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.SyncUploadProgress
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.sync.upload.UploadRequestResult
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
Expand Down Expand Up @@ -63,7 +63,7 @@ interface FhirEngine {
*/
suspend fun syncUpload(
localChangesFetchMode: LocalChangesFetchMode,
upload: (suspend (List<LocalChange>) -> UploadSyncResult),
upload: (suspend (List<LocalChange>) -> Flow<UploadRequestResult>),
): Flow<SyncUploadProgress>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import com.google.android.fhir.sync.upload.DefaultResourceConsolidator
import com.google.android.fhir.sync.upload.LocalChangeFetcherFactory
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.SyncUploadProgress
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.sync.upload.UploadRequestResult
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

Expand Down Expand Up @@ -127,7 +129,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex

override suspend fun syncUpload(
localChangesFetchMode: LocalChangesFetchMode,
upload: (suspend (List<LocalChange>) -> UploadSyncResult),
upload: (suspend (List<LocalChange>) -> Flow<UploadRequestResult>),
): Flow<SyncUploadProgress> = flow {
val resourceConsolidator = DefaultResourceConsolidator(database)
val localChangeFetcher = LocalChangeFetcherFactory.byMode(localChangesFetchMode, database)
Expand All @@ -141,23 +143,22 @@ internal class FhirEngineImpl(private val database: Database, private val contex

while (localChangeFetcher.hasNext()) {
val localChanges = localChangeFetcher.next()
val uploadSyncResult = upload(localChanges)

resourceConsolidator.consolidate(uploadSyncResult)
when (uploadSyncResult) {
is UploadSyncResult.Success -> emit(localChangeFetcher.getProgress())
is UploadSyncResult.Failure -> {
with(localChangeFetcher.getProgress()) {
emit(
SyncUploadProgress(
remaining = remaining,
initialTotal = initialTotal,
uploadError = uploadSyncResult.syncError,
),
)
val uploadRequestResult =
upload(localChanges)
.onEach { result ->
resourceConsolidator.consolidate(result)
val newProgress =
when (result) {
is UploadRequestResult.Success -> localChangeFetcher.getProgress()
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved
is UploadRequestResult.Failure ->
localChangeFetcher.getProgress().copy(uploadError = result.uploadError)
}
emit(newProgress)
}
break
}
.firstOrNull { it is UploadRequestResult.Failure }

if (uploadRequestResult is UploadRequestResult.Failure) {
break
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,30 +35,30 @@ import org.hl7.fhir.r4.model.ResourceType
internal fun interface ResourceConsolidator {

/** Consolidates the local change token with the provided response from the FHIR server. */
suspend fun consolidate(uploadSyncResult: UploadSyncResult)
suspend fun consolidate(uploadRequestResult: UploadRequestResult)
}

/** Default implementation of [ResourceConsolidator] that uses the database to aid consolidation. */
internal class DefaultResourceConsolidator(private val database: Database) : ResourceConsolidator {

override suspend fun consolidate(uploadSyncResult: UploadSyncResult) =
when (uploadSyncResult) {
is UploadSyncResult.Success -> {
override suspend fun consolidate(uploadRequestResult: UploadRequestResult) =
when (uploadRequestResult) {
is UploadRequestResult.Success -> {
database.deleteUpdates(
LocalChangeToken(
uploadSyncResult.uploadResponses.flatMap {
uploadRequestResult.successfulUploadResponseMappings.flatMap {
it.localChanges.flatMap { localChange -> localChange.token.ids }
},
),
)
uploadSyncResult.uploadResponses.forEach {
uploadRequestResult.successfulUploadResponseMappings.forEach {
when (it) {
is BundleComponentUploadResponseMapping -> updateVersionIdAndLastUpdated(it.output)
is ResourceUploadResponseMapping -> updateVersionIdAndLastUpdated(it.output)
}
}
}
is UploadSyncResult.Failure -> {
is UploadRequestResult.Failure -> {
/* For now, do nothing (we do not delete the local changes from the database as they were
not uploaded successfully. In the future, add consolidation required if upload fails.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package com.google.android.fhir.sync.upload

import com.google.android.fhir.LocalChange
import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.sync.DataSource
import com.google.android.fhir.sync.ResourceSyncException
import com.google.android.fhir.sync.upload.patch.PatchGenerator
Expand All @@ -26,6 +25,8 @@ import com.google.android.fhir.sync.upload.request.UploadRequestGenerator
import com.google.android.fhir.sync.upload.request.UploadRequestMapping
import com.google.android.fhir.sync.upload.request.UrlUploadRequestMapping
import java.lang.IllegalStateException
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.transformWhile
import org.hl7.fhir.exceptions.FHIRException
import org.hl7.fhir.instance.model.api.IBase
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome
Expand All @@ -48,27 +49,22 @@ internal class Uploader(
private val patchGenerator: PatchGenerator,
private val requestGenerator: UploadRequestGenerator,
) {
suspend fun upload(localChanges: List<LocalChange>): UploadSyncResult {
val mappedPatches = patchGenerator.generate(localChanges)
val mappedRequests = requestGenerator.generateUploadRequests(mappedPatches)
val token = LocalChangeToken(localChanges.flatMap { it.token.ids })

val successfulMappedResponses = mutableListOf<SuccessfulUploadResponseMapping>()

for (mappedRequest in mappedRequests) {
when (val result = handleUploadRequest(mappedRequest)) {
is UploadRequestResult.Success ->
successfulMappedResponses.addAll(result.successfulUploadResponsMappings)
is UploadRequestResult.Failure -> return UploadSyncResult.Failure(result.exception, token)
suspend fun upload(localChanges: List<LocalChange>) =
localChanges
.let { patchGenerator.generate(it) }
.let { requestGenerator.generateUploadRequests(it) }
.asFlow()
.transformWhile {
with(handleUploadRequest(it)) {
emit(this)
this !is UploadRequestResult.Failure
}
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return UploadSyncResult.Success(successfulMappedResponses)
}

private fun handleUploadResponse(
private fun handleSuccessfulUploadResponse(
mappedUploadRequest: UploadRequestMapping,
response: Resource,
): UploadRequestResult {
): UploadRequestResult.Success {
val responsesList =
when {
mappedUploadRequest is UrlUploadRequestMapping && response is DomainResource ->
Expand Down Expand Up @@ -113,16 +109,18 @@ internal class Uploader(
when {
response is OperationOutcome && response.issue.isNotEmpty() ->
UploadRequestResult.Failure(
mappedUploadRequest.localChanges,
ResourceSyncException(
mappedUploadRequest.generatedRequest.resource.resourceType,
FHIRException(response.issueFirstRep.diagnostics),
),
)
(response is DomainResource || response is Bundle) &&
(response !is IBaseOperationOutcome) ->
handleUploadResponse(mappedUploadRequest, response)
handleSuccessfulUploadResponse(mappedUploadRequest, response)
else ->
UploadRequestResult.Failure(
mappedUploadRequest.localChanges,
ResourceSyncException(
mappedUploadRequest.generatedRequest.resource.resourceType,
FHIRException(
Expand All @@ -134,27 +132,22 @@ internal class Uploader(
} catch (e: Exception) {
Timber.e(e)
UploadRequestResult.Failure(
mappedUploadRequest.localChanges,
ResourceSyncException(mappedUploadRequest.generatedRequest.resource.resourceType, e),
)
}
}

private sealed class UploadRequestResult {
data class Success(
val successfulUploadResponsMappings: List<SuccessfulUploadResponseMapping>,
) : UploadRequestResult()

data class Failure(val exception: ResourceSyncException) : UploadRequestResult()
}
}

sealed class UploadSyncResult {
sealed class UploadRequestResult {
data class Success(
val uploadResponses: List<SuccessfulUploadResponseMapping>,
) : UploadSyncResult()
val successfulUploadResponseMappings: List<SuccessfulUploadResponseMapping>,
) : UploadRequestResult()

data class Failure(val syncError: ResourceSyncException, val localChangeToken: LocalChangeToken) :
UploadSyncResult()
data class Failure(
val localChanges: List<LocalChange>,
val uploadError: ResourceSyncException,
) : UploadRequestResult()
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved
}

sealed class SuccessfulUploadResponseMapping(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@ internal sealed class UploadRequestGeneratorMode {
data class BundleRequest(
val httpVerbToUseForCreate: Bundle.HTTPVerb,
val httpVerbToUseForUpdate: Bundle.HTTPVerb,
val bundleSize: Int = 500,
MJ1998 marked this conversation as resolved.
Show resolved Hide resolved
) : UploadRequestGeneratorMode()
}

Expand All @@ -59,20 +60,22 @@ internal object UploadRequestGeneratorFactory {
TransactionBundleGenerator.getGenerator(
mode.httpVerbToUseForCreate,
mode.httpVerbToUseForUpdate,
mode.bundleSize,
)
}
}

internal sealed class UploadRequestMapping(
open val localChanges: List<LocalChange>,
open val generatedRequest: UploadRequest,
)

internal data class UrlUploadRequestMapping(
val localChanges: List<LocalChange>,
override val localChanges: List<LocalChange>,
override val generatedRequest: UrlUploadRequest,
) : UploadRequestMapping(generatedRequest)
) : UploadRequestMapping(localChanges, generatedRequest)

internal data class BundleUploadRequestMapping(
val splitLocalChanges: List<List<LocalChange>>,
override val generatedRequest: BundleUploadRequest,
) : UploadRequestMapping(generatedRequest)
) : UploadRequestMapping(localChanges = splitLocalChanges.flatten(), generatedRequest)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@ import com.google.android.fhir.sync.download.DownloadRequest
import com.google.android.fhir.sync.download.UrlDownloadRequest
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.SyncUploadProgress
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.sync.upload.UploadRequestResult
import com.google.android.fhir.sync.upload.request.BundleUploadRequest
import com.google.android.fhir.sync.upload.request.UploadRequest
import com.google.android.fhir.sync.upload.request.UrlUploadRequest
Expand Down Expand Up @@ -158,12 +158,14 @@ object TestFhirEngineImpl : FhirEngine {

override suspend fun syncUpload(
localChangesFetchMode: LocalChangesFetchMode,
upload: suspend (List<LocalChange>) -> UploadSyncResult,
upload: suspend (List<LocalChange>) -> Flow<UploadRequestResult>,
): Flow<SyncUploadProgress> = flow {
emit(SyncUploadProgress(1, 1))
when (val result = upload(getLocalChanges(ResourceType.Patient, "123"))) {
is UploadSyncResult.Success -> emit(SyncUploadProgress(0, 1))
is UploadSyncResult.Failure -> emit(SyncUploadProgress(1, 1, result.syncError))
upload(getLocalChanges(ResourceType.Patient, "123")).collect {
when (it) {
is UploadRequestResult.Success -> emit(SyncUploadProgress(0, 1))
is UploadRequestResult.Failure -> emit(SyncUploadProgress(1, 1, it.uploadError))
}
}
}

Expand Down Expand Up @@ -219,14 +221,14 @@ object TestFailingDatasource : DataSource {
}
}

class BundleDataSource(val onPostBundle: suspend (Bundle) -> Resource) : DataSource {
class BundleDataSource(val onPostBundle: suspend (BundleUploadRequest) -> Resource) : DataSource {

override suspend fun download(downloadRequest: DownloadRequest): Resource {
TODO("Not yet implemented")
}

override suspend fun upload(request: UploadRequest) =
onPostBundle((request as BundleUploadRequest).resource)
onPostBundle((request as BundleUploadRequest))
}

class UrlRequestDataSource(val onUrlRequestSend: suspend (UrlUploadRequest) -> Resource) :
Expand Down
Loading
Loading