Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow "on changed" Flow emission for more general MutableMap #13

Merged
merged 3 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions collections/api/collections.api
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
public final class com/juul/tuulbox/collections/FlowConcurrentMap : java/util/concurrent/ConcurrentMap {
public fun <init> ()V
public fun <init> (Ljava/util/concurrent/ConcurrentMap;)V
public synthetic fun <init> (Ljava/util/concurrent/ConcurrentMap;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final class com/juul/tuulbox/collections/FlowMutableMap : java/util/Map, kotlin/jvm/internal/markers/KMutableMap {
public fun clear ()V
public fun containsKey (Ljava/lang/Object;)Z
public fun containsValue (Ljava/lang/Object;)Z
Expand All @@ -25,7 +22,7 @@ public final class com/juul/tuulbox/collections/FlowConcurrentMap : java/util/co
public final fun values ()Ljava/util/Collection;
}

public final class com/juul/tuulbox/collections/FlowConcurrentMapKt {
public static final fun withFlow (Ljava/util/concurrent/ConcurrentMap;)Lcom/juul/tuulbox/collections/FlowConcurrentMap;
public final class com/juul/tuulbox/collections/FlowMutableMapKt {
public static final fun withFlow (Ljava/util/Map;)Lcom/juul/tuulbox/collections/FlowMutableMap;
}

3 changes: 2 additions & 1 deletion collections/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ apply from: rootProject.file('gradle/publish.gradle')

dependencies {
api deps.kotlin.stdlib
api deps.kotlin.coroutines
api deps.kotlin.coroutines.core
testImplementation deps.kotlin.coroutines.debug
testImplementation deps.kotlin.junit
}
Original file line number Diff line number Diff line change
@@ -1,90 +1,89 @@
package com.juul.tuulbox.collections

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import java.util.Collections

fun <K, V> ConcurrentMap<K, V>.withFlow() = FlowConcurrentMap(this)
fun <K, V> MutableMap<K, V>.withFlow() = FlowMutableMap(this)

/**
* Wraps a [ConcurrentMap] to provide a [Flow] of [onChanged] events. The [onChanged] events emit
* a copy of the [ConcurrentMap] at the time of the change event.
* Wraps a [MutableMap] to provide a [Flow] of [onChanged] events. The [onChanged] events emit a
* copy of the [MutableMap] at the time of the change event.
Comment on lines +11 to +12

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be forgetting something, but what if the onChanged emitted a non-mutable copy?

If a listener wanted a mutable copy of the map, they could just make a local mutable copy, right?

I guess my question is: why emit a mutable copy of the map?

Copy link
Member Author

@twyatt twyatt Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually emitting a Map (not MutableMap):

val onChanged: Flow<Map<K, V>> = _onChanged.filterNotNull()

MutableMap is just a facade though, so unfortunately every Map is also mutable if casted to MutableMap (thanks Java). So enforcement is more at the level of just making it more difficult to modify.

...but, by copying the original MutableMap ensures that any changes done to the emitted map (from the onChanged event) via casting means developer will be modifying the copy, not the original. Thereby protecting the expected dataset in the original map.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks for the explanation. Flow<Map<K, V>> 🤦‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twyatt if we're willing to JVM-lock, there is always Collections.unmodifiableMap(yourMap) to return a runtime-enforced read-only wrapper (not a copy, reads through) map. Because it's a Java function you can still cast it to MutableMap, but mutate attempts will at least fail at runtime.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cedrickcooke ohh, I do like that better; would presumably be more efficient than a copy.
I'll make that change.
I'm fine with the JVM-lock; as going multiplatform wouldn't be difficult and just entail a basic expect / actual function to provide the platform specific "immutable collection" for the change event.

Copy link
Member Author

@twyatt twyatt Aug 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, Collections.unmodifiableMap(yourMap) won't adhere to the desired functionality. It's just a "view" of the underlying Map.

I need the emitted Map to reflect the collection at a point in time.

I believe Collections.unmodifiableMap(yourMap) would produce different values over time if the underlying Map was changing; so doesn't properly represent an onChanged "event".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same map is shared by all observers of the flow it might make sense to copy it and then wrap it in the unmodifiable view. I'm not sure if that's too defensive or if it's still reasonable.

Copy link
Member Author

@twyatt twyatt Aug 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same map is shared by all observers of the flow it might make sense to copy it and then wrap it in the unmodifiable view.

Ohhh ya. Very good point.

I'm not sure if that's too defensive or if it's still reasonable.

I think it's definitely reasonable; I'd rather start too defensive and later lift restriction if needed (e.g. performance reasons or something). Almost always easier to lift restrictions than to impose them later.

*/
class FlowConcurrentMap<K, V>(
private val map: ConcurrentMap<K, V> = ConcurrentHashMap()
) : ConcurrentMap<K, V> by map {
class FlowMutableMap<K, V> internal constructor(
private val map: MutableMap<K, V>
) : MutableMap<K, V> by map {

private val _onChanged = MutableStateFlow<Map<K, V>?>(null)
val onChanged: Flow<Map<K, V>> = _onChanged.filterNotNull()

/** @see ConcurrentMap.put */
/** @see MutableMap.put */
override fun put(
key: K,
value: V
): V? = map.emitChangedAfter { put(key, value) }

/** @see ConcurrentMap.remove */
/** @see MutableMap.remove */
override fun remove(
key: K
): V? = map.emitChangedAfter { remove(key) }

/**
* Emits `onChanged` event when [remove] returns `true`.
*
* @see ConcurrentMap.remove
* @see MutableMap.remove
*/
override fun remove(
key: K,
value: V
): Boolean = map.remove(key, value).also { didRemove ->
if (didRemove) _onChanged.value = map.toMap()
if (didRemove) _onChanged.value = map.unmodifiableCopy()
}

/** @see ConcurrentMap.clear */
/** @see MutableMap.clear */
override fun clear() = map.emitChangedAfter { clear() }

/** @see ConcurrentMap.putAll */
/** @see MutableMap.putAll */
override fun putAll(
from: Map<out K, V>
) = map.emitChangedAfter { putAll(from) }

/**
* Emits `onChanged` event when [putIfAbsent] returns `null`.
*
* @see ConcurrentMap.putIfAbsent
* @see MutableMap.putIfAbsent
*/
override fun putIfAbsent(
key: K,
value: V
): V? = map.putIfAbsent(key, value).also { previousValue ->
if (previousValue == null) _onChanged.value = map.toMap()
if (previousValue == null) _onChanged.value = map.unmodifiableCopy()
}

/**
* Emits `onChanged` event when [replace] returns `true`.
*
* @see ConcurrentMap.replace
* @see MutableMap.replace
*/
override fun replace(
key: K,
oldValue: V,
newValue: V
): Boolean = map.replace(key, oldValue, newValue).also { didReplace ->
if (didReplace) _onChanged.value = map.toMap()
if (didReplace) _onChanged.value = map.unmodifiableCopy()
}

/**
* Emits `onChanged` event when [replace] returns non-`null`.
*
* @see ConcurrentMap.replace
* @see MutableMap.replace
*/
override fun replace(
key: K,
value: V
): V? = map.replace(key, value).also { previousValue ->
if (previousValue != null) _onChanged.value = map.toMap()
if (previousValue != null) _onChanged.value = map.unmodifiableCopy()
}

/** @throws UnsupportedOperationException */
Expand All @@ -99,11 +98,14 @@ class FlowConcurrentMap<K, V>(
override val values: MutableCollection<V>
get() = throw UnsupportedOperationException()

private inline fun <T> ConcurrentMap<K, V>.emitChangedAfter(
action: ConcurrentMap<K, V>.() -> T
private inline fun <T> MutableMap<K, V>.emitChangedAfter(
action: MutableMap<K, V>.() -> T
): T {
val result = action.invoke(this)
_onChanged.value = toMap()
_onChanged.value = unmodifiableCopy()
return result
}
}

private fun <K, V> MutableMap<K, V>.unmodifiableCopy() =
Collections.unmodifiableMap(toMap())
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package com.juul.tuulbox.collections.test

import com.juul.tuulbox.collections.FlowConcurrentMap
import java.util.concurrent.ConcurrentHashMap
import com.juul.tuulbox.collections.withFlow
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import org.junit.Rule

class FlowConcurrentMapTest {
class FlowMutableMapTest {

@get:Rule
val timeoutRule = CoroutinesTimeout.seconds(1)

@Test
fun `Adding produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>()
val map = mutableMapOf<String, Int>().withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertEquals(
expected = 0,
Expand All @@ -40,13 +44,15 @@ class FlowConcurrentMapTest {
expected = 2,
actual = map.size
)

job.cancelAndJoin()
}

@Test
fun `Replacing produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>()
val map = mutableMapOf<String, Int>().withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

map.putAll(mapOf("1" to 1, "2" to 2))
events.receiveAndAssert(mapOf("1" to 1, "2" to 2))
Expand All @@ -68,17 +74,15 @@ class FlowConcurrentMapTest {

assertTrue(map.replace("1", 100, 250))
events.receiveAndAssert(mapOf("1" to 250, "2" to -200))

job.cancelAndJoin()
}

@Test
fun `Removing produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertFalse(map.remove("B", -1))
// Skip `receiveAndAssert` because the `remove` failed (didn't trigger onChanged).
Expand All @@ -88,17 +92,15 @@ class FlowConcurrentMapTest {

map.remove("A")
events.receiveAndAssert(emptyMap())

job.cancelAndJoin()
}

@Test
fun `putIfAbsent produces onChanged events`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

assertEquals(
expected = 987, // Previous value (indicates we didn't perform a "put").
Expand All @@ -108,43 +110,43 @@ class FlowConcurrentMapTest {

assertNull(map.putIfAbsent("C", -128))
events.receiveAndAssert(mapOf("A" to 123, "B" to 987, "C" to -128))

job.cancelAndJoin()
}

@Test
fun `Clearing produces onChanged event`() = runBlocking {
val map = FlowConcurrentMap<String, Int>(
ConcurrentHashMap<String, Int>().apply {
putAll(mapOf("A" to 123, "B" to 987))
}
)
val map = mutableMapOf("A" to 123, "B" to 987).withFlow()
val events = Channel<Map<String, Int>>()
map.onChanged.onEach(events::send).launchIn(GlobalScope)
val job = map.onChanged.onEach(events::send).launchIn(this)

map["C"] = 1337
events.receiveAndAssert(mapOf("A" to 123, "B" to 987, "C" to 1337))

map.clear()
events.receiveAndAssert(emptyMap())

job.cancelAndJoin()
}

@Test
fun `FlowConcurrentMap 'entries' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().entries
mutableMapOf<String, Int>().withFlow().entries
}
}

@Test
fun `FlowConcurrentMap 'keys' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().keys
mutableMapOf<String, Int>().withFlow().keys
}
}

@Test
fun `FlowConcurrentMap 'values' throws UnsupportedOperationException`() {
assertFailsWith<UnsupportedOperationException> {
FlowConcurrentMap<String, Int>().values
mutableMapOf<String, Int>().withFlow().values
}
}
}
3 changes: 2 additions & 1 deletion coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ apply from: rootProject.file('gradle/publish.gradle')

dependencies {
api deps.kotlin.stdlib
api deps.kotlin.coroutines
api deps.kotlin.coroutines.core
testImplementation deps.kotlin.coroutines.debug
testImplementation deps.kotlin.junit
}
5 changes: 5 additions & 0 deletions coroutines/src/test/kotlin/CombineParametersTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package com.juul.tuulbox.coroutines.flow

import kotlin.test.Test
import kotlin.test.assertEquals
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking
import org.junit.Rule

class CombineParametersTest {

@get:Rule
val timeoutRule = CoroutinesTimeout.seconds(1)

@Test
fun testSixParameters() = runBlocking {
val flow = combine(
Expand Down
5 changes: 4 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
ext.deps = [
kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
coroutines: [
core: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
debug: "org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.7",
],
junit: "org.jetbrains.kotlin:kotlin-test-junit",
],
]