Skip to content

Commit

Permalink
Allow "on changed" Flow emission for more general MutableMap
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed Aug 12, 2020
1 parent 7e9b8ff commit c7ee114
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 54 deletions.
3 changes: 2 additions & 1 deletion collections/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ apply from: rootProject.file('gradle/publish.gradle')

dependencies {
api deps.kotlin.stdlib
api deps.kotlin.coroutines
api deps.kotlin.coroutines.core
testImplementation deps.kotlin.coroutines.debug
testImplementation deps.kotlin.junit
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
package com.juul.tuulbox.collections

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull

fun <K, V> ConcurrentMap<K, V>.withFlow() = FlowConcurrentMap(this)
fun <K, V> MutableMap<K, V>.withFlow() = FlowMutableMap(this)

/**
* Wraps a [ConcurrentMap] to provide a [Flow] of [onChanged] events. The [onChanged] events emit
* a copy of the [ConcurrentMap] at the time of the change event.
* Wraps a [MutableMap] to provide a [Flow] of [onChanged] events. The [onChanged] events emit a
* copy of the [MutableMap] at the time of the change event.
*/
class FlowConcurrentMap<K, V>(
private val map: ConcurrentMap<K, V> = ConcurrentHashMap()
) : ConcurrentMap<K, V> by map {
class FlowMutableMap<K, V>(
private val map: MutableMap<K, V>
) : MutableMap<K, V> by map {

private val _onChanged = MutableStateFlow<Map<K, V>?>(null)
val onChanged: Flow<Map<K, V>> = _onChanged.filterNotNull()

/** @see ConcurrentMap.put */
/** @see MutableMap.put */
override fun put(
key: K,
value: V
): V? = map.emitChangedAfter { put(key, value) }

/** @see ConcurrentMap.remove */
/** @see MutableMap.remove */
override fun remove(
key: K
): V? = map.emitChangedAfter { remove(key) }

/**
* Emits `onChanged` event when [remove] returns `true`.
*
* @see ConcurrentMap.remove
* @see MutableMap.remove
*/
override fun remove(
key: K,
Expand All @@ -42,18 +40,18 @@ class FlowConcurrentMap<K, V>(
if (didRemove) _onChanged.value = map.toMap()
}

/** @see ConcurrentMap.clear */
/** @see MutableMap.clear */
override fun clear() = map.emitChangedAfter { clear() }

/** @see ConcurrentMap.putAll */
/** @see MutableMap.putAll */
override fun putAll(
from: Map<out K, V>
) = map.emitChangedAfter { putAll(from) }

/**
* Emits `onChanged` event when [putIfAbsent] returns `null`.
*
* @see ConcurrentMap.putIfAbsent
* @see MutableMap.putIfAbsent
*/
override fun putIfAbsent(
key: K,
Expand All @@ -65,7 +63,7 @@ class FlowConcurrentMap<K, V>(
/**
* Emits `onChanged` event when [replace] returns `true`.
*
* @see ConcurrentMap.replace
* @see MutableMap.replace
*/
override fun replace(
key: K,
Expand All @@ -78,7 +76,7 @@ class FlowConcurrentMap<K, V>(
/**
* Emits `onChanged` event when [replace] returns non-`null`.
*
* @see ConcurrentMap.replace
* @see MutableMap.replace
*/
override fun replace(
key: K,
Expand All @@ -99,8 +97,8 @@ class FlowConcurrentMap<K, V>(
override val values: MutableCollection<V>
get() = throw UnsupportedOperationException()

private inline fun <T> ConcurrentMap<K, V>.emitChangedAfter(
action: ConcurrentMap<K, V>.() -> T
private inline fun <T> MutableMap<K, V>.emitChangedAfter(
action: MutableMap<K, V>.() -> T
): T {
val result = action.invoke(this)
_onChanged.value = toMap()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package com.juul.tuulbox.collections.test

import com.juul.tuulbox.collections.FlowConcurrentMap
import java.util.concurrent.ConcurrentHashMap
import com.juul.tuulbox.collections.withFlow
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

class FlowConcurrentMapTest {
class FlowMutableMapTest {

@get:Rule
val timeoutRule = CoroutinesTimeout.seconds(1)

@Test
fun `Adding produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>()
val map = mutableMapOf<String, Int>().withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertEquals(
expected = 0,
Expand All @@ -40,13 +44,15 @@ class FlowConcurrentMapTest {
expected = 2,
actual = map.size
)

job.cancelAndJoin()
}

@Test
fun `Replacing produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>()
val map = mutableMapOf<String, Int>().withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

map.putAll(mapOf("1" to 1, "2" to 2))
events.receiveAndAssert(mapOf("1" to 1, "2" to 2))
Expand All @@ -68,17 +74,15 @@ class FlowConcurrentMapTest {

assertTrue(map.replace("1", 100, 250))
events.receiveAndAssert(mapOf("1" to 250, "2" to -200))

job.cancelAndJoin()
}

@Test
fun `Removing produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertFalse(map.remove("B", -1))
// Skip `receiveAndAssert` because the `remove` failed (didn't trigger onChanged).
Expand All @@ -88,17 +92,15 @@ class FlowConcurrentMapTest {

map.remove("A")
events.receiveAndAssert(emptyMap())

job.cancelAndJoin()
}

@Test
fun `putIfAbsent produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertEquals(
expected = 987, // Previous value (indicates we didn't perform a "put").
Expand All @@ -108,43 +110,43 @@ class FlowConcurrentMapTest {

assertNull(map.putIfAbsent("C", -128))
events.receiveAndAssert(mapOf("A" to 123, "B" to 987, "C" to -128))

job.cancelAndJoin()
}

@Test
fun `Clearing produces onChanged event`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

map["C"] = 1337
events.receiveAndAssert(mapOf("A" to 123, "B" to 987, "C" to 1337))

map.clear()
events.receiveAndAssert(emptyMap())

job.cancelAndJoin()
}

@Test
fun `FlowConcurrentMap 'entries' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().entries
mutableMapOf<String, Int>().withFlow().entries
}
}

@Test
fun `FlowConcurrentMap 'keys' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().keys
mutableMapOf<String, Int>().withFlow().keys
}
}

@Test
fun `FlowConcurrentMap 'values' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().values
mutableMapOf<String, Int>().withFlow().values
}
}
}
3 changes: 2 additions & 1 deletion coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ apply from: rootProject.file('gradle/publish.gradle')

dependencies {
api deps.kotlin.stdlib
api deps.kotlin.coroutines
api deps.kotlin.coroutines.core
testImplementation deps.kotlin.coroutines.debug
testImplementation deps.kotlin.junit
}
5 changes: 5 additions & 0 deletions coroutines/src/test/kotlin/CombineParametersTest.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.juul.tuulbox.coroutines.flow

import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking
import org.junit.Rule

class CombineParametersTest {

@get:Rule
val timeoutRule = CoroutinesTimeout.seconds(1)

@Test
fun testSixParameters() = runBlocking {
val flow = combine(
Expand Down
5 changes: 4 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
ext.deps = [
kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
coroutines: [
core: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
debug: "org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.7",
],
junit: "org.jetbrains.kotlin:kotlin-test-junit",
],
]

0 comments on commit c7ee114

Please sign in to comment.