diff --git a/chat-android/src/main/java/com/ably/chat/ChatApi.kt b/chat-android/src/main/java/com/ably/chat/ChatApi.kt index cbd9733..5da7f85 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatApi.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatApi.kt @@ -2,9 +2,6 @@ package com.ably.chat import com.google.gson.JsonElement import com.google.gson.JsonObject -import com.google.gson.JsonPrimitive -import io.ably.lib.http.HttpCore -import io.ably.lib.http.HttpUtils import io.ably.lib.types.AblyException import io.ably.lib.types.AsyncHttpPaginatedResponse import io.ably.lib.types.ErrorInfo @@ -17,19 +14,20 @@ private const val API_PROTOCOL_VERSION = 3 private const val PROTOCOL_VERSION_PARAM_NAME = "v" private val apiProtocolParam = Param(PROTOCOL_VERSION_PARAM_NAME, API_PROTOCOL_VERSION.toString()) -// TODO make this class internal -class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { +internal class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { /** * Get messages from the Chat Backend * * @return paginated result with messages */ - suspend fun getMessages(roomId: String, params: QueryOptions): PaginatedResult { + suspend fun getMessages(roomId: String, options: QueryOptions, fromSerial: String? = null): PaginatedResult { + val baseParams = options.toParams() + val params = fromSerial?.let { baseParams + Param("fromSerial", it) } ?: baseParams return makeAuthorizedPaginatedRequest( url = "/chat/v1/rooms/$roomId/messages", method = "GET", - params = params.toParams(), + params = params, ) { Message( timeserial = it.requireString("timeserial"), @@ -137,17 +135,6 @@ class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: } } -private fun JsonElement?.toRequestBody(useBinaryProtocol: Boolean = false): HttpCore.RequestBody = - HttpUtils.requestBodyFromGson(this, useBinaryProtocol) - -private fun Map.toJson() = JsonObject().apply { - forEach { (key, value) -> addProperty(key, value) } -} - -private fun JsonElement.toMap() = buildMap { - requireJsonObject().entrySet().filter { (_, value) -> value.isJsonPrimitive }.forEach { (key, value) -> put(key, value.asString) } -} - private fun QueryOptions.toParams() = buildList { start?.let { add(Param("start", it)) } end?.let { add(Param("end", it)) } @@ -162,67 +149,3 @@ private fun QueryOptions.toParams() = buildList { ), ) } - -private fun JsonElement.requireJsonObject(): JsonObject { - if (!isJsonObject) { - throw AblyException.fromErrorInfo( - ErrorInfo("Response value expected to be JsonObject, got primitive instead", HttpStatusCodes.InternalServerError), - ) - } - return asJsonObject -} - -private fun JsonElement.requireString(memberName: String): String { - val memberElement = requireField(memberName) - if (!memberElement.isJsonPrimitive) { - throw AblyException.fromErrorInfo( - ErrorInfo( - "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", - HttpStatusCodes.InternalServerError, - ), - ) - } - return memberElement.asString -} - -private fun JsonElement.requireLong(memberName: String): Long { - val memberElement = requireJsonPrimitive(memberName) - try { - return memberElement.asLong - } catch (formatException: NumberFormatException) { - throw AblyException.fromErrorInfo( - formatException, - ErrorInfo("Required numeric field \"$memberName\" is not a valid long", HttpStatusCodes.InternalServerError), - ) - } -} - -private fun JsonElement.requireInt(memberName: String): Int { - val memberElement = requireJsonPrimitive(memberName) - try { - return memberElement.asInt - } catch (formatException: NumberFormatException) { - throw AblyException.fromErrorInfo( - formatException, - ErrorInfo("Required numeric field \"$memberName\" is not a valid int", HttpStatusCodes.InternalServerError), - ) - } -} - -private fun JsonElement.requireJsonPrimitive(memberName: String): JsonPrimitive { - val memberElement = requireField(memberName) - if (!memberElement.isJsonPrimitive) { - throw AblyException.fromErrorInfo( - ErrorInfo( - "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", - HttpStatusCodes.InternalServerError, - ), - ) - } - return memberElement.asJsonPrimitive -} - -private fun JsonElement.requireField(memberName: String): JsonElement = requireJsonObject().get(memberName) - ?: throw AblyException.fromErrorInfo( - ErrorInfo("Required field \"$memberName\" is missing", HttpStatusCodes.InternalServerError), - ) diff --git a/chat-android/src/main/java/com/ably/chat/ChatClient.kt b/chat-android/src/main/java/com/ably/chat/ChatClient.kt index b621db9..e933666 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatClient.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatClient.kt @@ -37,7 +37,8 @@ interface ChatClient { val clientOptions: ClientOptions } -fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions): ChatClient = DefaultChatClient(realtimeClient, clientOptions) +fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions = ClientOptions()): ChatClient = + DefaultChatClient(realtimeClient, clientOptions) internal class DefaultChatClient( override val realtime: RealtimeClient, diff --git a/chat-android/src/main/java/com/ably/chat/JsonUtils.kt b/chat-android/src/main/java/com/ably/chat/JsonUtils.kt new file mode 100644 index 0000000..2e119b8 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/JsonUtils.kt @@ -0,0 +1,84 @@ +package com.ably.chat + +import com.google.gson.JsonElement +import com.google.gson.JsonObject +import com.google.gson.JsonPrimitive +import io.ably.lib.http.HttpCore +import io.ably.lib.http.HttpUtils +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo + +internal fun JsonElement?.toRequestBody(useBinaryProtocol: Boolean = false): HttpCore.RequestBody = + HttpUtils.requestBodyFromGson(this, useBinaryProtocol) + +internal fun Map.toJson() = JsonObject().apply { + forEach { (key, value) -> addProperty(key, value) } +} + +internal fun JsonElement.toMap() = buildMap { + requireJsonObject().entrySet().filter { (_, value) -> value.isJsonPrimitive }.forEach { (key, value) -> put(key, value.asString) } +} + +internal fun JsonElement.requireJsonObject(): JsonObject { + if (!isJsonObject) { + throw AblyException.fromErrorInfo( + ErrorInfo("Response value expected to be JsonObject, got primitive instead", HttpStatusCodes.InternalServerError), + ) + } + return asJsonObject +} + +internal fun JsonElement.requireString(memberName: String): String { + val memberElement = requireField(memberName) + if (!memberElement.isJsonPrimitive) { + throw AblyException.fromErrorInfo( + ErrorInfo( + "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", + HttpStatusCodes.InternalServerError, + ), + ) + } + return memberElement.asString +} + +internal fun JsonElement.requireLong(memberName: String): Long { + val memberElement = requireJsonPrimitive(memberName) + try { + return memberElement.asLong + } catch (formatException: NumberFormatException) { + throw AblyException.fromErrorInfo( + formatException, + ErrorInfo("Required numeric field \"$memberName\" is not a valid long", HttpStatusCodes.InternalServerError), + ) + } +} + +internal fun JsonElement.requireInt(memberName: String): Int { + val memberElement = requireJsonPrimitive(memberName) + try { + return memberElement.asInt + } catch (formatException: NumberFormatException) { + throw AblyException.fromErrorInfo( + formatException, + ErrorInfo("Required numeric field \"$memberName\" is not a valid int", HttpStatusCodes.InternalServerError), + ) + } +} + +internal fun JsonElement.requireJsonPrimitive(memberName: String): JsonPrimitive { + val memberElement = requireField(memberName) + if (!memberElement.isJsonPrimitive) { + throw AblyException.fromErrorInfo( + ErrorInfo( + "Value for \"$memberName\" field expected to be JsonPrimitive, got object instead", + HttpStatusCodes.InternalServerError, + ), + ) + } + return memberElement.asJsonPrimitive +} + +internal fun JsonElement.requireField(memberName: String): JsonElement = requireJsonObject().get(memberName) + ?: throw AblyException.fromErrorInfo( + ErrorInfo("Required field \"$memberName\" is missing", HttpStatusCodes.InternalServerError), + ) diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index edad9b5..5a72184 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -2,7 +2,17 @@ package com.ably.chat +import com.ably.chat.QueryOptions.MessageOrder.NewestFirst +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ChannelStateListener +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo + +typealias PubSubMessageListener = Channel.MessageListener +typealias PubSubMessage = io.ably.lib.types.Message /** * This interface is used to interact with messages in a chat room: subscribing @@ -91,7 +101,7 @@ data class QueryOptions( /** * The order of messages in the query result. */ - val orderBy: MessageOrder = MessageOrder.NewestFirst, + val orderBy: MessageOrder = NewestFirst, ) { /** * Represents direction to query messages in. @@ -169,26 +179,97 @@ data class SendMessageParams( val headers: MessageHeaders? = null, ) -interface MessagesSubscription: Cancellation { - suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult +interface MessagesSubscription : Cancellation { + suspend fun getPreviousMessages(start: Long? = null, end: Long? = null, limit: Int = 100): PaginatedResult } -class DefaultMessages( +internal class DefaultMessagesSubscription( + private val chatApi: ChatApi, private val roomId: String, - private val realtimeClient: RealtimeClient, + private val cancellation: Cancellation, + internal val fromSerialProvider: () -> DeferredValue, +) : MessagesSubscription { + override fun cancel() { + cancellation.cancel() + } + + override suspend fun getPreviousMessages(start: Long?, end: Long?, limit: Int): PaginatedResult { + val fromSerial = fromSerialProvider().await() + val queryOptions = QueryOptions(start = start, end = end, limit = limit, orderBy = NewestFirst) + return chatApi.getMessages( + roomId = roomId, + options = queryOptions, + fromSerial = fromSerial, + ) + } +} + +internal class DefaultMessages( + private val roomId: String, + realtimeChannels: AblyRealtime.Channels, private val chatApi: ChatApi, ) : Messages { + private var listeners: Map> = emptyMap() + + private var channelStateListener: ChannelStateListener + + private var lock = Any() + /** * the channel name for the chat messages channel. */ private val messagesChannelName = "$roomId::\$chat::\$chatMessages" - override val channel: Channel - get() = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions()) + override val channel: Channel = realtimeChannels.get(messagesChannelName, ChatChannelOptions()) + + init { + channelStateListener = ChannelStateListener { + if (!it.resumed) updateChannelSerialsAfterDiscontinuity() + } + channel.on(ChannelState.attached, channelStateListener) + } override fun subscribe(listener: Messages.Listener): MessagesSubscription { - TODO("Not yet implemented") + val deferredChannelSerial = DeferredValue() + addListener(listener, deferredChannelSerial) + val messageListener = PubSubMessageListener { + val pubSubMessage = it ?: throw AblyException.fromErrorInfo( + ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ) + val data = parsePubSubMessageData(pubSubMessage.data) + val chatMessage = Message( + roomId = roomId, + createdAt = pubSubMessage.timestamp, + clientId = pubSubMessage.clientId, + timeserial = pubSubMessage.extras.asJsonObject().requireString("timeserial"), + text = data.text, + metadata = data.metadata, + headers = pubSubMessage.extras.asJsonObject().get("headers")?.toMap() ?: mapOf(), + ) + emitMessage(chatMessage) + } + + channel.subscribe(MessageEventType.Created.eventName, messageListener) + associateWithCurrentChannelSerial(deferredChannelSerial) + + return DefaultMessagesSubscription( + chatApi = chatApi, + roomId = roomId, + cancellation = { + removeListener(listener) + channel.unsubscribe(MessageEventType.Created.eventName, messageListener) + }, + fromSerialProvider = { + listeners[listener] ?: throw AblyException.fromErrorInfo( + ErrorInfo( + "This messages subscription instance was already unsubscribed", + HttpStatusCodes.BadRequest, + ErrorCodes.BadRequest, + ), + ) + }, + ) } override suspend fun get(options: QueryOptions): PaginatedResult = chatApi.getMessages(roomId, options) @@ -198,4 +279,75 @@ class DefaultMessages( override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation { TODO("Not yet implemented") } + + fun release() { + channel.off(channelStateListener) + } + + /** + * Associate deferred channel serial value with the current channel's serial + * + * WARN: it not deterministic because of race condition, + * this can lead to duplicated messages in `getPreviousMessages` calls + */ + private fun associateWithCurrentChannelSerial(channelSerialProvider: DeferredValue) { + if (channel.state === ChannelState.attached) { + channelSerialProvider.completeWith(requireChannelSerial()) + } + + channel.once(ChannelState.attached) { + channelSerialProvider.completeWith(requireChannelSerial()) + } + } + + private fun requireChannelSerial(): String { + return channel.properties.channelSerial + ?: throw AblyException.fromErrorInfo( + ErrorInfo("Channel has been attached, but channelSerial is not defined", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ) + } + + private fun addListener(listener: Messages.Listener, deferredChannelSerial: DeferredValue) { + synchronized(lock) { + listeners += listener to deferredChannelSerial + } + } + + private fun removeListener(listener: Messages.Listener) { + synchronized(lock) { + listeners -= listener + } + } + + private fun emitMessage(message: Message) { + listeners.keys.forEach { listener -> listener.onEvent(MessageEvent(type = MessageEventType.Created, message = message)) } + } + + private fun updateChannelSerialsAfterDiscontinuity() { + val deferredChannelSerial = DeferredValue() + associateWithCurrentChannelSerial(deferredChannelSerial) + + synchronized(lock) { + listeners = listeners.mapValues { + if (it.value.completed) deferredChannelSerial else it.value + } + } + } +} + +/** + * Parsed data from the Pub/Sub channel's message data field + */ +private data class PubSubMessageData(val text: String, val metadata: MessageMetadata) + +private fun parsePubSubMessageData(data: Any): PubSubMessageData { + if (data !is JsonObject) { + throw AblyException.fromErrorInfo( + ErrorInfo("Unrecognized Pub/Sub channel's message for `Message.created` event", HttpStatusCodes.InternalServerError), + ) + } + return PubSubMessageData( + text = data.requireString("text"), + metadata = data.get("metadata")?.toMap() ?: mapOf(), + ) } diff --git a/chat-android/src/main/java/com/ably/chat/Room.kt b/chat-android/src/main/java/com/ably/chat/Room.kt index 8550eb7..84cecaf 100644 --- a/chat-android/src/main/java/com/ably/chat/Room.kt +++ b/chat-android/src/main/java/com/ably/chat/Room.kt @@ -90,12 +90,14 @@ internal class DefaultRoom( chatApi: ChatApi, ) : Room { - override val messages: Messages = DefaultMessages( + private val _messages = DefaultMessages( roomId = roomId, - realtimeClient = realtimeClient, + realtimeChannels = realtimeClient.channels, chatApi = chatApi, ) + override val messages: Messages = _messages + override val presence: Presence = DefaultPresence( messages = messages, ) @@ -130,4 +132,8 @@ internal class DefaultRoom( typing.channel.detachCoroutine() reactions.channel.detachCoroutine() } + + fun release() { + _messages.release() + } } diff --git a/chat-android/src/main/java/com/ably/chat/RoomOptions.kt b/chat-android/src/main/java/com/ably/chat/RoomOptions.kt index 2263d93..b6d9d96 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomOptions.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomOptions.kt @@ -21,13 +21,13 @@ data class RoomOptions( * The reactions options for the room. To enable reactions in the room, set this property. You may use * {@link RoomOptionsDefaults.reactions} to enable reactions with default options. */ - val reactions: RoomReactionsOptions = RoomReactionsOptions(), + val reactions: RoomReactionsOptions = RoomReactionsOptions, /** * The occupancy options for the room. To enable occupancy in the room, set this property. You may use * {@link RoomOptionsDefaults.occupancy} to enable occupancy with default options. */ - val occupancy: OccupancyOptions = OccupancyOptions(), + val occupancy: OccupancyOptions = OccupancyOptions, ) /** @@ -66,9 +66,9 @@ data class TypingOptions( /** * Represents the reactions options for a chat room. */ -class RoomReactionsOptions +object RoomReactionsOptions /** * Represents the occupancy options for a chat room. */ -class OccupancyOptions +object OccupancyOptions diff --git a/chat-android/src/main/java/com/ably/chat/Rooms.kt b/chat-android/src/main/java/com/ably/chat/Rooms.kt index 44d917d..31c0c49 100644 --- a/chat-android/src/main/java/com/ably/chat/Rooms.kt +++ b/chat-android/src/main/java/com/ably/chat/Rooms.kt @@ -24,7 +24,7 @@ interface Rooms { * @throws {@link ErrorInfo} if a room with the same ID but different options already exists. * @returns Room A new or existing Room object. */ - fun get(roomId: String, options: RoomOptions): Room + fun get(roomId: String, options: RoomOptions = RoomOptions()): Room /** * Release the Room object if it exists. This method only releases the reference @@ -47,7 +47,7 @@ internal class DefaultRooms( private val chatApi: ChatApi, override val clientOptions: ClientOptions, ) : Rooms { - private val roomIdToRoom: MutableMap = mutableMapOf() + private val roomIdToRoom: MutableMap = mutableMapOf() override fun get(roomId: String, options: RoomOptions): Room { return synchronized(this) { @@ -72,7 +72,8 @@ internal class DefaultRooms( override suspend fun release(roomId: String) { synchronized(this) { - roomIdToRoom.remove(roomId) + val room = roomIdToRoom.remove(roomId) + room?.release() } } } diff --git a/chat-android/src/main/java/com/ably/chat/Utils.kt b/chat-android/src/main/java/com/ably/chat/Utils.kt index 531a4eb..66d9451 100644 --- a/chat-android/src/main/java/com/ably/chat/Utils.kt +++ b/chat-android/src/main/java/com/ably/chat/Utils.kt @@ -39,8 +39,65 @@ suspend fun Channel.detachCoroutine() = suspendCoroutine { continuation -> fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOptions { val options = ChannelOptions() init?.let { options.it() } - options.params = options.params + mapOf( + options.params = (options.params ?: mapOf()) + mapOf( AGENT_PARAMETER_NAME to "chat-kotlin/${BuildConfig.APP_VERSION}", ) return options } + +/** + * A value that can be evaluated at a later time, similar to `kotlinx.coroutines.Deferred` or a JavaScript Promise. + * + * This class provides a thread-safe, simple blocking implementation. It is not designed for use in scenarios with + * heavy concurrency. + * + * @param T the type of the value that will be evaluated. + */ +internal class DeferredValue { + + private var value: T? = null + + private val lock = Any() + + private var observers: Set<(T) -> Unit> = setOf() + + private var _completed = false + + /** + * `true` if value has been set, `false` otherwise + */ + val completed get() = _completed + + /** + * Set value and mark DeferredValue completed, should be invoked only once + * + * @throws IllegalStateException if it's already `completed` + */ + fun completeWith(completionValue: T) { + synchronized(lock) { + check(!_completed) { "DeferredValue has already been completed" } + value = completionValue + _completed = true + observers.forEach { it(completionValue) } + observers = setOf() + } + } + + /** + * Wait until value is completed + * + * @return completed value + */ + suspend fun await(): T { + val result = suspendCoroutine { continuation -> + synchronized(lock) { + if (_completed) continuation.resume(value!!) + val observer: (T) -> Unit = { + continuation.resume(it) + } + observers += observer + } + } + return result + } +} diff --git a/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt b/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt index 29e8069..f3f2eb1 100644 --- a/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt +++ b/chat-android/src/test/java/com/ably/chat/ChatApiTest.kt @@ -1,10 +1,7 @@ package com.ably.chat -import com.google.gson.JsonElement import com.google.gson.JsonObject import io.ably.lib.types.AblyException -import io.ably.lib.types.AsyncHttpPaginatedResponse -import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -20,25 +17,19 @@ class ChatApiTest { @Test fun `getMessages should ignore unknown fields for Chat Backend`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("timeserial", "timeserial") - addProperty("roomId", "roomId") - addProperty("clientId", "clientId") - addProperty("text", "hello") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockMessagesApiResponse( + realtime, + listOf( + JsonObject().apply { + addProperty("foo", "bar") + addProperty("timeserial", "timeserial") + addProperty("roomId", "roomId") + addProperty("clientId", "clientId") + addProperty("text", "hello") + addProperty("createdAt", 1_000_000) + }, + ), + ) val messages = chatApi.getMessages("roomId", QueryOptions()) @@ -60,20 +51,14 @@ class ChatApiTest { @Test fun `getMessages should throws AblyException if some required fields are missing`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - }, - ), - ), - ) - } + mockMessagesApiResponse( + realtime, + listOf( + JsonObject().apply { + addProperty("foo", "bar") + }, + ), + ) val exception = assertThrows(AblyException::class.java) { runBlocking { chatApi.getMessages("roomId", QueryOptions()) } @@ -84,22 +69,14 @@ class ChatApiTest { @Test fun `sendMessage should ignore unknown fields for Chat Backend`() = runTest { - every { - realtime.requestAsync("POST", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("timeserial", "timeserial") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockSendMessageApiResponse( + realtime, + JsonObject().apply { + addProperty("foo", "bar") + addProperty("timeserial", "timeserial") + addProperty("createdAt", 1_000_000) + }, + ) val message = chatApi.sendMessage("roomId", SendMessageParams(text = "hello")) @@ -119,21 +96,13 @@ class ChatApiTest { @Test fun `sendMessage should throw exception if 'timeserial' field is not presented`() = runTest { - every { - realtime.requestAsync("POST", "/chat/v1/rooms/roomId/messages", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("foo", "bar") - addProperty("createdAt", 1_000_000) - }, - ), - ), - ) - } + mockSendMessageApiResponse( + realtime, + JsonObject().apply { + addProperty("foo", "bar") + addProperty("createdAt", 1_000_000) + }, + ) assertThrows(AblyException::class.java) { runBlocking { chatApi.sendMessage("roomId", SendMessageParams(text = "hello")) } @@ -142,31 +111,15 @@ class ChatApiTest { @Test fun `getOccupancy should throw exception if 'connections' field is not presented`() = runTest { - every { - realtime.requestAsync("GET", "/chat/v1/rooms/roomId/occupancy", any(), any(), any(), any()) - } answers { - val callback = lastArg() - callback.onResponse( - buildAsyncHttpPaginatedResponse( - listOf( - JsonObject().apply { - addProperty("presenceMembers", 1_000) - }, - ), - ), - ) - } + mockOccupancyApiResponse( + realtime, + JsonObject().apply { + addProperty("presenceMembers", 1_000) + }, + ) assertThrows(AblyException::class.java) { runBlocking { chatApi.getOccupancy("roomId") } } } } - -private fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { - val response = mockk() - every { - response.items() - } returns items.toTypedArray() - return response -} diff --git a/chat-android/src/test/java/com/ably/chat/MessagesTest.kt b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt new file mode 100644 index 0000000..3f4ac5a --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/MessagesTest.kt @@ -0,0 +1,189 @@ +package com.ably.chat + +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime.Channels +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.ChannelState +import io.ably.lib.realtime.ChannelStateListener +import io.ably.lib.realtime.buildChannelStateChange +import io.ably.lib.realtime.buildRealtimeChannel +import io.ably.lib.types.AblyException +import io.ably.lib.types.MessageExtras +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import io.mockk.verify +import kotlin.time.Duration.Companion.seconds +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Assert.assertThrows +import org.junit.Before +import org.junit.Test + +class MessagesTest { + + private val realtimeClient = mockk(relaxed = true) + private val realtimeChannels = mockk(relaxed = true) + private val realtimeChannel = spyk(buildRealtimeChannel()) + private val chatApi = spyk(ChatApi(realtimeClient, "clientId")) + private lateinit var messages: DefaultMessages + + private val channelStateListenerSlot = slot() + private val channelStateOnceSlot = slot() + + @Before + fun setUp() { + every { realtimeChannels.get(any(), any()) } returns realtimeChannel + + every { realtimeChannel.on(ChannelState.attached, capture(channelStateListenerSlot)) } answers { + println("Channel state listener registered") + } + + every { realtimeChannel.once(ChannelState.attached, capture(channelStateOnceSlot)) } answers { + println("Channel state once listener registered") + } + + messages = DefaultMessages( + roomId = "room1", + realtimeChannels = realtimeChannels, + chatApi = chatApi, + ) + } + + @Test + fun `should be able to send message and get it back from response`() = runTest { + mockSendMessageApiResponse( + realtimeClient, + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + addProperty("createdAt", 1_000_000) + }, + roomId = "room1", + ) + + val sentMessage = messages.send( + SendMessageParams( + text = "lala", + headers = mapOf("foo" to "bar"), + metadata = mapOf("meta" to "data"), + ), + ) + + assertEquals( + Message( + timeserial = "abcdefghij@1672531200000-123", + clientId = "clientId", + roomId = "room1", + text = "lala", + createdAt = 1_000_000, + metadata = mapOf("meta" to "data"), + headers = mapOf("foo" to "bar"), + ), + sentMessage, + ) + } + + @Test + fun `should be able to subscribe to incoming messages`() = runTest(timeout = 1.seconds) { + val pubSubMessageListenerSlot = slot() + + every { realtimeChannel.subscribe("message.created", capture(pubSubMessageListenerSlot)) } answers { + println("Pub/Sub message listener registered") + } + + val deferredValue = DeferredValue() + + messages.subscribe { + deferredValue.completeWith(it) + } + + verify { realtimeChannel.subscribe("message.created", any()) } + + pubSubMessageListenerSlot.captured.onMessage( + PubSubMessage().apply { + data = JsonObject().apply { + addProperty("text", "some text") + } + clientId = "clientId" + timestamp = 1000L + extras = MessageExtras( + JsonObject().apply { + addProperty("timeserial", "abcdefghij@1672531200000-123") + add( + "headers", + JsonObject().apply { + addProperty("foo", "bar") + }, + ) + }, + ) + }, + ) + + val messageEvent = deferredValue.await() + + assertEquals(MessageEventType.Created, messageEvent.type) + assertEquals( + Message( + roomId = "room1", + createdAt = 1000L, + clientId = "clientId", + timeserial = "abcdefghij@1672531200000-123", + text = "some text", + metadata = mapOf(), + headers = mapOf("foo" to "bar"), + ), + messageEvent.message, + ) + } + + @Test + fun `should throw an exception for listener history if not subscribed`() = runTest { + val subscription = messages.subscribe {} + + subscription.cancel() + + val exception = assertThrows(AblyException::class.java) { + runBlocking { subscription.getPreviousMessages() } + } + + assertEquals(40_000, exception.errorInfo.code) + } + + @Test + fun `every subscription should have own channel serial`() = runTest { + messages.channel.properties.channelSerial = "channel-serial-1" + messages.channel.state = ChannelState.attached + + val subscription1 = (messages.subscribe {}) as DefaultMessagesSubscription + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + + messages.channel.properties.channelSerial = "channel-serial-2" + val subscription2 = (messages.subscribe {}) as DefaultMessagesSubscription + + assertEquals("channel-serial-2", subscription2.fromSerialProvider().await()) + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + } + + @Test + fun `subscription should update channel serial after reattach with resume = false`() = runTest { + messages.channel.properties.channelSerial = "channel-serial-1" + messages.channel.state = ChannelState.attached + + val subscription1 = (messages.subscribe {}) as DefaultMessagesSubscription + assertEquals("channel-serial-1", subscription1.fromSerialProvider().await()) + + messages.channel.properties.channelSerial = "channel-serial-2" + channelStateListenerSlot.captured.onChannelStateChanged( + buildChannelStateChange( + current = ChannelState.attached, + previous = ChannelState.attaching, + resumed = false, + ), + ) + + assertEquals("channel-serial-2", subscription1.fromSerialProvider().await()) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt new file mode 100644 index 0000000..e493418 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -0,0 +1,51 @@ +package com.ably.chat + +import com.google.gson.JsonElement +import io.ably.lib.types.AsyncHttpPaginatedResponse +import io.mockk.every +import io.mockk.mockk + +fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { + val response = mockk() + every { + response.items() + } returns items.toTypedArray() + return response +} + +fun mockMessagesApiResponse(realtimeClientMock: RealtimeClient, response: List, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("GET", "/chat/v1/rooms/$roomId/messages", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse(response), + ) + } +} + +fun mockSendMessageApiResponse(realtimeClientMock: RealtimeClient, response: JsonElement, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("POST", "/chat/v1/rooms/$roomId/messages", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse( + listOf(response), + ), + ) + } +} + +fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonElement, roomId: String = "roomId") { + every { + realtimeClientMock.requestAsync("GET", "/chat/v1/rooms/$roomId/occupancy", any(), any(), any(), any()) + } answers { + val callback = lastArg() + callback.onResponse( + buildAsyncHttpPaginatedResponse( + listOf(response), + ), + ) + } +} diff --git a/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt b/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt new file mode 100644 index 0000000..c879f5f --- /dev/null +++ b/chat-android/src/test/java/io/ably/lib/realtime/RealtimeUtils.kt @@ -0,0 +1,28 @@ +package io.ably.lib.realtime + +import io.ably.lib.realtime.ChannelStateListener.ChannelStateChange +import io.ably.lib.types.ClientOptions +import io.ably.lib.types.ErrorInfo + +/** + * This function build ChannelStateChange object, which is package-private in ably-java. + * + * We can get red of it, if we decide to increase constructor visibility + */ +fun buildChannelStateChange( + current: ChannelState, + previous: ChannelState, + reason: ErrorInfo? = null, + resumed: Boolean = true, +): ChannelStateChange = ChannelStateChange(current, previous, reason, resumed) + +/** + * This function build realtime Channel object, it has backlink to the realtime client + * and also package-private constructor. + */ +fun buildRealtimeChannel(channelName: String = "channel") = AblyRealtime( + ClientOptions().apply { + key = "dummy-key" + autoConnect = false + }, +).channels.get(channelName) diff --git a/example/src/main/java/com/ably/chat/example/MainActivity.kt b/example/src/main/java/com/ably/chat/example/MainActivity.kt index 8fc363b..eb10b1f 100644 --- a/example/src/main/java/com/ably/chat/example/MainActivity.kt +++ b/example/src/main/java/com/ably/chat/example/MainActivity.kt @@ -14,12 +14,14 @@ import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.imePadding import androidx.compose.foundation.layout.padding import androidx.compose.foundation.lazy.LazyColumn +import androidx.compose.foundation.lazy.rememberLazyListState import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.material3.Button import androidx.compose.material3.Scaffold import androidx.compose.material3.Text import androidx.compose.material3.TextField import androidx.compose.runtime.Composable +import androidx.compose.runtime.DisposableEffect import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember @@ -30,10 +32,8 @@ import androidx.compose.ui.Modifier import androidx.compose.ui.graphics.Color import androidx.compose.ui.text.input.TextFieldValue import androidx.compose.ui.unit.dp -import com.ably.chat.ChatApi +import com.ably.chat.ChatClient import com.ably.chat.Message -import com.ably.chat.QueryOptions -import com.ably.chat.QueryOptions.MessageOrder.OldestFirst import com.ably.chat.RealtimeClient import com.ably.chat.SendMessageParams import com.ably.chat.example.ui.theme.AblyChatExampleTheme @@ -46,6 +46,7 @@ val randomClientId = UUID.randomUUID().toString() class MainActivity : ComponentActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) + val realtimeClient = RealtimeClient( ClientOptions().apply { key = BuildConfig.ABLY_KEY @@ -53,13 +54,15 @@ class MainActivity : ComponentActivity() { logLevel = 2 }, ) - val chatApi = ChatApi(realtimeClient, randomClientId) + + val chatClient = ChatClient(realtimeClient) + enableEdgeToEdge() setContent { AblyChatExampleTheme { Scaffold(modifier = Modifier.fillMaxSize()) { innerPadding -> Chat( - chatApi, + chatClient, modifier = Modifier.padding(innerPadding), ) } @@ -69,29 +72,42 @@ class MainActivity : ComponentActivity() { } @Composable -fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { +fun Chat(chatClient: ChatClient, modifier: Modifier = Modifier) { var messageText by remember { mutableStateOf(TextFieldValue("")) } var sending by remember { mutableStateOf(false) } var messages by remember { mutableStateOf(listOf()) } + val listState = rememberLazyListState() val coroutineScope = rememberCoroutineScope() val roomId = "my-room" + val room = chatClient.rooms.get(roomId) - Column( - modifier = Modifier.fillMaxSize(), - verticalArrangement = Arrangement.SpaceBetween, - ) { - Button(modifier = modifier.align(Alignment.CenterHorizontally), onClick = { + DisposableEffect(Unit) { + val subscription = room.messages.subscribe { + messages += it.message coroutineScope.launch { - messages = chatApi.getMessages(roomId, QueryOptions(orderBy = OldestFirst)).items + listState.animateScrollToItem(messages.size - 1) } - }) { - Text("Load") } + coroutineScope.launch { + messages = subscription.getPreviousMessages().items.reversed() + if (messages.isNotEmpty()) listState.animateScrollToItem(messages.size - 1) + } + + onDispose { + subscription.cancel() + } + } + + Column( + modifier = modifier.fillMaxSize(), + verticalArrangement = Arrangement.SpaceBetween, + ) { LazyColumn( modifier = Modifier.weight(1f).padding(16.dp), userScrollEnabled = true, + state = listState, ) { items(messages.size) { index -> MessageBubble(messages[index]) @@ -105,8 +121,7 @@ fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { ) { sending = true coroutineScope.launch { - chatApi.sendMessage( - roomId, + room.messages.send( SendMessageParams( text = messageText.text, ), @@ -160,7 +175,6 @@ fun ChatInputField( TextField( value = messageInput, onValueChange = onMessageChange, - readOnly = sending, modifier = Modifier .weight(1f) .background(Color.White),