Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example with Ktor and Kafka #1

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions alert/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
@file:Suppress("DSL_SCOPE_VIOLATION")

plugins {
alias(libs.plugins.kotlin.multiplatform)
alias(libs.plugins.kotlinx.serialization)
}

kotlin {
explicitApi()

// set targets
jvm {
jvmToolchain(8)
}



sourceSets {
val jvmMain by getting {
dependencies {
implementation(projects.kotestTraceCore)
implementation(projects.kotestTraceKtor)
implementation(libs.ktor.client.resources)
implementation(libs.ktor.client.contentNegotiation)
implementation(libs.ktor.server.resources)
implementation(libs.ktor.server.contentNegotiation)
implementation(libs.ktor.server.test)
implementation(libs.ktor.client.resources)
implementation(libs.kotlinx.serialization.json)
implementation(libs.ktor.serialization.json)
implementation(libs.kotest.frameworkEngine)
implementation(libs.kotest.assertionsCore)
implementation(libs.kotest.property)
implementation(libs.kafka)
implementation(libs.testcontainers.kafka)
implementation(libs.arrow.fx.stm)
}
}
}
}

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().all {
kotlinOptions.freeCompilerArgs += "-Xcontext-receivers"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.fortyseven.kotest.trace.alert

// these things may happen in our system
public sealed interface Action {
public data class Subscribe(val event: String, val user: String): Action
public data class Event(val event: String, val info: String): Action
public object ReadMessageQueue: Action
}

// this is the type in the Kafka queue
public data class Message(val event: String, val info: String, val user: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.fortyseven.kotest.trace.alert

import arrow.fx.stm.TMap
import arrow.fx.stm.atomically
import io.ktor.resources.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.resources.Resources
import io.ktor.server.resources.post
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.Serializable

public object Routes {
@Serializable @Resource("/subscribe")
public class Subscribe() {
@Serializable
public data class Request(val event: String, val user: String)
}

@Serializable @Resource("/event")
public class Event() {
@Serializable
public data class Request(val event: String, val info: String)
}
}

// this is what we use to send the messages
// eventually a Kafka queue
public interface MessageSender {
public suspend fun Message.send(): Unit
}

context(MessageSender) public suspend fun Application.server() {
install(ContentNegotiation) { json() }
install(Resources)

val subscriptions: TMap<String, List<String>> = TMap.new()
routing {
post<Routes.Subscribe> {
val req = call.receive<Routes.Subscribe.Request>()
atomically {
subscriptions[req.event] = subscriptions[req.event].orEmpty() + req.user
}
call.respond("OK")
}

post<Routes.Event> {
val req = call.receive<Routes.Event.Request>()
val users = atomically { subscriptions[req.event].orEmpty() }
users.forEach { Message(req.event, req.info, it).send() }
}
}
}
6 changes: 6 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ kotlin = "1.8.0"
kotest = "5.5.4"
ktor = "2.2.3"
serialization = "1.5.0-RC"
kafka = "0.3.0"
testcontainers = "1.17.6"
arrow = "1.1.4"

[libraries]
kotest-assertionsCore = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" }
Expand All @@ -18,6 +21,9 @@ ktor-server-resources = { module = "io.ktor:ktor-server-resources", version.ref
ktor-server-contentNegotiation = { module = "io.ktor:ktor-server-content-negotiation", version.ref = "ktor" }
ktor-client-resources = { module = "io.ktor:ktor-client-resources", version.ref = "ktor" }
ktor-client-contentNegotiation = { module = "io.ktor:ktor-client-content-negotiation", version.ref = "ktor" }
kafka = { module = "io.github.nomisrev:kotlin-kafka", version.ref = "kafka" }
testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers" }
arrow-fx-stm = { module = "io.arrow-kt:arrow-fx-stm", version.ref = "arrow" }

[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand Down
3 changes: 3 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ project(":kotest-trace-core").projectDir = file("core")

include("kotest-trace-ktor")
project(":kotest-trace-ktor").projectDir = file("ktor")

include("kotest-trace-example-alert")
project(":kotest-trace-example-alert").projectDir = file("alert")