Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: listen for changes in the provider for the context changes #133

Merged
merged 15 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions Provider/src/main/java/com/spotify/confidence/Confidence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import com.spotify.confidence.client.FlagApplierClientImpl
import com.spotify.confidence.client.SdkMetadata
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.drop
import okhttp3.OkHttpClient

class Confidence internal constructor(
Expand All @@ -23,7 +27,13 @@ class Confidence internal constructor(
private val region: ConfidenceRegion = ConfidenceRegion.GLOBAL
) : Contextual, EventSender {
private val removedKeys = mutableListOf<String>()
private var contextMap: MutableMap<String, ConfidenceValue> = mutableMapOf()
private val contextMap = MutableStateFlow(mapOf<String, ConfidenceValue>())

// only return changes not the initial value
// only return distinct value
internal val contextChanges: Flow<Map<String, ConfidenceValue>> = contextMap
.drop(1)
.distinctUntilChanged()

private val flagApplier = FlagApplierWithRetries(
client = flagApplierClient,
Expand All @@ -40,34 +50,50 @@ class Confidence internal constructor(
}

internal suspend fun resolve(flags: List<String>): Result<FlagResolution> {
return flagResolver.resolve(flags, getContext().openFeatureFlatten())
return flagResolver.resolve(flags, getContext())
}

internal fun apply(flagName: String, resolveToken: String) {
flagApplier.apply(flagName, resolveToken)
}

@Synchronized
override fun putContext(key: String, value: ConfidenceValue) {
contextMap[key] = value
val map = contextMap.value.toMutableMap()
map[key] = value
contextMap.value = map
}

@Synchronized
override fun putContext(context: Map<String, ConfidenceValue>) {
contextMap += context
val map = contextMap.value.toMutableMap()
map += context
contextMap.value = map
}

override fun setContext(context: Map<String, ConfidenceValue>) {
contextMap = context.toMutableMap()
@Synchronized
internal fun putContext(context: Map<String, ConfidenceValue>, removedKeys: List<String>) {
val map = contextMap.value.toMutableMap()
map += context
for (key in removedKeys) {
map.remove(key)
}
this.removedKeys.addAll(removedKeys)
contextMap.value = map
}

@Synchronized
override fun removeContext(key: String) {
val map = contextMap.value.toMutableMap()
map.remove(key)
removedKeys.add(key)
fabriziodemaria marked this conversation as resolved.
Show resolved Hide resolved
contextMap.remove(key)
contextMap.value = map
}

override fun getContext(): Map<String, ConfidenceValue> =
this.parent?.let {
it.getContext().filterKeys { key -> !removedKeys.contains(key) } + contextMap
} ?: contextMap
it.getContext().filterKeys { key -> !removedKeys.contains(key) } + contextMap.value
} ?: contextMap.value

override fun withContext(context: Map<String, ConfidenceValue>): Confidence = Confidence(
clientSecret,
Expand All @@ -90,18 +116,6 @@ class Confidence internal constructor(
}
}

internal fun Map<String, ConfidenceValue>.openFeatureFlatten(): Map<String, ConfidenceValue> {
val context = this.toMutableMap()
val openFeatureContext = context[OPEN_FEATURE_CONTEXT_KEY]?.let { it as ConfidenceValue.Struct }
openFeatureContext?.let {
context += it.map
}
context.remove(OPEN_FEATURE_CONTEXT_KEY)
return context
}

internal const val OPEN_FEATURE_CONTEXT_KEY = "open_feature"

object ConfidenceFactory {
fun create(
context: Context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ interface Contextual : ConfidenceContextProvider {
fun withContext(context: Map<String, ConfidenceValue>): Contextual

fun putContext(context: Map<String, ConfidenceValue>)
fun setContext(context: Map<String, ConfidenceValue>)
fun putContext(key: String, value: ConfidenceValue)
fun removeContext(key: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

const val SDK_ID = "SDK_ID_KOTLIN_PROVIDER"
Expand Down Expand Up @@ -49,52 +52,57 @@ class ConfidenceFeatureProvider private constructor(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun startListeningForContext() {
coroutineScope.launch {
confidence.contextChanges
.suspendingSwitchMap { resolve(InitialisationStrategy.FetchAndActivate) }
.collect {}
}
}

override fun initialize(initialContext: EvaluationContext?) {
initialContext?.let {
internalInitialize(
initialContext,
initialisationStrategy
)
// refresh cache with the last stored data
storage.read().let(providerCache::refresh)
if (initialisationStrategy == InitialisationStrategy.ActivateAndFetchAsync) {
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}

coroutineScope.launch(networkExceptionHandler) {
val context = initialContext.toConfidenceContext()
confidence.putContext(context.map)
fabriziodemaria marked this conversation as resolved.
Show resolved Hide resolved
resolve(initialisationStrategy)
startListeningForContext()
}
}
}

private fun internalInitialize(
initialContext: EvaluationContext,
strategy: InitialisationStrategy
) {
// refresh cache with the last stored data
storage.read().let(providerCache::refresh)
if (strategy == InitialisationStrategy.ActivateAndFetchAsync) {
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}
private suspend fun resolve(strategy: InitialisationStrategy) {
try {
val resolveResponse = confidence.resolve(listOf())
fabriziodemaria marked this conversation as resolved.
Show resolved Hide resolved
if (resolveResponse is Result.Success) {
// we store the flag anyways except when the response was not modified
if (resolveResponse.data != FlagResolution.EMPTY) {
storage.store(resolveResponse.data)
}

coroutineScope.launch(networkExceptionHandler) {
confidence.putContext(OPEN_FEATURE_CONTEXT_KEY, initialContext.toConfidenceContext())
try {
val resolveResponse = confidence.resolve(listOf())
if (resolveResponse is Result.Success) {
// we store the flag anyways except when the response was not modified
if (resolveResponse.data != FlagResolution.EMPTY) {
storage.store(resolveResponse.data)
when (strategy) {
InitialisationStrategy.FetchAndActivate -> {
// refresh the cache from the stored data
providerCache.refresh(resolveResponse.data)
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}

when (strategy) {
InitialisationStrategy.FetchAndActivate -> {
// refresh the cache from the stored data
providerCache.refresh(resolveResponse.data)
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}

InitialisationStrategy.ActivateAndFetchAsync -> {
// do nothing
}
InitialisationStrategy.ActivateAndFetchAsync -> {
// do nothing
}
} else {
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}
} catch (e: ParseError) {
throw OpenFeatureError.ParseError(e.message)
} else {
eventHandler.publish(OpenFeatureEvents.ProviderReady)
}
} catch (e: ParseError) {
throw OpenFeatureError.ParseError(e.message)
}
}

Expand All @@ -106,14 +114,9 @@ class ConfidenceFeatureProvider private constructor(
oldContext: EvaluationContext?,
newContext: EvaluationContext
) {
if (newContext != oldContext) {
// on the new context we want to fetch new values and update
// the storage & cache right away which is why we pass `InitialisationStrategy.FetchAndActivate`
internalInitialize(
newContext,
InitialisationStrategy.FetchAndActivate
)
}
val context = newContext.toConfidenceContext()
val removedKeys = oldContext?.asMap()?.keys?.minus(newContext.asMap().keys) ?: emptySet()
confidence.putContext(context.map, removedKeys.toList())
fabriziodemaria marked this conversation as resolved.
Show resolved Hide resolved
}

override fun observe(): Flow<OpenFeatureEvents> = eventHandler.observe()
Expand Down Expand Up @@ -177,7 +180,7 @@ class ConfidenceFeatureProvider private constructor(
return providerCache.get().getEvaluation(
key,
defaultValue,
confidence.getContext().openFeatureFlatten()
confidence.getContext()
) { flagName, resolveToken ->
// this lambda will be invoked inside the evaluation process
// and only if the resolve reason is not targeting key error.
Expand Down Expand Up @@ -227,6 +230,13 @@ class ConfidenceFeatureProvider private constructor(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun <T, U> Flow<T>.suspendingSwitchMap(function: suspend () -> U): Flow<U> = flatMapLatest {
flow {
emit(function())
}
}

internal fun Value.toConfidenceValue(): ConfidenceValue = when (this) {
is Value.Structure -> ConfidenceValue.Struct(structure.mapValues { it.value.toConfidenceValue() })
is Value.Boolean -> ConfidenceValue.Boolean(this.boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import dev.openfeature.sdk.exceptions.OpenFeatureError
import junit.framework.TestCase.assertEquals
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -113,11 +114,11 @@ internal class ConfidenceFeatureProviderTests {
whenever(mockContext.filesDir).thenReturn(Files.createTempDirectory("tmpTests").toFile())
}

private fun getConfidence(dispatcher: CoroutineDispatcher): Confidence = Confidence(
private fun getConfidence(dispatcher: CoroutineDispatcher, flagResolver: FlagResolver? = null): Confidence = Confidence(
clientSecret = "",
dispatcher = dispatcher,
eventSenderEngine = mock(),
flagResolver = flagResolverClient,
flagResolver = flagResolver ?: flagResolverClient,
flagApplierClient = flagApplierClient,
diskStorage = FileDiskStorage.create(mockContext),
region = ConfidenceRegion.EUROPE
Expand Down Expand Up @@ -543,6 +544,82 @@ internal class ConfidenceFeatureProviderTests {
assertNull(evalString2.errorCode)
}

@Test
fun confidenceContextRemovedWorks() = runTest {
val testDispatcher = UnconfinedTestDispatcher(testScheduler)
val mockConfidence = getConfidence(testDispatcher)
val eventHandler = EventHandler(testDispatcher)
val confidenceFeatureProvider = ConfidenceFeatureProvider.create(
context = mockContext,
eventHandler = eventHandler,
confidence = mockConfidence,
dispatcher = testDispatcher
)
val evaluationContext = ImmutableContext("foo", mapOf("hello" to Value.String("world")))
val context = evaluationContext.toConfidenceContext().map
confidenceFeatureProvider.initialize(evaluationContext)
advanceUntilIdle()
assertEquals(mockConfidence.getContext(), context)
verify(flagResolverClient, times(1)).resolve(any(), eq(context))
val newContext = ImmutableContext("foo").toConfidenceContext().map
confidenceFeatureProvider.onContextSet(evaluationContext, ImmutableContext("foo"))
advanceUntilIdle()
assertEquals(mockConfidence.getContext(), newContext)
verify(flagResolverClient, times(1)).resolve(any(), eq(newContext))
}

@Test
fun testWithSlowResolvesWeCancelTheFirstResolveOnNewContextChangesOfConfidence() = runTest {
val testDispatcher = UnconfinedTestDispatcher(testScheduler)
val flagResolver = object : FlagResolver {
var callCount = 0
var returnCount = 0
var latestCalledContext = mapOf<String, ConfidenceValue>()
override suspend fun resolve(
flags: List<String>,
context: Map<String, ConfidenceValue>
): Result<FlagResolution> {
latestCalledContext = context
if (callCount++ == 0) {
delay(2000)
}
returnCount++
return Result.Success(
FlagResolution(
context,
resolvedFlags.list,
"token1"
)
)
}
}
val mockConfidence = getConfidence(testDispatcher, flagResolver)
val eventHandler = EventHandler(testDispatcher)

val confidenceFeatureProvider = ConfidenceFeatureProvider.create(
context = mockContext,
eventHandler = eventHandler,
confidence = mockConfidence,
dispatcher = testDispatcher
)
val evaluationContext = ImmutableContext("foo", mapOf("hello" to Value.String("world")))
val context = evaluationContext.toConfidenceContext().map
confidenceFeatureProvider.initialize(evaluationContext)
advanceUntilIdle()
// reset the fake flag resolver to count only changes
flagResolver.callCount = 0
flagResolver.returnCount = 0
assertEquals(mockConfidence.getContext(), context)
assertEquals(context, flagResolver.latestCalledContext)
confidenceFeatureProvider.onContextSet(evaluationContext, ImmutableContext("foo"))
val newContext2 = ImmutableContext("foo2").toConfidenceContext().map
confidenceFeatureProvider.onContextSet(evaluationContext, ImmutableContext("foo2"))
advanceUntilIdle()
assertEquals(mockConfidence.getContext(), newContext2)
assertEquals(newContext2, flagResolver.latestCalledContext)
assertEquals(1, flagResolver.returnCount)
}

@Test
fun testStaleValueReturnValueAndStaleReason() = runTest {
val testDispatcher = UnconfinedTestDispatcher(testScheduler)
Expand Down Expand Up @@ -587,6 +664,7 @@ internal class ConfidenceFeatureProviderTests {
)
fabriziodemaria marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(newContextEval.reason, Reason.STALE.name)
assertEquals(newContextEval.value, "red")
verify(flagResolverClient, times(2)).resolve(any(), any())
}

@Test
Expand Down
Loading