Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch searching when list might be large or when count is not defined #3456

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2021-2024 Ona Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.smartregister.fhircore.engine.util.extension

import android.content.Context
import androidx.test.core.app.ApplicationProvider
import androidx.test.filters.MediumTest
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.FhirEngineConfiguration
import com.google.android.fhir.FhirEngineProvider
import com.google.android.fhir.search.search
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.Questionnaire
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test

/**
 * Instrumented tests comparing how a plain `search` and a `batchedSearch` over a large result set
 * interleave with a second, much smaller query launched right after them.
 *
 * Each test seeds the database with 1001 patients (the "long" query) and 4 questionnaires (the
 * "short" query), launches both queries as sibling coroutines, and then inspects the order in which
 * their results were appended to a shared list.
 */
@MediumTest
class FhirEngineExtensionKtTest {

  private val context = ApplicationProvider.getApplicationContext<Context>()
  private lateinit var fhirEngine: FhirEngine

  /** Initialises the engine in test mode and seeds the patients and questionnaires. */
  @Before
  fun setUp() {
    FhirEngineProvider.init(FhirEngineConfiguration(testMode = true))
    fhirEngine = FhirEngineProvider.getInstance(context)

    val patients = (0..1000).map { Patient().apply { id = "test-patient-$it" } }
    val questionnaires = (0..3).map { Questionnaire().apply { id = "test-questionnaire-$it" } }
    runBlocking { fhirEngine.create(*patients.toTypedArray(), *questionnaires.toTypedArray()) }
  }

  /** Clears the seeded data and releases the engine so every test starts from a clean state. */
  @After
  fun tearDown() {
    runBlocking { fhirEngine.clearDatabase() }
    FhirEngineProvider.cleanup()
  }

  /**
   * With the plain `search`, the long patient query is launched first and the short questionnaire
   * query is expected to append its results only after the patients have been appended.
   */
  @Test
  fun test_search_time_searches_sequentially_and_short_running_query_waits() {
    val fetchedResources = mutableListOf<Resource>()
    runBlocking {
      launch {
        val patients = fhirEngine.search<Patient> {}.map { it.resource }
        fetchedResources += patients
      }

      launch {
        val questionnaires = fhirEngine.search<Questionnaire> {}.map { it.resource }
        fetchedResources += questionnaires
      }
    }
    val indexOfResultOfShortQuery =
      fetchedResources.indexOfFirst { it.resourceType == ResourceType.Questionnaire }
    val indexOfResultOfLongQuery =
      fetchedResources.indexOfFirst { it.resourceType == ResourceType.Patient }
    // indexOfFirst returns -1 when nothing matches; make sure both queries actually contributed
    // results so the ordering comparison below is meaningful.
    Assert.assertTrue(indexOfResultOfShortQuery >= 0)
    Assert.assertTrue(indexOfResultOfLongQuery >= 0)
    Assert.assertTrue(indexOfResultOfShortQuery > indexOfResultOfLongQuery)
  }

