From 14b1cfc581082507f0de7fdac56f3f26c1353bb9 Mon Sep 17 00:00:00 2001 From: evgeny Date: Mon, 9 Sep 2024 12:30:51 +0100 Subject: [PATCH] [ECO-4943] feat: basic sending and receiving messages in the Chat In this commit, we introduce the basic implementation for sending and receiving messages. Corner cases and potential race conditions are not the focus at this stage and will be addressed later, once Room lifecycle management is implemented and a unified test suite is set up. --- .../src/main/java/com/ably/chat/ChatApi.kt | 87 +------ .../src/main/java/com/ably/chat/ChatClient.kt | 3 +- .../src/main/java/com/ably/chat/JsonUtils.kt | 84 +++++++ .../src/main/java/com/ably/chat/Messages.kt | 162 ++++++++++++- .../src/main/java/com/ably/chat/Room.kt | 10 +- .../main/java/com/ably/chat/RoomOptions.kt | 8 +- .../src/main/java/com/ably/chat/Rooms.kt | 7 +- .../src/main/java/com/ably/chat/Utils.kt | 59 ++++- .../test/java/com/ably/chat/ChatApiTest.kt | 131 ++++------ .../test/java/com/ably/chat/MessagesTest.kt | 225 ++++++++++++++++++ .../src/test/java/com/ably/chat/TestUtils.kt | 51 ++++ .../io/ably/lib/realtime/RealtimeUtils.kt | 28 +++ .../com/ably/chat/example/MainActivity.kt | 80 +++++-- 13 files changed, 729 insertions(+), 206 deletions(-) create mode 100644 chat-android/src/main/java/com/ably/chat/JsonUtils.kt create mode 100644 chat-android/src/test/java/com/ably/chat/MessagesTest.kt create mode 100644 chat-android/src/test/java/com/ably/chat/TestUtils.kt create mode 100644 chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt diff --git a/chat-android/src/main/java/com/ably/chat/ChatApi.kt b/chat-android/src/main/java/com/ably/chat/ChatApi.kt index cbd9733..5da7f85 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatApi.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatApi.kt @@ -2,9 +2,6 @@ package com.ably.chat import com.google.gson.JsonElement import com.google.gson.JsonObject -import com.google.gson.JsonPrimitive -import io.ably.lib.http.HttpCore -import io.ably.lib.http.HttpUtils import io.ably.lib.types.AblyException import io.ably.lib.types.AsyncHttpPaginatedResponse import io.ably.lib.types.ErrorInfo @@ -17,19 +14,20 @@ private const val API_PROTOCOL_VERSION = 3 private const val PROTOCOL_VERSION_PARAM_NAME = "v" private val apiProtocolParam = Param(PROTOCOL_VERSION_PARAM_NAME, API_PROTOCOL_VERSION.toString()) -// TODO make this class internal -class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { +internal class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { /** * Get messages from the Chat Backend * * @return paginated result with messages */ - suspend fun getMessages(roomId: String, params: QueryOptions): PaginatedResult { + suspend fun getMessages(roomId: String, options: QueryOptions, fromSerial: String? = null): PaginatedResult { + val baseParams = options.toParams() + val params = fromSerial?.let { baseParams + Param("fromSerial", it) } ?: baseParams return makeAuthorizedPaginatedRequest( url = "/chat/v1/rooms/$roomId/messages", method = "GET", - params = params.toParams(), + params = params, ) { Message( timeserial = it.requireString("timeserial"), @@ -137,17 +135,6 @@ class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: } } -private fun JsonElement?.toRequestBody(useBinaryProtocol: Boolean = false): HttpCore.RequestBody = - HttpUtils.requestBodyFromGson(this, useBinaryProtocol) - -private fun Map.toJson() = JsonObject().apply { - forEach { (key, value) -> addProperty(key, value) } -} - -private fun JsonElement.toMap() = buildMap { - requireJsonObject().entrySet().filter { (_, value) -> value.isJsonPrimitive }.forEach { (key, value) -> put(key, value.asString) } -} - private fun QueryOptions.toParams() = buildList { start?.let { add(Param("start", it)) } end?.let { add(Param("end", it)) } @@ -162,67 +149,3 @@ private fun QueryOptions.toParams() = buildList { ), ) } - -private fun JsonElement.requireJsonObject(): JsonObject { - if (!isJsonObject) { - throw AblyException.fromErrorInfo( - ErrorInfo("Response value expected to be JsonObject, got primitive instead", HttpStatusCodes.InternalServerError), - ) - } - return asJsonObject -} - -private fun JsonElement.requireString(memberName: String): String { - val memberElement = requireField(memberName) - if (!memberElement.isJsonPrimitive) { - throw AblyException.fromErrorInfo( - ErrorInfo( - "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", - HttpStatusCodes.InternalServerError, - ), - ) - } - return memberElement.asString -} - -private fun JsonElement.requireLong(memberName: String): Long { - val memberElement = requireJsonPrimitive(memberName) - try { - return memberElement.asLong - } catch (formatException: NumberFormatException) { - throw AblyException.fromErrorInfo( - formatException, - ErrorInfo("Required numeric field \"$memberName\" is not a valid long", HttpStatusCodes.InternalServerError), - ) - } -} - -private fun JsonElement.requireInt(memberName: String): Int { - val memberElement = requireJsonPrimitive(memberName) - try { - return memberElement.asInt - } catch (formatException: NumberFormatException) { - throw AblyException.fromErrorInfo( - formatException, - ErrorInfo("Required numeric field \"$memberName\" is not a valid int", HttpStatusCodes.InternalServerError), - ) - } -} - -private fun JsonElement.requireJsonPrimitive(memberName: String): JsonPrimitive { - val memberElement = requireField(memberName) - if (!memberElement.isJsonPrimitive) { - throw AblyException.fromErrorInfo( - ErrorInfo( - "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", - HttpStatusCodes.InternalServerError, - ), - ) - } - return memberElement.asJsonPrimitive -} - -private fun JsonElement.requireField(memberName: String): JsonElement = requireJsonObject().get(memberName) - ?: throw AblyException.fromErrorInfo( - ErrorInfo("Required field \"$memberName\" is missing", HttpStatusCodes.InternalServerError), - ) diff --git a/chat-android/src/main/java/com/ably/chat/ChatClient.kt b/chat-android/src/main/java/com/ably/chat/ChatClient.kt index b621db9..e933666 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatClient.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatClient.kt @@ -37,7 +37,8 @@ interface ChatClient { val clientOptions: ClientOptions } -fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions): ChatClient = DefaultChatClient(realtimeClient, clientOptions) +fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions = ClientOptions()): ChatClient = + DefaultChatClient(realtimeClient, clientOptions) internal class DefaultChatClient( override val realtime: RealtimeClient, diff --git a/chat-android/src/main/java/com/ably/chat/JsonUtils.kt b/chat-android/src/main/java/com/ably/chat/JsonUtils.kt new file mode 100644 index 0000000..2e119b8 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/JsonUtils.kt @@ -0,0 +1,84 @@ +package com.ably.chat + +import com.google.gson.JsonElement +import com.google.gson.JsonObject +import com.google.gson.JsonPrimitive +import io.ably.lib.http.HttpCore +import io.ably.lib.http.HttpUtils +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo + +internal fun JsonElement?.toRequestBody(useBinaryProtocol: Boolean = false): HttpCore.RequestBody = + HttpUtils.requestBodyFromGson(this, useBinaryProtocol) + +internal fun Map.toJson() = JsonObject().apply { + forEach { (key, value) -> addProperty(key, value) } +} + +internal fun JsonElement.toMap() = buildMap { + requireJsonObject().entrySet().filter { (_, value) -> value.isJsonPrimitive }.forEach { (key, value) -> put(key, value.asString) } +} + +internal fun JsonElement.requireJsonObject(): JsonObject { + if (!isJsonObject) { + throw AblyException.fromErrorInfo( + ErrorInfo("Response value expected to be JsonObject, got primitive instead", HttpStatusCodes.InternalServerError), + ) + } + return asJsonObject +} + +internal fun JsonElement.requireString(memberName: String): String { + val memberElement = requireField(memberName) + if (!memberElement.isJsonPrimitive) { + throw AblyException.fromErrorInfo( + ErrorInfo( + "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", + HttpStatusCodes.InternalServerError, + ), + ) + } + return memberElement.asString +} + +internal fun JsonElement.requireLong(memberName: String): Long { + val memberElement = requireJsonPrimitive(memberName) + try { + return memberElement.asLong + } catch (formatException: NumberFormatException) { + throw AblyException.fromErrorInfo( + formatException, + ErrorInfo("Required numeric field \"$memberName\" is not a valid long", HttpStatusCodes.InternalServerError), + ) + } +} + +internal fun JsonElement.requireInt(memberName: String): Int { + val memberElement = requireJsonPrimitive(memberName) + try { + return memberElement.asInt + } catch (formatException: NumberFormatException) { + throw AblyException.fromErrorInfo( + formatException, + ErrorInfo("Required numeric field \"$memberName\" is not a valid int", HttpStatusCodes.InternalServerError), + ) + } +} + +internal fun JsonElement.requireJsonPrimitive(memberName: String): JsonPrimitive { + val memberElement = requireField(memberName) + if (!memberElement.isJsonPrimitive) { + throw AblyException.fromErrorInfo( + ErrorInfo( + "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", + HttpStatusCodes.InternalServerError, + ), + ) + } + return memberElement.asJsonPrimitive +} + +internal fun JsonElement.requireField(memberName: String): JsonElement = requireJsonObject().get(memberName) + ?: throw AblyException.fromErrorInfo( + ErrorInfo("Required field \"$memberName\" is missing", HttpStatusCodes.InternalServerError), + ) diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index eb9600f..3790c5c 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -2,7 +2,17 @@ package com.ably.chat +import com.ably.chat.QueryOptions.MessageOrder.NewestFirst +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ChannelStateListener +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo + +typealias PubSubMessageListener = Channel.MessageListener +typealias PubSubMessage = io.ably.lib.types.Message /** * This interface is used to interact with messages in a chat room: subscribing @@ -91,7 +101,7 @@ data class QueryOptions( /** * The order of messages in the query result. */ - val orderBy: MessageOrder = MessageOrder.NewestFirst, + val orderBy: MessageOrder = NewestFirst, ) { /** * Represents direction to query messages in. @@ -170,25 +180,96 @@ data class SendMessageParams( ) interface MessagesSubscription : Subscription { - suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult + suspend fun getPreviousMessages(start: Long? = null, end: Long? = null, limit: Int = 100): PaginatedResult +} + +internal class DefaultMessagesSubscription( + private val chatApi: ChatApi, + private val roomId: String, + private val subscription: Subscription, + internal val fromSerialProvider: () -> DeferredValue, +) : MessagesSubscription { + override fun unsubscribe() { + subscription.unsubscribe() + } + + override suspend fun getPreviousMessages(start: Long?, end: Long?, limit: Int): PaginatedResult { + val fromSerial = fromSerialProvider().await() + val queryOptions = QueryOptions(start = start, end = end, limit = limit, orderBy = NewestFirst) + return chatApi.getMessages( + roomId = roomId, + options = queryOptions, + fromSerial = fromSerial, + ) + } } -class DefaultMessages( +internal class DefaultMessages( private val roomId: String, - private val realtimeClient: RealtimeClient, + realtimeChannels: AblyRealtime.Channels, private val chatApi: ChatApi, ) : Messages { + private var listeners: Map> = emptyMap() + + private var channelStateListener: ChannelStateListener + + private var lock = Any() + /** * the channel name for the chat messages channel. */ private val messagesChannelName = "$roomId::\$chat::\$chatMessages" - override val channel: Channel - get() = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions()) + override val channel: Channel = realtimeChannels.get(messagesChannelName, ChatChannelOptions()) + + init { + channelStateListener = ChannelStateListener { + if (!it.resumed) updateChannelSerialsAfterDiscontinuity() + } + channel.on(channelStateListener) + } override fun subscribe(listener: Messages.Listener): MessagesSubscription { - TODO("Not yet implemented") + val deferredChannelSerial = DeferredValue() + addListener(listener, deferredChannelSerial) + val messageListener = PubSubMessageListener { + val pubSubMessage = it ?: throw AblyException.fromErrorInfo( + ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ) + val data = parsePubSubMessageData(pubSubMessage.data) + val chatMessage = Message( + roomId = roomId, + createdAt = pubSubMessage.timestamp, + clientId = pubSubMessage.clientId, + timeserial = pubSubMessage.extras.asJsonObject().requireString("timeserial"), + text = data.text, + metadata = data.metadata, + headers = pubSubMessage.extras.asJsonObject().get("headers")?.toMap() ?: mapOf(), + ) + listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage)) + } + + channel.subscribe(MessageEventType.Created.eventName, messageListener) + associateWithCurrentChannelSerial(deferredChannelSerial) + + return DefaultMessagesSubscription( + chatApi = chatApi, + roomId = roomId, + subscription = { + removeListener(listener) + channel.unsubscribe(MessageEventType.Created.eventName, messageListener) + }, + fromSerialProvider = { + listeners[listener] ?: throw AblyException.fromErrorInfo( + ErrorInfo( + "This messages subscription instance was already unsubscribed", + HttpStatusCodes.BadRequest, + ErrorCodes.BadRequest, + ), + ) + }, + ) } override suspend fun get(options: QueryOptions): PaginatedResult = chatApi.getMessages(roomId, options) @@ -198,4 +279,71 @@ class DefaultMessages( override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { TODO("Not yet implemented") } + + fun release() { + channel.off(channelStateListener) + } + + /** + * Associate deferred channel serial value with the current channel's serial + * + * WARN: it not deterministic because of race condition, + * this can lead to duplicated messages in `getPreviousMessages` calls + */ + private fun associateWithCurrentChannelSerial(channelSerialProvider: DeferredValue) { + if (channel.state === ChannelState.attached) { + channelSerialProvider.completeWith(requireChannelSerial()) + } + + channel.once(ChannelState.attached) { + channelSerialProvider.completeWith(requireChannelSerial()) + } + } + + private fun requireChannelSerial(): String { + return channel.properties.channelSerial + ?: throw AblyException.fromErrorInfo( + ErrorInfo("Channel has been attached, but channelSerial is not defined", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ) + } + + private fun addListener(listener: Messages.Listener, deferredChannelSerial: DeferredValue) { + synchronized(lock) { + listeners += listener to deferredChannelSerial + } + } + + private fun removeListener(listener: Messages.Listener) { + synchronized(lock) { + listeners -= listener + } + } + + private fun updateChannelSerialsAfterDiscontinuity() { + val deferredChannelSerial = DeferredValue() + associateWithCurrentChannelSerial(deferredChannelSerial) + + synchronized(lock) { + listeners = listeners.mapValues { + if (it.value.completed) deferredChannelSerial else it.value + } + } + } +} + +/** + * Parsed data from the Pub/Sub channel's message data field + */ +private data class PubSubMessageData(val text: String, val metadata: MessageMetadata) + +private fun parsePubSubMessageData(data: Any): PubSubMessageData { + if (data !is JsonObject) { + throw AblyException.fromErrorInfo( + ErrorInfo("Unrecognized Pub/Sub channel's message for `Message.created` event", HttpStatusCodes.InternalServerError), + ) + } + return PubSubMessageData( + text = data.requireString("text"), + metadata = data.get("metadata")?.toMap() ?: mapOf(), + ) } diff --git a/chat-android/src/main/java/com/ably/chat/Room.kt b/chat-android/src/main/java/com/ably/chat/Room.kt index 8550eb7..84cecaf 100644 --- a/chat-android/src/main/java/com/ably/chat/Room.kt +++ b/chat-android/src/main/java/com/ably/chat/Room.kt @@ -90,12 +90,14 @@ internal class DefaultRoom( chatApi: ChatApi, ) : Room { - override val messages: Messages = DefaultMessages( + private val _messages = DefaultMessages( roomId = roomId, - realtimeClient = realtimeClient, + realtimeChannels = realtimeClient.channels, chatApi = chatApi, ) + override val messages: Messages = _messages + override val presence: Presence = DefaultPresence( messages = messages, ) @@ -130,4 +132,8 @@ internal class DefaultRoom( typing.channel.detachCoroutine() reactions.channel.detachCoroutine() } + + fun release() { + _messages.release() + } } diff --git a/chat-android/src/main/java/com/ably/chat/RoomOptions.kt b/chat-android/src/main/java/com/ably/chat/RoomOptions.kt index 2263d93..b6d9d96 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomOptions.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomOptions.kt @@ -21,13 +21,13 @@ data class RoomOptions( * The reactions options for the room. To enable reactions in the room, set this property. You may use * {@link RoomOptionsDefaults.reactions} to enable reactions with default options. */ - val reactions: RoomReactionsOptions = RoomReactionsOptions(), + val reactions: RoomReactionsOptions = RoomReactionsOptions, /** * The occupancy options for the room. To enable occupancy in the room, set this property. You may use * {@link RoomOptionsDefaults.occupancy} to enable occupancy with default options. */ - val occupancy: OccupancyOptions = OccupancyOptions(), + val occupancy: OccupancyOptions = OccupancyOptions, ) /** @@ -66,9 +66,9 @@ data class TypingOptions( /** * Represents the reactions options for a chat room. */ -class RoomReactionsOptions +object RoomReactionsOptions /** * Represents the occupancy options for a chat room. */ -class OccupancyOptions +object OccupancyOptions diff --git a/chat-android/src/main/java/com/ably/chat/Rooms.kt b/chat-android/src/main/java/com/ably/chat/Rooms.kt index 44d917d..31c0c49 100644 --- a/chat-android/src/main/java/com/ably/chat/Rooms.kt +++ b/chat-android/src/main/java/com/ably/chat/Rooms.kt @@ -24,7 +24,7 @@ interface Rooms { * @throws {@link ErrorInfo} if a room with the same ID but different options already exists. * @returns Room A new or existing Room object. */ - fun get(roomId: String, options: RoomOptions): Room + fun get(roomId: String, options: RoomOptions = RoomOptions()): Room /** * Release the Room object if it exists. This method only releases the reference @@ -47,7 +47,7 @@ internal class DefaultRooms( private val chatApi: ChatApi, override val clientOptions: ClientOptions, ) : Rooms { - private val roomIdToRoom: MutableMap = mutableMapOf() + private val roomIdToRoom: MutableMap = mutableMapOf() override fun get(roomId: String, options: RoomOptions): Room { return synchronized(this) { @@ -72,7 +72,8 @@ internal class DefaultRooms( override suspend fun release(roomId: String) { synchronized(this) { - roomIdToRoom.remove(roomId) + val room = roomIdToRoom.remove(roomId) + room?.release() } } } diff --git a/chat-android/src/main/java/com/ably/chat/Utils.kt b/chat-android/src/main/java/com/ably/chat/Utils.kt index 531a4eb..66d9451 100644 --- a/chat-android/src/main/java/com/ably/chat/Utils.kt +++ b/chat-android/src/main/java/com/ably/chat/Utils.kt @@ -39,8 +39,65 @@ suspend fun Channel.detachCoroutine() = suspendCoroutine { continuation -> fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOptions { val options = ChannelOptions() init?.let { options.it() } - options.params = options.params + mapOf( + options.params = (options.params ?: mapOf()) + mapOf( AGENT_PARAMETER_NAME to "chat-kotlin/${BuildConfig.APP_VERSION}", ) return options } + +/** + * A value that can be evaluated at a later time, similar to `kotlinx.coroutines.Deferred` or a JavaScript Promise. + * + * This class provides a thread-safe, simple blocking implementation. It is not designed for use in scenarios with + * heavy concurrency. + * + * @param T the type of the value that will be evaluated. + */ +internal class DeferredValue { + + private var value: T? = null + + private val lock = Any() + + private var observers: Set<(T) -> Unit> = setOf() + + private var _completed = false + + /** + * `true` if value has been set, `false` otherwise + */ + val completed get() = _completed + + /** + * Set value and mark DeferredValue completed, should be invoked only once + * + * @throws IllegalStateException if it's already `completed` + */ + fun completeWith(completionValue: T) { + synchronized(lock) { + check(!_completed) { "DeferredValue has already been completed" } + value = completionValue + _completed = true + observers.forEach { it(completionValue) } + observers = setOf() + } + } + + /** + * Wait until value is completed + * + * @return completed value + */ + suspend fun await(): T { + val result = suspendCoroutine { continuation -> + synchronized(lock) { + if (_completed) continuation.resume(value!!) + val observer: (T) -> Unit = { + continuation.resume(it) + } + observers += observer + } + } + return result + } +} diff --git a/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt b/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt index 29e8069..f3f2eb1 100644 --- a/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt +++ b/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt @@ -1,10 +1,7 @@ package com.ably.chat -import com.google.gson.JsonElement import com.google.gson.JsonObject import io.ably.lib.types.AblyException -import io.ably.lib.types.AsyncHttpPaginatedResponse -import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -20,25 +17,19 @@ class ChatApiTest { @Test fun `getMessages should ignore unknown fields for Chat Backend`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("timeserial", "timeserial") - addProperty("roomId", "roomId") - addProperty("clientId", "clientId") - addProperty("text", "hello") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockMessagesApiResponse( + realtime, + listOf( + JsonObject().apply { + addProperty("foo", "bar") + addProperty("timeserial", "timeserial") + addProperty("roomId", "roomId") + addProperty("clientId", "clientId") + addProperty("text", "hello") + addProperty("createdAt", 1_000_000) + }, + ), + ) val messages = chatApi.getMessages("roomId", QueryOptions()) @@ -60,20 +51,14 @@ class ChatApiTest { @Test fun `getMessages should throws AblyException if some required fields are missing`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - }, - ), - ), - ) - } + mockMessagesApiResponse( + realtime, + listOf( + JsonObject().apply { + addProperty("foo", "bar") + }, + ), + ) val exception = assertThrows(AblyException::class.java) { runBlocking { chatApi.getMessages("roomId", QueryOptions()) } @@ -84,22 +69,14 @@ class ChatApiTest { @Test fun `sendMessage should ignore unknown fields for Chat Backend`() = runTest { - every { - realtime.requestAsync("POST", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("timeserial", "timeserial") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockSendMessageApiResponse( + realtime, + JsonObject().apply { + addProperty("foo", "bar") + addProperty("timeserial", "timeserial") + addProperty("createdAt", 1_000_000) + }, + ) val message = chatApi.sendMessage("roomId", SendMessageParams(text = "hello")) @@ -119,21 +96,13 @@ class ChatApiTest { @Test fun `sendMessage should throw exception if 'timeserial' field is not presented`() = runTest { - every { - realtime.requestAsync("POST", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockSendMessageApiResponse( + realtime, + JsonObject().apply { + addProperty("foo", "bar") + addProperty("createdAt", 1_000_000) + }, + ) assertThrows(AblyException::class.java) { runBlocking { chatApi.sendMessage("roomId", SendMessageParams(text = "hello")) } @@ -142,31 +111,15 @@ class ChatApiTest { @Test fun `getOccupancy should throw exception if 'connections' field is not presented`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/occupancy", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("presenceMembers", 1_000) - }, - ), - ), - ) - } + mockOccupancyApiResponse( + realtime, + JsonObject().apply { + addProperty("presenceMembers", 1_000) + }, + ) assertThrows(AblyException::class.java) { runBlocking { chatApi.getOccupancy("roomId") } } } } - -private fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { - val response = mockk() - every { - response.items() - } returns items.toTypedArray() - return response -} diff --git a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt new file mode 100644 index 0000000..410a08b --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt @@ -0,0 +1,225 @@ +package com.ably.chat + +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime.Channels +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelBase +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ChannelStateListener +import io.ably.lib.realtime.buildChannelStateChange +import io.ably.lib.realtime.buildRealtimeChannel +import io.ably.lib.types.AblyException +import io.ably.lib.types.MessageExtras +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import io.mockk.verify +import java.lang.reflect.Field +import java.util.HashMap +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertThrows +import org.junit.Before +import org.junit.Test + +class MessagesTest { + + private val realtimeClient = mockk(relaxed = true) + private val realtimeChannels = mockk(relaxed = true) + private val realtimeChannel = spyk(buildRealtimeChannel()) + private val chatApi = spyk(ChatApi(realtimeClient, "clientId")) + private lateinit var messages: DefaultMessages + + private val channelStateListenerSlot = slot() + + @Before + fun setUp() { + every { realtimeChannels.get(any(), any()) } returns realtimeChannel + + every { realtimeChannel.on(capture(channelStateListenerSlot)) } answers { + println("Channel state listener registered") + } + + messages = DefaultMessages( + roomId = "room1", + realtimeChannels = realtimeChannels, + chatApi = chatApi, + ) + } + + @Test + fun `should be able to send message and get it back from response`() = runTest { + mockSendMessageApiResponse( + realtimeClient, + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + addProperty("createdAt", 1_000_000) + }, + roomId = "room1", + ) + + val sentMessage = messages.send( + SendMessageParams( + text = "lala", + headers = mapOf("foo" to "bar"), + metadata = mapOf("meta" to "data"), + ), + ) + + assertEquals( + Message( + timeserial = "abcdefghij@1672531200000-123", + clientId = "clientId", + roomId = "room1", + text = "lala", + createdAt = 1_000_000, + metadata = mapOf("meta" to "data"), + headers = mapOf("foo" to "bar"), + ), + sentMessage, + ) + } + + @Test + fun `should be able to subscribe to incoming messages`() = runTest { + val pubSubMessageListenerSlot = slot() + + every { realtimeChannel.subscribe("message.created", capture(pubSubMessageListenerSlot)) } answers { + println("Pub/Sub message listener registered") + } + + val deferredValue = DeferredValue() + + messages.subscribe { + deferredValue.completeWith(it) + } + + verify { realtimeChannel.subscribe("message.created", any()) } + + pubSubMessageListenerSlot.captured.onMessage( + PubSubMessage().apply { + data = JsonObject().apply { + addProperty("text", "some text") + } + clientId = "clientId" + timestamp = 1000L + extras = MessageExtras( + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + add( + "headers", + JsonObject().apply { + addProperty("foo", "bar") + }, + ) + }, + ) + }, + ) + + val messageEvent = deferredValue.await() + + assertEquals(MessageEventType.Created, messageEvent.type) + assertEquals( + Message( + roomId = "room1", + createdAt = 1000L, + clientId = "clientId", + timeserial = "abcdefghij@1672531200000-123", + text = "some text", + metadata = mapOf(), + headers = mapOf("foo" to "bar"), + ), + messageEvent.message, + ) + } + + @Test + fun `should throw an exception for listener history if not subscribed`() = runTest { + val subscription = messages.subscribe {} + + subscription.unsubscribe() + + val exception = assertThrows(AblyException::class.java) { + runBlocking { subscription.getPreviousMessages() } + } + + assertEquals(40_000, exception.errorInfo.code) + } + + @Test + fun `every subscription should have own channel serial`() = runTest { + messages.channel.properties.channelSerial = "channel-serial-1" + messages.channel.state = ChannelState.attached + + val subscription1 = (messages.subscribe {}) as DefaultMessagesSubscription + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + + messages.channel.properties.channelSerial = "channel-serial-2" + val subscription2 = (messages.subscribe {}) as DefaultMessagesSubscription + + assertEquals("channel-serial-2", subscription2.fromSerialProvider().await()) + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + } + + @Test + fun `subscription should update channel serial after reattach with resume = false`() = runTest { + messages.channel.properties.channelSerial = "channel-serial-1" + messages.channel.state = ChannelState.attached + + val subscription1 = (messages.subscribe {}) as DefaultMessagesSubscription + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + + messages.channel.properties.channelSerial = "channel-serial-2" + channelStateListenerSlot.captured.onChannelStateChanged( + buildChannelStateChange( + current = ChannelState.attached, + previous = ChannelState.attaching, + resumed = false, + ), + ) + + assertEquals("channel-serial-2", subscription1.fromSerialProvider().await()) + } + + @Test + fun `subscription should invoke once for each incoming message`() = runTest { + val listener1 = mockk(relaxed = true) + val listener2 = mockk(relaxed = true) + + messages.subscribe(listener1) + + messages.channel.channelMulticaster.onMessage(buildDummyPubSubMessage()) + + verify(exactly = 1) { listener1.onEvent(any()) } + + messages.subscribe(listener2) + + messages.channel.channelMulticaster.onMessage(buildDummyPubSubMessage()) + + verify(exactly = 2) { listener1.onEvent(any()) } + verify(exactly = 1) { listener2.onEvent(any()) } + } +} + +private val Channel.channelMulticaster: ChannelBase.MessageListener get() { + val field: Field = (ChannelBase::class.java).getDeclaredField("eventListeners") + field.isAccessible = true + val eventListeners = field.get(this) as HashMap<*, *> + return eventListeners["message.created"] as ChannelBase.MessageListener +} + +private fun buildDummyPubSubMessage() = PubSubMessage().apply { + data = JsonObject().apply { + addProperty("text", "dummy text") + } + clientId = "dummy" + timestamp = 1000L + extras = MessageExtras( + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + }, + ) +} diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt new file mode 100644 index 0000000..e493418 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -0,0 +1,51 @@ +package com.ably.chat + +import com.google.gson.JsonElement +import io.ably.lib.types.AsyncHttpPaginatedResponse +import io.mockk.every +import io.mockk.mockk + +fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { + val response = mockk() + every { + response.items() + } returns items.toTypedArray() + return response +} + +fun mockMessagesApiResponse(realtimeClientMock: RealtimeClient, response: List, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("GET", "/chat/v1/rooms/$roomId/messages", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse(response), + ) + } +} + +fun mockSendMessageApiResponse(realtimeClientMock: RealtimeClient, response: JsonElement, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("POST", "/chat/v1/rooms/$roomId/messages", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse( + listOf(response), + ), + ) + } +} + +fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonElement, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("GET", "/chat/v1/rooms/$roomId/occupancy", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse( + listOf(response), + ), + ) + } +} diff --git a/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt b/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt new file mode 100644 index 0000000..6c27668 --- /dev/null +++ b/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt @@ -0,0 +1,28 @@ +package io.ably.lib.realtime + +import io.ably.lib.realtime.ChannelStateListener.ChannelStateChange +import io.ably.lib.types.ClientOptions +import io.ably.lib.types.ErrorInfo + +/** + * This function build ChannelStateChange object, which is package-private in ably-java. + * + * We can get rid of it, if we decide to increase constructor visibility + */ +fun buildChannelStateChange( + current: ChannelState, + previous: ChannelState, + reason: ErrorInfo? = null, + resumed: Boolean = true, +): ChannelStateChange = ChannelStateChange(current, previous, reason, resumed) + +/** + * This function build realtime Channel object, it has backlink to the realtime client + * and also package-private constructor. + */ +fun buildRealtimeChannel(channelName: String = "channel") = AblyRealtime( + ClientOptions().apply { + key = "dummy-key" + autoConnect = false + }, +).channels.get(channelName) diff --git a/example/src/main/java/com/ably/chat/example/MainActivity.kt b/example/src/main/java/com/ably/chat/example/MainActivity.kt index 8fc363b..d12596a 100644 --- a/example/src/main/java/com/ably/chat/example/MainActivity.kt +++ b/example/src/main/java/com/ably/chat/example/MainActivity.kt @@ -14,12 +14,14 @@ import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.imePadding import androidx.compose.foundation.layout.padding import androidx.compose.foundation.lazy.LazyColumn +import androidx.compose.foundation.lazy.rememberLazyListState import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.material3.Button import androidx.compose.material3.Scaffold import androidx.compose.material3.Text import androidx.compose.material3.TextField import androidx.compose.runtime.Composable +import androidx.compose.runtime.DisposableEffect import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember @@ -29,11 +31,10 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.graphics.Color import androidx.compose.ui.text.input.TextFieldValue +import androidx.compose.ui.tooling.preview.Preview import androidx.compose.ui.unit.dp -import com.ably.chat.ChatApi +import com.ably.chat.ChatClient import com.ably.chat.Message -import com.ably.chat.QueryOptions -import com.ably.chat.QueryOptions.MessageOrder.OldestFirst import com.ably.chat.RealtimeClient import com.ably.chat.SendMessageParams import com.ably.chat.example.ui.theme.AblyChatExampleTheme @@ -46,6 +47,7 @@ val randomClientId = UUID.randomUUID().toString() class MainActivity : ComponentActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) + val realtimeClient = RealtimeClient( ClientOptions().apply { key = BuildConfig.ABLY_KEY @@ -53,13 +55,15 @@ class MainActivity : ComponentActivity() { logLevel = 2 }, ) - val chatApi = ChatApi(realtimeClient, randomClientId) + + val chatClient = ChatClient(realtimeClient) + enableEdgeToEdge() setContent { AblyChatExampleTheme { Scaffold(modifier = Modifier.fillMaxSize()) { innerPadding -> Chat( - chatApi, + chatClient, modifier = Modifier.padding(innerPadding), ) } @@ -69,29 +73,42 @@ class MainActivity : ComponentActivity() { } @Composable -fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { +fun Chat(chatClient: ChatClient, modifier: Modifier = Modifier) { var messageText by remember { mutableStateOf(TextFieldValue("")) } var sending by remember { mutableStateOf(false) } var messages by remember { mutableStateOf(listOf()) } + val listState = rememberLazyListState() val coroutineScope = rememberCoroutineScope() val roomId = "my-room" + val room = chatClient.rooms.get(roomId) - Column( - modifier = Modifier.fillMaxSize(), - verticalArrangement = Arrangement.SpaceBetween, - ) { - Button(modifier = modifier.align(Alignment.CenterHorizontally), onClick = { + DisposableEffect(Unit) { + val subscription = room.messages.subscribe { + messages += it.message coroutineScope.launch { - messages = chatApi.getMessages(roomId, QueryOptions(orderBy = OldestFirst)).items + listState.animateScrollToItem(messages.size - 1) } - }) { - Text("Load") } + coroutineScope.launch { + messages = subscription.getPreviousMessages().items.reversed() + if (messages.isNotEmpty()) listState.animateScrollToItem(messages.size - 1) + } + + onDispose { + subscription.unsubscribe() + } + } + + Column( + modifier = modifier.fillMaxSize(), + verticalArrangement = Arrangement.SpaceBetween, + ) { LazyColumn( modifier = Modifier.weight(1f).padding(16.dp), userScrollEnabled = true, + state = listState, ) { items(messages.size) { index -> MessageBubble(messages[index]) @@ -105,8 +122,7 @@ fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { ) { sending = true coroutineScope.launch { - chatApi.sendMessage( - roomId, + room.messages.send( SendMessageParams( text = messageText.text, ), @@ -160,7 +176,6 @@ fun ChatInputField( TextField( value = messageInput, onValueChange = onMessageChange, - readOnly = sending, modifier = Modifier .weight(1f) .background(Color.White), @@ -171,3 +186,34 @@ fun ChatInputField( } } } + +@Preview +@Composable +fun MessageBubblePreview() { + AblyChatExampleTheme { + MessageBubble( + message = Message( + text = "Hello World!", + timeserial = "fake", + roomId = "roomId", + clientId = "clientId", + createdAt = System.currentTimeMillis(), + metadata = mapOf(), + headers = mapOf(), + ), + ) + } +} + +@Preview +@Composable +fun ChatInputPreview() { + AblyChatExampleTheme { + ChatInputField( + sending = false, + messageInput = TextFieldValue(""), + onMessageChange = {}, + onSendClick = {}, + ) + } +}