Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete flow only after cache writes have completed #5877

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,24 @@ import com.apollographql.apollo3.exception.apolloExceptionHandler
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import com.apollographql.apollo3.interceptor.ApolloInterceptorChain
import com.apollographql.apollo3.mpp.currentTimeMillis
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch

internal class ApolloCacheInterceptor(
val store: ApolloStore,
) : ApolloInterceptor, ApolloStoreInterceptor {
private suspend fun <D : Operation.Data> maybeAsync(request: ApolloRequest<D>, block: suspend () -> Unit) {
private suspend fun <D : Operation.Data> maybeAsync(request: ApolloRequest<D>, block: suspend () -> Unit): Job {
if (request.writeToCacheAsynchronously) {
val scope = request.executionContext[ConcurrencyInfo]!!.coroutineScope
scope.launch {
return scope.launch {
try {
block()
} catch (e: Throwable) {
Expand All @@ -50,6 +54,7 @@ internal class ApolloCacheInterceptor(
}
} else {
block()
return CompletedJob()
}
}

Expand All @@ -61,18 +66,18 @@ internal class ApolloCacheInterceptor(
response: ApolloResponse<D>,
customScalarAdapters: CustomScalarAdapters,
extraKeys: Set<String> = emptySet(),
) {
): Job {
if (request.doNotStore) {
return
return CompletedJob()
}
if (response.data == null) {
return
return CompletedJob()
}
if (response.hasErrors() && !request.storePartialResponses) {
return
return CompletedJob()
}

maybeAsync(request) {
return maybeAsync(request) {
val cacheKeys = if (response.data != null) {
var cacheHeaders = request.cacheHeaders + response.cacheHeaders
if (request.storeReceiveDate) {
Expand Down Expand Up @@ -118,9 +123,12 @@ internal class ApolloCacheInterceptor(
chain: ApolloInterceptorChain,
): Flow<ApolloResponse<D>> {
val customScalarAdapters = request.customScalarAdapters
val cacheWrites = mutableListOf<Job>()

return chain.proceed(request).onEach {
maybeWriteToCache(request, it, customScalarAdapters)
}.onCompletion {
cacheWrites.joinAll()
}
}

Expand Down Expand Up @@ -169,8 +177,9 @@ internal class ApolloCacheInterceptor(
emptySet()
}

maybeWriteToCache(request, response, customScalarAdapters, optimisticKeys!!)
val cacheWriteJob = maybeWriteToCache(request, response, customScalarAdapters, optimisticKeys!!)
emit(response)
cacheWriteJob.join()
}

if (networkException != null) {
Expand Down Expand Up @@ -253,8 +262,10 @@ internal class ApolloCacheInterceptor(
customScalarAdapters: CustomScalarAdapters,
): Flow<ApolloResponse<D>> {
val startMillis = currentTimeMillis()
val cacheWrites = mutableListOf<Job>()

return chain.proceed(request).onEach {
maybeWriteToCache(request, it, customScalarAdapters)
cacheWrites += maybeWriteToCache(request, it, customScalarAdapters)
}.map { networkResponse ->
networkResponse.newBuilder()
.cacheInfo(
Expand All @@ -264,12 +275,16 @@ internal class ApolloCacheInterceptor(
.networkException(networkResponse.exception)
.build()
).build()
}.onCompletion {
cacheWrites.joinAll()
}
}

companion object {
private fun nowDateCacheHeaders(): CacheHeaders {
return CacheHeaders.Builder().addHeader(ApolloCacheHeaders.DATE, (currentTimeMillis() / 1000).toString()).build()
}

private fun CompletedJob() = Job().apply { complete() }
}
}
Loading