Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOCS] Using KafkaReceiver with Ktor Server (cancellation, and terminal events) #76

Open
ageorgousakis opened this issue Sep 27, 2022 · 3 comments

Comments

@ageorgousakis
Copy link

Hi,

I'm trying to user KafkaReceiver inside Ktor 2. I'm not quite sure what is the best way to start KafkaReciever with Ktor.
What I did was to create an application plugin like the following and install it in Ktor engine.

import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.ktor.server.application.*
import io.ktor.server.application.hooks.*
import io.ktor.server.config.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import java.util.*

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

@JvmInline
value class Key(val key: String)

@JvmInline
value class Message(val content: String)

val EntityUpdateEventPlugin = createApplicationPlugin(name = "EntityUpdateEventPlugin") {
    on(MonitoringEvent(ApplicationStarted)) { application ->
        val topicName = "odm-entity-update-event"
        val environment = application.environment
        val bootstrapServers = environment.config.property("kafka.bootstrapServers").getString()
        val kafkaPropertiesConfig = environment.config.config("kafka.properties")
        val receiverProperties = Properties().apply {
            kafkaPropertiesConfig.propertyOrNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)?.let {
                this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = it.getString()
            }
            kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_JAAS_CONFIG)?.let {
                this[SaslConfigs.SASL_JAAS_CONFIG] = it.getString()
            }
            kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_MECHANISM)?.let {
                this[SaslConfigs.SASL_MECHANISM] = it.getString()
            }
        }

        runBlocking(Dispatchers.Default) {
            coroutineScope {
                launch(Dispatchers.IO) {
                    val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
                        bootstrapServers,
                        StringDeserializer().map(::Key),
                        ByteArrayDeserializer().map {
                            Message(String(it))
                        },
                        groupId = "my-event-group",
                        autoOffsetReset = AutoOffsetReset.Earliest,
                        properties = receiverProperties
                    )
                    KafkaReceiver(settings)
                        .receive(topicName)
                        .map { "${it.key()} -> ${it.value()}" }
                        .collect {
                            application.log.info(it)
                        }
                }
            }
        }
    }
}

I seems the receiver starts and collects the messages. The question is how can we stop the receiver when Ktor shutdowns. When I stop the Ktor application it seems stuck and I have to kill the process.

@ageorgousakis
Copy link
Author

I found a way to make it work. It seems runBlocking blocks the event loop of Ktor engine. The receiver and Ktor route handlers works fine if it is started in the following way:

on(MonitoringEvent(ApplicationStarted)) { application ->
  // .. code to initialise the receiver
  CoroutineScope(Dispatchers.IO).launch {
    receiver
        .receive("my-topic")
        .collect {
            application.log.info("${it.key()} -> ${it.value()}")
            it.offset.acknowledge()
        }
  }
}

I don't know which is the best way to start a receiver in Ktor.

@nomisRev Can you recommend or give us an example how to use it with Ktor?

Regards

@nomisRev
Copy link
Owner

nomisRev commented Sep 27, 2022

Hey @ageorgousakis,

Thanks for your interest in the library, and opening a ticket! 🙏
As you've show there already, the returning Flow is coupled to it's surrounding CoroutineScope.
The simplest way to running a Flow for the same length as the server is to use launchIn with Application.

So to provide a small snippet:

val flow: Flow<Unit> = receiver
        .receive("my-topic")
        .map { // <-- changed collect to map, so the result is still a Flow
            application.log.info("${it.key()} -> ${it.value()}")
            it.offset.acknowledge()
        }

val application: Application = TODO("This is the Application from Ktor")
flow.launchIn(application)

The Ktor Application implements CoroutineScope, and it cancels the CoroutineScope when the server is cancelled.
So it will also cancel your Flow.

NOTE: I've run into some issues where Ktor doesn't properly cancel on SIGINT from K8S for example, and I've therefor build suspendapp.
SuspendApp With Ktor and K8S
SuspendApp with Kafka.

@nomisRev
Copy link
Owner

Alternatively, you could also use ApplicationEngine#addShutdownHook and use it to manually control a CoroutineScope.

The take-away is the the Flow cancels with the CoroutineScope it's called from.

Hope that helps @ageorgousakis !
Let's leave this issue open, so it can serve as reminder to include this information it in the documentation 👍

@nomisRev nomisRev changed the title How can we stop KafkaReceiver when Ktor shutdowns? [DOCS] Using KafkaReceiver with Ktor Server (cancellation, and terminal events) Sep 28, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants