From 8b3b0769002eb3d85caa980006e80f1a18e6f634 Mon Sep 17 00:00:00 2001 From: evgeny Date: Mon, 9 Sep 2024 12:30:51 +0100 Subject: [PATCH] WIP: basic chat implementation --- .../src/main/java/com/ably/chat/ChatApi.kt | 9 ++- .../src/main/java/com/ably/chat/ChatClient.kt | 3 +- .../src/main/java/com/ably/chat/Messages.kt | 79 +++++++++++++++++-- .../src/main/java/com/ably/chat/Rooms.kt | 15 ++-- .../src/main/java/com/ably/chat/Utils.kt | 2 +- .../com/ably/chat/example/MainActivity.kt | 41 ++++++---- 6 files changed, 111 insertions(+), 38 deletions(-) diff --git a/chat-android/src/main/java/com/ably/chat/ChatApi.kt b/chat-android/src/main/java/com/ably/chat/ChatApi.kt index cbd9733..50880bf 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatApi.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatApi.kt @@ -17,19 +17,20 @@ private const val API_PROTOCOL_VERSION = 3 private const val PROTOCOL_VERSION_PARAM_NAME = "v" private val apiProtocolParam = Param(PROTOCOL_VERSION_PARAM_NAME, API_PROTOCOL_VERSION.toString()) -// TODO make this class internal -class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { +internal class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) { /** * Get messages from the Chat Backend * * @return paginated result with messages */ - suspend fun getMessages(roomId: String, params: QueryOptions): PaginatedResult { + suspend fun getMessages(roomId: String, options: QueryOptions, fromSerial: String? = null): PaginatedResult { + val baseParams = options.toParams() + val params = fromSerial?.let { baseParams + Param("fromSerial", it) } ?: baseParams return makeAuthorizedPaginatedRequest( url = "/chat/v1/rooms/$roomId/messages", method = "GET", - params = params.toParams(), + params = params, ) { Message( timeserial = it.requireString("timeserial"), diff --git a/chat-android/src/main/java/com/ably/chat/ChatClient.kt b/chat-android/src/main/java/com/ably/chat/ChatClient.kt index b621db9..e933666 100644 --- a/chat-android/src/main/java/com/ably/chat/ChatClient.kt +++ b/chat-android/src/main/java/com/ably/chat/ChatClient.kt @@ -37,7 +37,8 @@ interface ChatClient { val clientOptions: ClientOptions } -fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions): ChatClient = DefaultChatClient(realtimeClient, clientOptions) +fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions = ClientOptions()): ChatClient = + DefaultChatClient(realtimeClient, clientOptions) internal class DefaultChatClient( override val realtime: RealtimeClient, diff --git a/chat-android/src/main/java/com/ably/chat/Messages.kt b/chat-android/src/main/java/com/ably/chat/Messages.kt index edad9b5..bad6317 100644 --- a/chat-android/src/main/java/com/ably/chat/Messages.kt +++ b/chat-android/src/main/java/com/ably/chat/Messages.kt @@ -2,7 +2,13 @@ package com.ably.chat +import com.ably.chat.QueryOptions.MessageOrder.NewestFirst +import com.google.gson.JsonObject import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.Channel.MessageListener +import io.ably.lib.realtime.ChannelState +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine /** * This interface is used to interact with messages in a chat room: subscribing @@ -91,7 +97,7 @@ data class QueryOptions( /** * The order of messages in the query result. */ - val orderBy: MessageOrder = MessageOrder.NewestFirst, + val orderBy: MessageOrder = NewestFirst, ) { /** * Represents direction to query messages in. @@ -169,26 +175,73 @@ data class SendMessageParams( val headers: MessageHeaders? = null, ) -interface MessagesSubscription: Cancellation { +interface MessagesSubscription : Cancellation { suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult } -class DefaultMessages( +internal class DefaultMessagesSubscription( + private val chatApi: ChatApi, + private val roomId: String, + private val cancellation: Cancellation, + private val timeserialProvider: suspend () -> String, +) : MessagesSubscription { + override fun cancel() { + cancellation.cancel() + } + + override suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult { + val fromSerial = timeserialProvider() + return chatApi.getMessages( + roomId = roomId, + options = queryOptions.copy(orderBy = NewestFirst), + fromSerial = fromSerial, + ) + } +} + +internal class DefaultMessages( private val roomId: String, - private val realtimeClient: RealtimeClient, + realtimeClient: RealtimeClient, private val chatApi: ChatApi, ) : Messages { + private var observers: Set = emptySet() + + private var channelSerial: String? = null + /** * the channel name for the chat messages channel. */ private val messagesChannelName = "$roomId::\$chat::\$chatMessages" - override val channel: Channel - get() = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions()) + override val channel: Channel = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions()) override fun subscribe(listener: Messages.Listener): MessagesSubscription { - TODO("Not yet implemented") + observers += listener + val messageListener = MessageListener { + val pubSubMessage = it!! + val chatMessage = Message( + roomId = roomId, + createdAt = pubSubMessage.timestamp, + clientId = pubSubMessage.clientId, + timeserial = pubSubMessage.extras.asJsonObject().get("timeserial").asString, + text = (pubSubMessage.data as JsonObject).get("text").asString, + metadata = mapOf(), // rawPubSubMessage.data.metadata + headers = mapOf(), // rawPubSubMessage.extras.headers + ) + observers.forEach { listener -> listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage)) } + } + channel.subscribe(messageListener) + + return DefaultMessagesSubscription( + chatApi = chatApi, + roomId = roomId, + cancellation = { + observers -= listener + channel.unsubscribe(messageListener) + }, + timeserialProvider = { getChannelSerial() }, + ) } override suspend fun get(options: QueryOptions): PaginatedResult = chatApi.getMessages(roomId, options) @@ -198,4 +251,16 @@ class DefaultMessages( override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation { TODO("Not yet implemented") } + + private suspend fun readAttachmentProperties() = suspendCoroutine { continuation -> + channel.once(ChannelState.attached) { + continuation.resume(channel.properties) + } + } + + private suspend fun getChannelSerial(): String { + if (channelSerial != null) return channelSerial!! + channelSerial = readAttachmentProperties().channelSerial + return channelSerial!! + } } diff --git a/chat-android/src/main/java/com/ably/chat/Rooms.kt b/chat-android/src/main/java/com/ably/chat/Rooms.kt index 44d917d..cd5708b 100644 --- a/chat-android/src/main/java/com/ably/chat/Rooms.kt +++ b/chat-android/src/main/java/com/ably/chat/Rooms.kt @@ -1,8 +1,5 @@ package com.ably.chat -import io.ably.lib.types.AblyException -import io.ably.lib.types.ErrorInfo - /** * Manages the lifecycle of chat rooms. */ @@ -24,7 +21,7 @@ interface Rooms { * @throws {@link ErrorInfo} if a room with the same ID but different options already exists. * @returns Room A new or existing Room object. */ - fun get(roomId: String, options: RoomOptions): Room + fun get(roomId: String, options: RoomOptions = RoomOptions()): Room /** * Release the Room object if it exists. This method only releases the reference @@ -60,11 +57,11 @@ internal class DefaultRooms( ) } - if (room.options != options) { - throw AblyException.fromErrorInfo( - ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), - ) - } +// if (room.options != options) { +// throw AblyException.fromErrorInfo( +// ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), +// ) +// } room } diff --git a/chat-android/src/main/java/com/ably/chat/Utils.kt b/chat-android/src/main/java/com/ably/chat/Utils.kt index 531a4eb..4e6bcf8 100644 --- a/chat-android/src/main/java/com/ably/chat/Utils.kt +++ b/chat-android/src/main/java/com/ably/chat/Utils.kt @@ -39,7 +39,7 @@ suspend fun Channel.detachCoroutine() = suspendCoroutine { continuation -> fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOptions { val options = ChannelOptions() init?.let { options.it() } - options.params = options.params + mapOf( + options.params = (options.params ?: mapOf()) + mapOf( AGENT_PARAMETER_NAME to "chat-kotlin/${BuildConfig.APP_VERSION}", ) return options diff --git a/example/src/main/java/com/ably/chat/example/MainActivity.kt b/example/src/main/java/com/ably/chat/example/MainActivity.kt index 8fc363b..d18013e 100644 --- a/example/src/main/java/com/ably/chat/example/MainActivity.kt +++ b/example/src/main/java/com/ably/chat/example/MainActivity.kt @@ -20,6 +20,7 @@ import androidx.compose.material3.Scaffold import androidx.compose.material3.Text import androidx.compose.material3.TextField import androidx.compose.runtime.Composable +import androidx.compose.runtime.DisposableEffect import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember @@ -30,10 +31,9 @@ import androidx.compose.ui.Modifier import androidx.compose.ui.graphics.Color import androidx.compose.ui.text.input.TextFieldValue import androidx.compose.ui.unit.dp -import com.ably.chat.ChatApi +import com.ably.chat.ChatClient import com.ably.chat.Message import com.ably.chat.QueryOptions -import com.ably.chat.QueryOptions.MessageOrder.OldestFirst import com.ably.chat.RealtimeClient import com.ably.chat.SendMessageParams import com.ably.chat.example.ui.theme.AblyChatExampleTheme @@ -46,6 +46,7 @@ val randomClientId = UUID.randomUUID().toString() class MainActivity : ComponentActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) + val realtimeClient = RealtimeClient( ClientOptions().apply { key = BuildConfig.ABLY_KEY @@ -53,13 +54,15 @@ class MainActivity : ComponentActivity() { logLevel = 2 }, ) - val chatApi = ChatApi(realtimeClient, randomClientId) + + val chatClient = ChatClient(realtimeClient) + enableEdgeToEdge() setContent { AblyChatExampleTheme { Scaffold(modifier = Modifier.fillMaxSize()) { innerPadding -> Chat( - chatApi, + chatClient, modifier = Modifier.padding(innerPadding), ) } @@ -69,26 +72,33 @@ class MainActivity : ComponentActivity() { } @Composable -fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { +fun Chat(chatClient: ChatClient, modifier: Modifier = Modifier) { var messageText by remember { mutableStateOf(TextFieldValue("")) } var sending by remember { mutableStateOf(false) } var messages by remember { mutableStateOf(listOf()) } val coroutineScope = rememberCoroutineScope() val roomId = "my-room" + val room = chatClient.rooms.get(roomId) + + DisposableEffect(Unit) { + val subscription = room.messages.subscribe { + messages += it.message + } + + coroutineScope.launch { + messages = subscription.getPreviousMessages(QueryOptions()).items + } + + onDispose { + subscription.cancel() + } + } Column( - modifier = Modifier.fillMaxSize(), + modifier = modifier.fillMaxSize(), verticalArrangement = Arrangement.SpaceBetween, ) { - Button(modifier = modifier.align(Alignment.CenterHorizontally), onClick = { - coroutineScope.launch { - messages = chatApi.getMessages(roomId, QueryOptions(orderBy = OldestFirst)).items - } - }) { - Text("Load") - } - LazyColumn( modifier = Modifier.weight(1f).padding(16.dp), userScrollEnabled = true, @@ -105,8 +115,7 @@ fun Chat(chatApi: ChatApi, modifier: Modifier = Modifier) { ) { sending = true coroutineScope.launch { - chatApi.sendMessage( - roomId, + room.messages.send( SendMessageParams( text = messageText.text, ),