Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a LocalChangeFetcher #2135

Merged
merged 18 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

26 changes: 16 additions & 10 deletions engine/src/main/java/com/google/android/fhir/FhirEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.google.android.fhir
import com.google.android.fhir.db.ResourceNotFoundException
import com.google.android.fhir.search.Search
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
Expand Down Expand Up @@ -53,7 +54,8 @@ interface FhirEngine {
* api caller should [Flow.collect] it.
*/
suspend fun syncUpload(
upload: (suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>)
localChangesFetchMode: LocalChangesFetchMode,
upload: (suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>),
)

/**
Expand All @@ -62,7 +64,7 @@ interface FhirEngine {
*/
suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
download: suspend () -> Flow<List<Resource>>,
)

/**
Expand All @@ -87,29 +89,32 @@ interface FhirEngine {
* Retrieves a list of [LocalChange]s for [Resource] with given type and id, which can be used to
* purge resource from database. If there is no local change for given [resourceType] and
* [Resource.id], return an empty list.
*
* @param type The [ResourceType]
* @param id The resource id [Resource.id]
* @return [List]<[LocalChange]> A list of local changes for given [resourceType] and
* [Resource.id] . If there is no local change for given [resourceType] and [Resource.id], return
* an empty list.
* [Resource.id] . If there is no local change for given [resourceType] and [Resource.id],
* return an empty list.
*/
suspend fun getLocalChanges(type: ResourceType, id: String): List<LocalChange>

/**
* Purges a resource from the database based on resource type and id without any deletion of data
* from the server.
*
* @param type The [ResourceType]
* @param id The resource id [Resource.id]
* @param isLocalPurge default value is false here resource will not be deleted from
* LocalChangeEntity table but it will throw IllegalStateException("Resource has local changes
* either sync with server or FORCE_PURGE required") if local change exists. If true this API will
* delete resource entry from LocalChangeEntity table.
* LocalChangeEntity table but it will throw IllegalStateException("Resource has local changes
* either sync with server or FORCE_PURGE required") if local change exists. If true this API
* will delete resource entry from LocalChangeEntity table.
*/
suspend fun purge(type: ResourceType, id: String, forcePurge: Boolean = false)
}

