Skip to content

Commit

Permalink
cache issues
Browse files Browse the repository at this point in the history
  • Loading branch information
kotlitecture committed Jul 1, 2024
1 parent d3f9334 commit 88b48b6
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BasicCacheViewModel(
launchAsync {
val cacheKey = SimpleCacheKey()
val cacheEntry = cacheSource.get(cacheKey, ::getDateAsFormattedString)
cacheEntry.changes().collectLatest(cacheState::set)
cacheEntry.getChanges().collectLatest(cacheState::set)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package shared.data.source.cache

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

/**
* Represents the state of a cache entry.
Expand All @@ -13,41 +14,48 @@ interface CacheEntry<T> {
/** The key associated with this cache entry. */
val key: CacheKey<T>

/**
* Sets the specified value to the given entry.
*
* @param value The value to be stored in the entry.
*/
suspend fun setValue(value: T?)

/**
* Retrieves the cached value.
*
* @return The cached value, or new one if the value is not present in the cache or expired.
*/
suspend fun value(): T?
suspend fun getValue(): T?

/**
* Retrieves the last cached value.
*
* @return The last cached value, or null if the value is not present in the cache.
*/
suspend fun last(): T?
suspend fun getLast(): T?

/**
* Retrieves a fresh copy of the cached value.
*
* @return A fresh copy of the cached value, or null if the value is not available.
*/
suspend fun fresh(): T?
suspend fun getFresh(): T?

/**
* Retrieves the last cached value if available, otherwise retrieves a fresh copy of the value.
*
* @return The last cached value if available, or a fresh copy of the value. Returns null if the value is not present in the cache.
*/
suspend fun lastOrFresh() = last() ?: fresh()
suspend fun getLastOrFresh() = getLast() ?: getFresh()

/**
* Emits the cached value whenever it changes.
* The flow updates an value in the cache based on the expiration of the key.
*
* @return A flow representing the changes to the cached value.
*/
suspend fun changes(): Flow<T>
suspend fun getChanges(): Flow<T?>

companion object {
/**
Expand All @@ -58,11 +66,14 @@ interface CacheEntry<T> {
* @return A CacheState instance representing the single cached value.
*/
fun <T> of(key: CacheKey<T>, value: T): CacheEntry<T> = object : CacheEntry<T> {
private val valueChanges = MutableStateFlow<T?>(value)

override val key: CacheKey<T> = key
override suspend fun value(): T? = value
override suspend fun last(): T? = value
override suspend fun fresh(): T? = value
override suspend fun changes(): Flow<T> = flowOf(value)
override suspend fun getValue(): T? = value
override suspend fun getLast(): T? = value
override suspend fun getFresh(): T? = value
override suspend fun getChanges(): Flow<T?> = valueChanges
override suspend fun setValue(value: T?) = valueChanges.update { value }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ interface CacheSource : DataSource {
*/
fun <K : CacheKey<*>> remove(key: K)

/**
* Associates the specified value with the specified key in the cache.
*
*@param key The cache key to associate with the value.
* @param value The value to be stored in the cache.
*/
fun <T> put(key: CacheKey<T>, value: T)

/**
* Clears all entries from the cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.flow.updateAndGet
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
Expand All @@ -24,26 +30,25 @@ import kotlin.reflect.KClass
* to present data without delays, but with the ability to update based on expiration and other conditions.
*
* @param changesRetryInterval The interval, in milliseconds, to retry cache changes.
* @param exceptionRetryInterval The interval, in milliseconds, to retry cache operations in case of exceptions.
* @param exceptionRetryCount The maximum number of retries for cache operations in case of exceptions.
*/
@Suppress("UNCHECKED_CAST")
open class InMemoryCacheSource(
private val changesRetryInterval: Long = 1000L,
private val exceptionRetryInterval: Long = 3000L,
private val exceptionRetryInterval: Long = 1000L,
private val exceptionRetryCount: Int = 10
) : CacheSource {

private val dispatcher = Dispatchers.Default
private val jobs = ConcurrentMutableMap<CacheKeySnapshot, Deferred<*>>()
private val cache = ConcurrentMutableMap<CacheKeySnapshot, CacheData>()

override fun <T> get(key: CacheKey<T>, valueProvider: suspend () -> T?): CacheEntry<T> =
CacheStateImpl(key, valueProvider)

override fun <T> put(key: CacheKey<T>, value: T) {
val cacheKey = CacheKeySnapshot(key)
cache[cacheKey] = CacheData(value)
private val jobs = ConcurrentMutableMap<KeyData<*>, Deferred<*>>()
private val cache = ConcurrentMutableMap<KeyData<*>, EntryData<*>>()

override fun <T> get(key: CacheKey<T>, valueProvider: suspend () -> T?): CacheEntry<T> {
val keyData = KeyData(key)
val entryData = cache.computeIfAbsent(keyData) {
EntryData(keyData, valueProvider)
} as CacheEntry<T>
return entryData
}

override fun clear() {
Expand Down Expand Up @@ -72,7 +77,7 @@ open class InMemoryCacheSource(
}

override fun <K : CacheKey<*>> invalidate(key: K) {
val cacheKey = CacheKeySnapshot(key)
val cacheKey = KeyData(key)
jobs.remove(cacheKey)?.cancel()
cache[cacheKey]?.invalidate()
}
Expand All @@ -98,110 +103,111 @@ open class InMemoryCacheSource(
}

override fun <K : CacheKey<*>> remove(key: K) {
val cacheKey = CacheKeySnapshot(key)
val cacheKey = KeyData(key)
jobs.remove(cacheKey)?.cancel()
cache.remove(cacheKey)
}

private suspend fun <T> getValue(
cacheKey: CacheKeySnapshot,
valueProvider: suspend () -> T?
): T? {
val job = jobs[cacheKey]
?.let { deferredJob ->
if (deferredJob.isCancelled) {
jobs.remove(cacheKey)?.cancel()
null
} else {
deferredJob
}
}
?: run {
if (cacheKey.key.immortal()) {
jobs.computeIfAbsent(cacheKey) {
GlobalScope.async { valueProvider() }
}
} else {
withContext(dispatcher) {
jobs.computeIfAbsent(cacheKey) {
async { valueProvider() }
}
}
}
}
job.invokeOnCompletion { jobs.remove(cacheKey)?.key }
private data class KeyData<T>(
val key: CacheKey<T>,
val type: KClass<*> = key::class
)

return job.await() as? T
}
private data class EntrySnapshot<T>(
val value: T,
val updateTime: Long = Clock.System.now().toEpochMilliseconds()
)

private data class CacheData(
val data: Any?,
val createTime: Long = Clock.System.now().toEpochMilliseconds()
) {
private inner class EntryData<T>(
private val keyData: KeyData<T>,
private val valueProvider: suspend () -> T?
) : CacheEntry<T> {

@Transient
private var invalid: Boolean = false
private var invalidated = false
private val liveChanges by lazy { fetchLiveChanges() }
private val changes = MutableStateFlow<EntrySnapshot<T>?>(null)

fun isValid(ttl: Long): Boolean = when {
invalid -> false
data == null -> false
ttl > 0 -> !isExpired(ttl)
ttl == 0L -> false
else -> true
}
override val key: CacheKey<T> = keyData.key

fun isExpired(ttl: Long): Boolean {
val now = Clock.System.now().toEpochMilliseconds()
return ttl > 0 && createTime + ttl <= now
override suspend fun setValue(value: T?) = changes.update { value?.let(::EntrySnapshot) }

override suspend fun getFresh(): T? = invalidate().run { getValue() }

override suspend fun getLast(): T? = changes.value?.value

override suspend fun getValue(): T? {
return if (!isValid(key.ttl)) {
val newValue = fetchValue()
changes.updateAndGet { newValue?.let(::EntrySnapshot) }?.value
} else {
changes.value?.value
}
}

override suspend fun getChanges(): Flow<T?> = liveChanges
.flatMapLatest { changes }
.map { snapshot -> snapshot?.value }
.retry { th -> !th.isCancellationException().also { delay(changesRetryInterval) } }

fun invalidate() {
invalid = true
invalidated = true
}
}

private inner class CacheStateImpl<T>(
override val key: CacheKey<T>,
private val valueProvider: suspend () -> T?
) : CacheEntry<T> {
private fun isValid(ttl: Long): Boolean = when {
invalidated -> false
changes.value == null -> false
ttl > 0 -> !isExpired(ttl)
ttl == 0L -> false
else -> true
}

private val cacheKey = CacheKeySnapshot(key)
private fun isExpired(ttl: Long): Boolean {
val updateTime = changes.value?.updateTime ?: return true
val now = Clock.System.now().toEpochMilliseconds()
return ttl > 0 && updateTime + ttl <= now
}

override suspend fun fresh(): T? = cache[cacheKey]?.invalidate().run { value() }
override suspend fun last(): T? = cache[cacheKey]?.data as? T
private suspend fun fetchValue(): T? {
val job = jobs[keyData]
?.let { deferredJob ->
if (deferredJob.isCancelled) {
jobs.remove(keyData)?.let { null }
} else {
deferredJob
}
}
?: run {
if (key.immortal()) {
jobs.computeIfAbsent(keyData) {
GlobalScope.async { valueProvider() }
}
} else {
withContext(dispatcher) {
jobs.computeIfAbsent(keyData) {
async { valueProvider() }
}
}
}
}
job.invokeOnCompletion { jobs.remove(keyData)?.let {} }

override suspend fun value(): T? {
val cacheKey = CacheKeySnapshot(key)
val cacheItem = cache[cacheKey]
if (cacheItem == null || !cacheItem.isValid(key.ttl)) {
val data = getValue(cacheKey, valueProvider) ?: return null
cache[cacheKey] = CacheData(data)
return data
} else {
return cacheItem.data as T?
}
return job.await() as? T
}

override suspend fun changes(): Flow<T> = flow<T> {
getLastData()?.data
?.let { data -> data as? T }
?.let { data -> emit(data) }

private fun fetchLiveChanges() = flow {
emit(true)
var retryAttempt = 0
while (currentCoroutineContext().isActive) {
try {
val prev = getLastData()
val next = value()

if (next != null) {
emit(next)
}
getValue()
val updateTime = changes.value?.updateTime

if (prev?.data != next) {
delay(key.ttl)
} else if (prev != null) {
if (updateTime == null) {
delay(changesRetryInterval)
} else {
val now = Clock.System.now().toEpochMilliseconds()
val time = key.ttl - (now - prev.createTime)
val time = key.ttl - (now - updateTime)
delay(time)
}

Expand All @@ -210,22 +216,11 @@ open class InMemoryCacheSource(
retryAttempt++
when {
retryAttempt >= exceptionRetryCount -> throw e
else -> Unit
else -> delay(exceptionRetryInterval)
}
}
}
}.distinctUntilChanged().retry { th ->
!th.isCancellationException().also {
delay(changesRetryInterval)
}
}

private fun getLastData(): CacheData? = cache[cacheKey]
}.shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 1)
}

private data class CacheKeySnapshot(
val key: CacheKey<*>,
val type: KClass<*> = key::class
)

}
Loading

0 comments on commit 88b48b6

Please sign in to comment.