diff --git a/.gitignore b/.gitignore index 18f3637..92c9f86 100644 --- a/.gitignore +++ b/.gitignore @@ -96,7 +96,6 @@ publish.properties /.idea/jarRepositories.xml /.idea/misc.xml /.idea/shelf -/.idea/uiDesigner.xml # general **/.DS_Store diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index 5a72184..06ffb99 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -227,7 +227,7 @@ internal class DefaultMessages( channelStateListener = ChannelStateListener { if (!it.resumed) updateChannelSerialsAfterDiscontinuity() } - channel.on(ChannelState.attached, channelStateListener) + channel.on(channelStateListener) } override fun subscribe(listener: Messages.Listener): MessagesSubscription { @@ -247,7 +247,7 @@ internal class DefaultMessages( metadata = data.metadata, headers = pubSubMessage.extras.asJsonObject().get("headers")?.toMap() ?: mapOf(), ) - emitMessage(chatMessage) + listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage)) } channel.subscribe(MessageEventType.Created.eventName, messageListener) @@ -319,10 +319,6 @@ internal class DefaultMessages( } } - private fun emitMessage(message: Message) { - listeners.keys.forEach { listener -> listener.onEvent(MessageEvent(type = MessageEventType.Created, message = message)) } - } - private fun updateChannelSerialsAfterDiscontinuity() { val deferredChannelSerial = DeferredValue() associateWithCurrentChannelSerial(deferredChannelSerial) diff --git a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt index b922716..1917212 100644 --- a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt +++ b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt @@ -3,6 +3,7 @@ package com.ably.chat import com.google.gson.JsonObject import io.ably.lib.realtime.AblyRealtime.Channels import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelBase import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.buildChannelStateChange @@ -14,6 +15,8 @@ import io.mockk.mockk import io.mockk.slot import io.mockk.spyk import io.mockk.verify +import java.lang.reflect.Field +import java.util.HashMap import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals @@ -30,20 +33,15 @@ class MessagesTest { private lateinit var messages: DefaultMessages private val channelStateListenerSlot = slot() - private val channelStateOnceSlot = slot() @Before fun setUp() { every { realtimeChannels.get(any(), any()) } returns realtimeChannel - every { realtimeChannel.on(ChannelState.attached, capture(channelStateListenerSlot)) } answers { + every { realtimeChannel.on(capture(channelStateListenerSlot)) } answers { println("Channel state listener registered") } - every { realtimeChannel.once(ChannelState.attached, capture(channelStateOnceSlot)) } answers { - println("Channel state once listener registered") - } - messages = DefaultMessages( roomId = "room1", realtimeChannels = realtimeChannels, @@ -185,4 +183,43 @@ class MessagesTest { assertEquals("channel-serial-2", subscription1.fromSerialProvider().await()) } + + @Test + fun `subscription should invoke once for each incoming message`() = runTest { + val listener1 = mockk(relaxed = true) + val listener2 = mockk(relaxed = true) + + messages.subscribe(listener1) + + messages.channel.channelMulticaster.onMessage(buildDummyPubSubMessage()) + + verify(exactly = 1) { listener1.onEvent(any()) } + + messages.subscribe(listener2) + + messages.channel.channelMulticaster.onMessage(buildDummyPubSubMessage()) + + verify(exactly = 2) { listener1.onEvent(any()) } + verify(exactly = 1) { listener2.onEvent(any()) } + } +} + +private val Channel.channelMulticaster: ChannelBase.MessageListener get() { + val field: Field = (ChannelBase::class.java).getDeclaredField("eventListeners") + field.isAccessible = true + val eventListeners = field.get(this) as HashMap<*, *> + return eventListeners["message.created"] as ChannelBase.MessageListener +} + +private fun buildDummyPubSubMessage() = PubSubMessage().apply { + data = JsonObject().apply { + addProperty("text", "dummy text") + } + clientId = "dummy" + timestamp = 1000L + extras = MessageExtras( + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + }, + ) }