/**
* Returns a FHIR resource of type [R] with [id] from the local storage.
*
* @param <R> The resource type which should be a subtype of [Resource].
* @throws ResourceNotFoundException if the resource is not found
*/
Expand All @@ -120,6 +125,7 @@ suspend inline fun <reified R : Resource> FhirEngine.get(id: String): R {

/**
* Deletes a FHIR resource of type [R] with [id] from the local storage.
*
* @param <R> The resource type which should be a subtype of [Resource].
*/
suspend inline fun <reified R : Resource> FhirEngine.delete(id: String) {
Expand All @@ -138,7 +144,7 @@ data class SearchResult<R : Resource>(
/** Matching referenced resources as per the [Search.include] criteria in the query. */
val included: Map<SearchParamName, List<Resource>>?,
/** Matching referenced resources as per the [Search.revInclude] criteria in the query. */
val revIncluded: Map<Pair<ResourceType, SearchParamName>, List<Resource>>?
val revIncluded: Map<Pair<ResourceType, SearchParamName>, List<Resource>>?,
) {
override fun equals(other: Any?) =
other is SearchResult<*> &&
Expand All @@ -155,7 +161,7 @@ data class SearchResult<R : Resource>(

private fun equalsShallow(
first: Map<SearchParamName, List<Resource>>?,
second: Map<SearchParamName, List<Resource>>?
second: Map<SearchParamName, List<Resource>>?,
) =
if (first != null && second != null && first.size == second.size) {
first.entries.asSequence().zip(second.entries.asSequence()).all { (x, y) ->
Expand All @@ -168,7 +174,7 @@ data class SearchResult<R : Resource>(
@JvmName("equalsShallowRevInclude")
private fun equalsShallow(
first: Map<Pair<ResourceType, SearchParamName>, List<Resource>>?,
second: Map<Pair<ResourceType, SearchParamName>, List<Resource>>?
second: Map<Pair<ResourceType, SearchParamName>, List<Resource>>?,
) =
if (first != null && second != null && first.size == second.size) {
first.entries.asSequence().zip(second.entries.asSequence()).all { (x, y) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import com.google.android.fhir.search.count
import com.google.android.fhir.search.execute
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.Resolved
import com.google.android.fhir.sync.upload.LocalChangeFetcher
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Bundle
Expand Down Expand Up @@ -81,15 +83,15 @@ internal class FhirEngineImpl(private val database: Database, private val contex

override suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
download: suspend () -> Flow<List<Resource>>,
) {
download().collect { resources ->
database.withTransaction {
val resolved =
resolveConflictingResources(
resources,
getConflictingResourceIds(resources),
conflictResolver
conflictResolver,
)
database.insertSyncedResources(resources)
saveResolvedResourcesToDatabase(resolved)
Expand All @@ -107,7 +109,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex
private suspend fun resolveConflictingResources(
resources: List<Resource>,
conflictingResourceIds: Set<String>,
conflictResolver: ConflictResolver
conflictResolver: ConflictResolver,
) =
resources
.filter { conflictingResourceIds.contains(it.logicalId) }
Expand All @@ -123,16 +125,15 @@ internal class FhirEngineImpl(private val database: Database, private val contex
.intersect(database.getAllLocalChanges().map { it.resourceId }.toSet())

override suspend fun syncUpload(
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>
localChangesFetchMode: LocalChangesFetchMode,
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>,
) {
val localChanges = database.getAllLocalChanges()
if (localChanges.isNotEmpty()) {
upload(localChanges).collect {
database.deleteUpdates(it.first)
when (it.second) {
is Bundle -> updateVersionIdAndLastUpdated(it.second as Bundle)
else -> updateVersionIdAndLastUpdated(it.second)
}
val localChangeFetcher = LocalChangeFetcher.byMode(localChangesFetchMode, database)
upload(localChangeFetcher.next()).collect {
omarismail94 marked this conversation as resolved.
Show resolved Hide resolved
database.deleteUpdates(it.first)
when (it.second) {
is Bundle -> updateVersionIdAndLastUpdated(it.second as Bundle)
else -> updateVersionIdAndLastUpdated(it.second)
}
}
}
Expand Down Expand Up @@ -161,7 +162,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex
id,
type,
getVersionFromETag(response.etag),
response.lastModified.toInstant()
response.lastModified.toInstant(),
)
}
}
Expand All @@ -173,7 +174,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex
resource.id,
resource.resourceType,
resource.meta.versionId,
resource.meta.lastUpdated.toInstant()
resource.meta.lastUpdated.toInstant(),
)
}
}
Expand All @@ -197,9 +198,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex
* [Bundle.BundleEntryResponseComponent.location].
*
* [Bundle.BundleEntryResponseComponent.location] may be:
*
* 1. absolute path: `<server-path>/<resource-type>/<resource-id>/_history/<version>`
*
* 2. relative path: `<resource-type>/<resource-id>/_history/<version>`
*/
private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair<String, ResourceType>?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.google.android.fhir.DatastoreUtil
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.sync.download.DownloadState
import com.google.android.fhir.sync.download.Downloader
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import java.time.OffsetDateTime
Expand Down Expand Up @@ -129,7 +130,8 @@ internal class FhirSynchronizer(

private suspend fun upload(): SyncResult {
val exceptions = mutableListOf<ResourceSyncException>()
fhirEngine.syncUpload { list ->
val localChangesFetchMode = LocalChangesFetchMode.AllChanges
fhirEngine.syncUpload(localChangesFetchMode) { list ->
flow {
uploader.upload(list).collect { result ->
when (result) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync.upload

import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database

/**
* Fetches local changes.
*
* This interface provides methods to check for the existence of further changes, retrieve the next
* batch of changes, and get the progress of fetched changes.
*
* It is marked as internal to keep [Database] unexposed to clients
*/
internal interface LocalChangeFetcher {
/** Checks if there are more local changes to be fetched. */
suspend fun hasNext(): Boolean

/** Retrieves the next batch of local changes. */
suspend fun next(): List<LocalChange>

/**
* Returns [ProgressState], which contains the remaining changes left to upload and the initial
* total to upload
*/
suspend fun getProgress(): ProgressState

companion object {
internal suspend fun byMode(
mode: LocalChangesFetchMode,
database: Database,
): LocalChangeFetcher {
val totalLocalChangeCount = database.getAllLocalChanges().size
omarismail94 marked this conversation as resolved.
Show resolved Hide resolved
return when (mode) {
is LocalChangesFetchMode.AllChanges ->
AllChangesLocalChangeFetcher(database, totalLocalChangeCount)
else -> error("$mode does not have an implementation yet.")
}
}
}
}

data class ProgressState(
omarismail94 marked this conversation as resolved.
Show resolved Hide resolved
val remaining: Int,
val initialTotal: Int,
)

internal class AllChangesLocalChangeFetcher(
private val database: Database,
private val total: Int,
) : LocalChangeFetcher {

override suspend fun hasNext(): Boolean = database.getAllLocalChanges().isNotEmpty()

override suspend fun next(): List<LocalChange> = database.getAllLocalChanges()

override suspend fun getProgress(): ProgressState =
ProgressState(database.getAllLocalChanges().size, total)
}

/** Represents the mode in which local changes should be fetched. */
sealed class LocalChangesFetchMode {

object AllChanges : LocalChangesFetchMode()

object PerResource : LocalChangesFetchMode()

object EarliestChange : LocalChangesFetchMode()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.google.android.fhir.sync.DownloadRequest
import com.google.android.fhir.sync.DownloadWorkManager
import com.google.android.fhir.sync.UploadRequest
import com.google.android.fhir.sync.UrlDownloadRequest
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.common.truth.Truth.assertThat
import java.net.SocketTimeoutException
import java.time.Instant
Expand Down Expand Up @@ -111,7 +112,7 @@ object TestDataSourceImpl : DataSource {
}

open class TestDownloadManagerImpl(
private val queries: List<String> = listOf("Patient?address-city=NAIROBI")
private val queries: List<String> = listOf("Patient?address-city=NAIROBI"),
) : DownloadWorkManager {
private val urls = LinkedList(queries)

Expand Down Expand Up @@ -147,17 +148,17 @@ object TestFhirEngineImpl : FhirEngine {
}

override suspend fun syncUpload(
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>
) {
upload(getLocalChanges(ResourceType.Patient, "123")).collect()
}
localChangesFetchMode: LocalChangesFetchMode,
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>,
) = upload(getLocalChanges(ResourceType.Patient, "123")).collect()

override suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
download: suspend () -> Flow<List<Resource>>,
) {
download().collect()
}

override suspend fun count(search: Search): Long {
return 0
}
Expand All @@ -176,8 +177,8 @@ object TestFhirEngineImpl : FhirEngine {
payload = "{ 'resourceType' : 'Patient', 'id' : '123' }",
token = LocalChangeToken(listOf()),
type = LocalChange.Type.INSERT,
timestamp = Instant.now()
)
timestamp = Instant.now(),
),
)
}

Expand Down
Loading