
CallbackFlow

Devrath edited this page Aug 15, 2024 · 2 revisions

About callback flow

  • callbackFlow is a flow builder in Kotlin that converts a callback-based API into a Flow.
  • It is useful when an API is exposed as callbacks but we want to consume it as a flow.
  • Once the callbacks are wrapped in a flow, their events can be processed by coroutines.
  • It also lets us apply the operators that Flow provides, such as map and filter.
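
The points above can be sketched with a small, self-contained example. TemperatureSensor and TemperatureListener are hypothetical names invented for this illustration (they stand in for any callback-style API), and the sensor delivers its readings synchronously to keep the output deterministic. This is a minimal sketch under those assumptions, not a definitive pattern.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-style API, invented for this illustration.
interface TemperatureListener {
    fun onReading(celsius: Double)
    fun onDone()
}

// Hypothetical sensor: on registration it pushes every reading to the
// listener and then signals that no more readings will arrive.
class TemperatureSensor(private val readings: List<Double>) {
    fun register(listener: TemperatureListener) {
        readings.forEach { listener.onReading(it) }
        listener.onDone()
    }

    fun unregister() {
        // A real API would detach the listener here.
    }
}

// Wraps the callback API in a Flow.
fun temperatureFlow(sensor: TemperatureSensor): Flow<Double> = callbackFlow {
    val listener = object : TemperatureListener {
        override fun onReading(celsius: Double) {
            trySend(celsius) // hand the callback value to the flow
        }

        override fun onDone() {
            close() // complete the flow normally
        }
    }
    sensor.register(listener)
    // Keeps the flow alive until it is closed or cancelled, then cleans up.
    awaitClose { sensor.unregister() }
}

fun main() = runBlocking {
    val sensor = TemperatureSensor(listOf(18.5, 31.0, 22.0, 35.5))
    // Regular Flow operators now work on the callback data.
    val hot = temperatureFlow(sensor)
        .filter { it > 30.0 }
        .map { "hot: $it" }
        .toList()
    println(hot) // prints [hot: 31.0, hot: 35.5]
}
```

Every callbackFlow block needs to end with awaitClose; without it the builder throws an IllegalStateException, because the flow would otherwise complete as soon as the block returns.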

Let us consider the example of a WebSocket

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import okhttp3.*

fun createWebSocketFlow(url: String): Flow<String> = callbackFlow {
    val client = OkHttpClient()
    val request = Request.Builder().url(url).build()

    val webSocketListener = object : WebSocketListener() {
        override fun onOpen(webSocket: WebSocket, response: Response) {
            // WebSocket connection established
            // You can send a message to the server if needed
            webSocket.send("Hello Server!")
        }

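        override fun onMessage(webSocket: WebSocket, text: String) {
            // New message received from the server; forward it to the flow.
            // trySend does not suspend. It returns a failed result if the
            // buffer is full or the flow is already closed, ignored here.
            trySend(text)
        }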

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            // Handle failure
            close(t)
        }

        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            // WebSocket closed
            close()
        }
    }

    val webSocket = client.newWebSocket(request, webSocketListener)

    // Close the WebSocket when the flow collector is done
    awaitClose {
        webSocket.close(1000, "Flow collector finished")
        client.dispatcher.executorService.shutdown()
    }
}

fun main() = runBlocking {
    val webSocketFlow = createWebSocketFlow("wss://example.com/socket")

    launch {
        webSocketFlow.collect { message ->
            println("Received: $message")
        }
    }
}
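
The collector above keeps running until the server closes the socket or the connection fails. To illustrate how the awaitClose block is triggered when the collector stops early, here is a minimal sketch that uses a hypothetical FakeSocket class (an in-memory stand-in, not part of OkHttp) and the take operator to end collection after two messages.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory stand-in for a socket client; not part of OkHttp.
class FakeSocket {
    var isOpen = false
        private set

    // Opens the "connection" and immediately delivers five messages.
    fun open(onMessage: (String) -> Unit) {
        isOpen = true
        repeat(5) { onMessage("message $it") }
    }

    fun close() {
        isOpen = false
    }
}

fun messages(socket: FakeSocket): Flow<String> = callbackFlow {
    // Messages are buffered by the flow until the collector receives them.
    socket.open { text -> trySend(text) }
    // Runs when the flow is closed or the collector is cancelled.
    awaitClose { socket.close() }
}

fun main() = runBlocking {
    val socket = FakeSocket()
    // take(2) cancels the upstream flow after two messages.
    val firstTwo = messages(socket).take(2).toList()
    println(firstTwo) // prints [message 0, message 1]
    println("Socket still open: ${socket.isOpen}")
}
```

Because cancelling the collector runs the awaitClose block, the second line should report that the socket is no longer open. The same mechanism is what closes the real WebSocket in createWebSocketFlow when its collector finishes or is cancelled.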