  /**
   * With `batchedSearch`, the long patient query is split into pages, so the short questionnaire
   * query is expected to append its results before the patient query has finished.
   */
  @Test
  fun test_batchedSearch_returns_short_running_query_and_long_running_does_not_block() {
    val fetchedResources = mutableListOf<Resource>()
    runBlocking {
      launch {
        val patients = fhirEngine.batchedSearch<Patient> {}.map { it.resource }
        fetchedResources += patients
      }

      launch {
        // Unwrap the search results and append them in place. The previous
        // `fetchedResources + questionnaires` built a new list and threw it away, so no
        // questionnaire was ever recorded and the final assertion compared against -1.
        val questionnaires = fhirEngine.search<Questionnaire> {}.map { it.resource }
        fetchedResources += questionnaires
      }
    }

    val indexOfResultOfShortQuery =
      fetchedResources.indexOfFirst { it.resourceType == ResourceType.Questionnaire }
    val indexOfResultOfLongQuery =
      fetchedResources.indexOfFirst { it.resourceType == ResourceType.Patient }
    // Guard against a vacuous pass: -1 (not found) would otherwise satisfy the `<` check.
    Assert.assertTrue(indexOfResultOfShortQuery >= 0)
    Assert.assertTrue(indexOfResultOfLongQuery >= 0)
    Assert.assertTrue(indexOfResultOfShortQuery < indexOfResultOfLongQuery)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import com.google.android.fhir.search.filter.TokenParamFilterCriterion
import com.google.android.fhir.search.has
import com.google.android.fhir.search.include
import com.google.android.fhir.search.revInclude
import com.google.android.fhir.search.search
import com.jayway.jsonpath.Configuration
import com.jayway.jsonpath.JsonPath
import com.jayway.jsonpath.Option
Expand Down Expand Up @@ -85,6 +84,7 @@ import org.smartregister.fhircore.engine.rulesengine.ConfigRulesExecutor
import org.smartregister.fhircore.engine.util.DispatcherProvider
import org.smartregister.fhircore.engine.util.SharedPreferencesHelper
import org.smartregister.fhircore.engine.util.extension.asReference
import org.smartregister.fhircore.engine.util.extension.batchedSearch
import org.smartregister.fhircore.engine.util.extension.encodeResourceToString
import org.smartregister.fhircore.engine.util.extension.extractId
import org.smartregister.fhircore.engine.util.extension.extractLogicalIdUuid
Expand Down Expand Up @@ -136,7 +136,7 @@ constructor(
): List<T> =
withContext(dispatcherProvider.io()) {
fhirEngine
.search<T> {
.batchedSearch<T> {
filterByResourceTypeId(token, subjectType, subjectId)
dataQueries.forEach {
filterBy(
Expand All @@ -149,7 +149,7 @@ constructor(
}

suspend inline fun <reified R : Resource> search(search: Search) =
fhirEngine.search<R>(search).map { it.resource }
fhirEngine.batchedSearch<R>(search).map { it.resource }

/** Delegates to `fhirEngine.count` with the supplied [search] and returns its result. */
suspend inline fun count(search: Search) = fhirEngine.count(search)

Expand Down Expand Up @@ -265,14 +265,14 @@ constructor(
suspend fun loadManagingEntity(group: Group) =
group.managingEntity?.let { reference ->
fhirEngine
.search<RelatedPerson> {
.batchedSearch<RelatedPerson> {
filter(RelatedPerson.RES_ID, { value = of(reference.extractId()) })
}
.map { it.resource }
.firstOrNull()
?.let { relatedPerson ->
fhirEngine
.search<Patient> {
.batchedSearch<Patient> {
filter(
Patient.RES_ID,
{ value = of(relatedPerson.patient.extractId()) },
Expand Down Expand Up @@ -681,7 +681,7 @@ constructor(
}
}
return kotlin
.runCatching { fhirEngine.search<Resource>(search) }
.runCatching { fhirEngine.batchedSearch<Resource>(search) }
.onFailure { Timber.e(it, "Error fetching related resources") }
.getOrDefault(emptyList())
}
Expand Down Expand Up @@ -728,7 +728,7 @@ constructor(
configComputedRuleValues = computedValuesMap,
)
}
val resources = fhirEngine.search<Resource>(search).map { it.resource }
val resources = fhirEngine.batchedSearch<Resource>(search).map { it.resource }
val filteredResources =
filterResourcesByFhirPathExpression(
resourceFilterExpressions = eventWorkflow.resourceFilterExpressions,
Expand Down Expand Up @@ -941,7 +941,7 @@ constructor(
currentPage = pageNumber,
count = COUNT,
)
val result = fhirEngine.search<Resource>(baseResourceSearch)
val result = fhirEngine.batchedSearch<Resource>(baseResourceSearch)
searchResults.addAll(
result.filter { searchResult ->
when (baseResourceConfig.resource) {
Expand Down Expand Up @@ -996,7 +996,7 @@ constructor(
currentPage = currentPage,
count = pageSize,
)
fhirEngine.search<Resource>(search)
fhirEngine.batchedSearch<Resource>(search)
}
.onFailure {
Timber.e(
Expand Down Expand Up @@ -1148,7 +1148,7 @@ constructor(

private suspend fun retrieveSubLocations(locationId: String): ArrayDeque<Location> =
fhirEngine
.search<Location>(
.batchedSearch<Location>(
Search(type = ResourceType.Location).apply {
filter(
Location.PARTOF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.smartregister.fhircore.engine.configuration.ConfigType
import org.smartregister.fhircore.engine.configuration.ConfigurationRegistry
import org.smartregister.fhircore.engine.configuration.app.ApplicationConfiguration
import org.smartregister.fhircore.engine.util.DispatcherProvider
import org.smartregister.fhircore.engine.util.extension.batchedSearch
import org.smartregister.fhircore.engine.util.extension.isValidResourceType
import org.smartregister.fhircore.engine.util.extension.resourceClassType
import org.smartregister.p2p.model.RecordCount
Expand Down Expand Up @@ -108,7 +109,7 @@ constructor(
count = batchSize
from = offset
}
fhirEngine.search(search)
fhirEngine.batchedSearch(search)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import ca.uhn.fhir.util.TerserUtil
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.datacapture.extensions.logicalId
import com.google.android.fhir.get
import com.google.android.fhir.search.search
import dagger.hilt.android.qualifiers.ApplicationContext
import java.util.Date
import javax.inject.Inject
Expand Down Expand Up @@ -60,6 +59,7 @@ import org.smartregister.fhircore.engine.configuration.event.EventType
import org.smartregister.fhircore.engine.data.local.DefaultRepository
import org.smartregister.fhircore.engine.util.extension.addResourceParameter
import org.smartregister.fhircore.engine.util.extension.asReference
import org.smartregister.fhircore.engine.util.extension.batchedSearch
import org.smartregister.fhircore.engine.util.extension.encodeResourceToString
import org.smartregister.fhircore.engine.util.extension.extractFhirpathDuration
import org.smartregister.fhircore.engine.util.extension.extractFhirpathPeriod
Expand Down Expand Up @@ -120,7 +120,7 @@ constructor(
// Only one CarePlan per plan, update or init a new one if not exists
val output =
fhirEngine
.search<CarePlan> {
.batchedSearch<CarePlan> {
filter(
CarePlan.INSTANTIATES_CANONICAL,
{ value = planDefinition.referenceValue() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import android.content.Context
import androidx.hilt.work.HiltWorker
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.google.android.fhir.search.search
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.withContext
Expand All @@ -33,6 +32,7 @@ import org.smartregister.fhircore.engine.configuration.app.ApplicationConfigurat
import org.smartregister.fhircore.engine.data.local.DefaultRepository
import org.smartregister.fhircore.engine.util.DispatcherProvider
import org.smartregister.fhircore.engine.util.SharedPreferencesHelper
import org.smartregister.fhircore.engine.util.extension.batchedSearch
import org.smartregister.fhircore.engine.util.extension.extractId
import org.smartregister.fhircore.engine.util.extension.lastOffset
import org.smartregister.fhircore.engine.util.getLastOffset
Expand Down Expand Up @@ -94,7 +94,7 @@ constructor(

suspend fun getCarePlans(batchSize: Int, lastOffset: Int) =
defaultRepository.fhirEngine
.search<CarePlan> {
.batchedSearch<CarePlan> {
filter(
CarePlan.STATUS,
{ value = of(CarePlan.CarePlanStatus.DRAFT.toCode()) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import ca.uhn.fhir.rest.param.ParamPrefixEnum
import com.google.android.fhir.datacapture.extensions.logicalId
import com.google.android.fhir.get
import com.google.android.fhir.search.filter.TokenParamFilterCriterion
import com.google.android.fhir.search.search
import dagger.hilt.android.qualifiers.ApplicationContext
import java.util.Date
import javax.inject.Inject
Expand All @@ -39,6 +38,7 @@ import org.smartregister.fhircore.engine.configuration.ConfigurationRegistry
import org.smartregister.fhircore.engine.configuration.app.ApplicationConfiguration
import org.smartregister.fhircore.engine.configuration.event.EventType
import org.smartregister.fhircore.engine.data.local.DefaultRepository
import org.smartregister.fhircore.engine.util.extension.batchedSearch
import org.smartregister.fhircore.engine.util.extension.executionStartIsBeforeOrToday
import org.smartregister.fhircore.engine.util.extension.expiredConcept
import org.smartregister.fhircore.engine.util.extension.extractId
Expand Down Expand Up @@ -66,7 +66,7 @@ constructor(
Timber.i("Fetch and expire overdue tasks")
val tasksResult =
fhirEngine
.search<Task> {
.batchedSearch<Task> {
filter(
Task.STATUS,
{ value = of(TaskStatus.REQUESTED.toCoding()) },
Expand Down Expand Up @@ -148,7 +148,7 @@ constructor(

val tasks =
defaultRepository.fhirEngine
.search<Task> {
.batchedSearch<Task> {
filter(
Task.STATUS,
{ value = of(TaskStatus.REQUESTED.toCoding()) },
Expand Down Expand Up @@ -235,7 +235,7 @@ constructor(
suspend fun closeResourcesRelatedToCompletedServiceRequests() {
Timber.i("Fetch completed service requests and close related resources")
defaultRepository.fhirEngine
.search<ServiceRequest> {
.batchedSearch<ServiceRequest> {
filter(
ServiceRequest.STATUS,
{ value = of(ServiceRequest.ServiceRequestStatus.COMPLETED.toCode()) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.smartregister.fhircore.engine.util.extension

import ca.uhn.fhir.util.UrlUtil
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.SearchResult
import com.google.android.fhir.db.ResourceNotFoundException
import com.google.android.fhir.get
import com.google.android.fhir.search.search
import com.google.android.fhir.search.Search
import com.google.android.fhir.workflow.FhirOperator
import org.hl7.fhir.r4.model.Composition
import org.hl7.fhir.r4.model.IdType
Expand All @@ -40,7 +41,7 @@ suspend inline fun <reified T : Resource> FhirEngine.loadResource(resourceId: St
}

suspend fun FhirEngine.searchCompositionByIdentifier(identifier: String): Composition? =
this.search<Composition> {
this.batchedSearch<Composition> {
filter(Composition.IDENTIFIER, { value = of(Identifier().apply { value = identifier }) })
}
.map { it.resource }
Expand All @@ -50,7 +51,9 @@ suspend fun FhirEngine.loadLibraryAtPath(fhirOperator: FhirOperator, path: Strin
// resource path could be Library/123 OR something like http://fhir.labs.common/Library/123
val library =
runCatching { get<Library>(IdType(path).idPart) }.getOrNull()
?: search<Library> { filter(Library.URL, { value = path }) }.map { it.resource }.firstOrNull()
?: batchedSearch<Library> { filter(Library.URL, { value = path }) }
.map { it.resource }
.firstOrNull()
}

suspend fun FhirEngine.loadLibraryAtPath(
Expand All @@ -72,7 +75,7 @@ suspend fun FhirEngine.loadCqlLibraryBundle(fhirOperator: FhirOperator, measureP
// resource path could be Measure/123 OR something like http://fhir.labs.common/Measure/123
val measure: Measure? =
if (UrlUtil.isValid(measurePath)) {
search<Measure> { filter(Measure.URL, { value = measurePath }) }
batchedSearch<Measure> { filter(Measure.URL, { value = measurePath }) }
.map { it.resource }
.firstOrNull()
} else {
Expand All @@ -93,3 +96,29 @@ suspend fun FhirEngine.countUnSyncedResources() =
.groupingBy { it.resourceType.spaceByUppercase() }
.eachCount()
.map { it.key to it.value }

/**
 * Runs [search] against this engine, fetching the results in pages when the caller has not set a
 * `count` on it.
 *
 * - If `search.count` is already set, the search is executed once, unchanged.
 * - Otherwise results are fetched 100 at a time, starting at `search.from` (or 0 when unset), until
 *   a page comes back with fewer than 100 entries. All pages are accumulated in memory and returned
 *   as a single list.
 *
 * The paging fields written to [search] while iterating are restored before returning, so the
 * caller's [Search] can be reused (for another search or a count) without carrying over a stale
 * `from`/`count`.
 *
 * @param search the query to execute; only its `from` and `count` are touched, and only temporarily
 * @return every matching [SearchResult], in the order the pages were returned
 */
suspend fun <R : Resource> FhirEngine.batchedSearch(search: Search): List<SearchResult<R>> =
  if (search.count != null) {
    this.search<R>(search)
  } else {
    // Remember the caller's offset; `count` is known to be null on this branch.
    val originalFrom = search.from
    val result = mutableListOf<SearchResult<R>>()
    var offset = originalFrom ?: 0
    val pageCount = 100
    try {
      do {
        search.from = offset
        search.count = pageCount
        val searchResults = this.search<R>(search)
        result += searchResults
        offset += searchResults.size
        // A short page means there is nothing left to fetch.
      } while (searchResults.size == pageCount)
    } finally {
      // Undo the temporary paging state, also when a page query throws or is cancelled.
      search.from = originalFrom
      search.count = null
    }

    result
Comment on lines +104 to +115
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @LZRS.

I have a few questions and comments on this implementation.

The decision to load this data in batches sounds reasonable, however, based on this implementation I believe the gain would be negligible or there would be no gain at all. Technically, the data will still be cached in memory inside the loop. This differs from the approach where we would load and process the data in batches. Data processing should happen after fetching each batch to improve the app responsiveness, otherwise, there are some drawbacks with batching including:

  1. Each batch fetch requires a new database query, introducing overhead. The overhead of executing smaller queries may outweigh the benefit of lower memory.

  2. Using OFFSET requires the database to skip over records before returning the batch. A larger offset requires scanning and discarding many rows, so if indexing is not done properly, this will slow down fetching data in batches.

@LZRS Would it be possible to measure the performance impact introduced by this implementation for comparison?

The paging3 library loads data in batches and allows for data processing with each fetch.

cc @ndegwamartin @pld

Copy link
Contributor Author

@LZRS LZRS Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, generally, I think the performance gains would probably be negligible (in one of our cases, a difference of 20ms) especially when assuming only our query of interest would be running a single time. I think the significance of these changes would come about when we have multiple queries from different coroutines/threads.

Assume, for example, that we have a long-running background operation accessing the db, and in the UI someone navigates to a page that also requires a db fetch in order to render. Instead of waiting for the long-running background operation to finish, the db fetch for the UI page can get queued in between the batches of the long-running background operation. I'm also looking to add integration tests for this.

The changes here would ensure that no single query (in a batch) takes more than a couple of milliseconds. A long-running query would still take long to load (as it's split and cached in memory), but it would leave room for other queries to run as well.

I also agree that the best case would be data processing after fetching each batch although that would be quite tasking to implement, I feel.

In terms of MalawiCore, it improved the app's performance from here to here.
We'll be working on similar tests for FhirCore.

Using paging3 to load the data would be the recommended approach, but it would also need approval from Google's SDK team, which would also take time, I think.

Note:

Room will only perform at most one transaction at a time, additional transactions are queued and executed on a first come, first serve order.

}

/**
 * DSL entry point for the batched search: builds a [Search] for resource type [R], applies [init]
 * to it, and hands it to the [Search]-based `batchedSearch` overload.
 *
 * @param init configuration block applied to the freshly created [Search]
 * @return the search results produced by the [Search]-based overload
 */
suspend inline fun <reified R : Resource> FhirEngine.batchedSearch(
  init: Search.() -> Unit,
): List<SearchResult<R>> {
  // The resource type is read off a throwaway instance of R.
  val resourceType = R::class.java.newInstance().resourceType
  val configuredSearch = Search(type = resourceType).apply(init)
  return batchedSearch<R>(configuredSearch)
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,5 @@ suspend inline fun FhirEngine.retrievePreviouslyGeneratedMeasureReports(
search.filter(MeasureReport.MEASURE, { value = measureUrl })
subjects.forEach { search.filter(MeasureReport.SUBJECT, { value = it }) }

return this.search<MeasureReport>(search).map { it.resource }
return this.batchedSearch<MeasureReport>(search).map { it.resource }
}
Loading