Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New buffer-based api #89

Merged
merged 11 commits into from
Aug 5, 2024
3 changes: 1 addition & 2 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion .idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions build-logic/src/main/kotlin/plugin.common.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,14 @@ tasks {
}
}
}

kotlin.targets.withType<KotlinNativeTarget> {
// Do not activate backtrace for Mingw
// https://kotlinlang.org/docs/whatsnew1620.html?_ga=2.5870007.58710271.1649248900-2086887657.1620731764#better-stack-traces-with-libbacktrace
// https://youtrack.jetbrains.com/issue/KT-51866/Compile-error-to-mingwX64-with-libbacktrace
if (this.konanTarget.family != Family.MINGW) {
binaries.all {
binaryOptions["sourceInfoType"] = "libbacktrace"
}
}
}
1 change: 0 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ kotlin.code.style=official
kotlin.mpp.enableCInteropCommonization=true
kotlin.native.ignoreDisabledTargets=true
kotlin.native.ignoreIncorrectDependencies=true
#kotlin.native.binary.sourceInfoType=libbacktrace
kotlin.js.generate.executable.default=false
kotlin.mpp.stability.nowarn=true
kotlin.tests.individualTaskReports=true
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ kotlinx-coroutines = "1.9.0-RC"
kotlinx-atomicfu = "0.25.0"
kotlinx-benchmark = "0.4.11"
kotlinx-cli = "0.3.6"
kotlinx-io = "0.5.1"

ktor = "3.0.0-beta-2"
kermit = "2.0.4"
Expand Down Expand Up @@ -44,6 +45,7 @@ kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-t
kotlinx-atomicfu = { module = "org.jetbrains.kotlinx:atomicfu", version.ref = "kotlinx-atomicfu" }
kotlinx-benchmark-runtime = { module = "org.jetbrains.kotlinx:kotlinx-benchmark-runtime", version.ref = "kotlinx-benchmark" }
kotlinx-cli = { module = "org.jetbrains.kotlinx:kotlinx-cli", version.ref = "kotlinx-cli" }
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" }

ktor-io = { module = "io.ktor:ktor-io", version.ref = "ktor" }
ktor-network = { module = "io.ktor:ktor-network", version.ref = "ktor" }
Expand Down
1 change: 1 addition & 0 deletions kzmq-benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ kotlin {
dependencies {
implementation(libs.kotlinx.benchmark.runtime)
implementation(project(":kzmq-core"))
implementation(libs.kotlinx.io.core)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package org.zeromq.benchmarks

import kotlinx.benchmark.*
import kotlinx.coroutines.*
import kotlinx.io.*
import kotlinx.io.bytestring.*
import org.zeromq.*
import kotlin.random.*

Expand All @@ -23,7 +25,7 @@ open class PullPushBenchmark() {
@Param("10", "100", "1000", "10000", "100000")
var messageSize = 10

private lateinit var message: Message
private lateinit var messageData: ByteString

private lateinit var scope: CoroutineScope
private lateinit var context: Context
Expand All @@ -41,7 +43,7 @@ open class PullPushBenchmark() {
else -> error("Unsuported transport '$transport'")
}

message = Message(ByteArray(messageSize))
messageData = ByteString(ByteArray(messageSize))

val engine = engines.find { it.name.lowercase() == engineName } ?: error("Engine '$engineName' not found")
if (!engine.supportedTransports.contains(transport))
Expand All @@ -63,8 +65,8 @@ open class PullPushBenchmark() {
}

@Benchmark
fun sendReceive() = runBlocking {
pushSocket.send(message)
pullSocket.receive()
fun sendReceive(blackhole: Blackhole) = runBlocking {
pushSocket.send { writeFrame { write(messageData) } }
blackhole.consume(pullSocket.receive { readFrame { readByteString() } })
}
}
5 changes: 5 additions & 0 deletions kzmq-cio/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@ kotlin {
implementation(libs.kermit)
}
}
commonTest {
dependencies {
implementation(project(":kzmq-test"))
}
}
}
}
63 changes: 22 additions & 41 deletions kzmq-cio/src/commonMain/kotlin/org/zeromq/CIODealerSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.zeromq

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.io.bytestring.*
import org.zeromq.internal.*
import org.zeromq.internal.utils.*

Expand Down Expand Up @@ -60,52 +61,13 @@ internal class CIODealerSocket(
) : CIOSocket(engine, Type.DEALER), CIOSendSocket, CIOReceiveSocket, DealerSocket {

override val validPeerTypes: Set<Type> get() = validPeerSocketTypes

override val sendChannel = Channel<Message>()
override val receiveChannel = Channel<Message>()

init {
setHandler {
val forwardJobs = JobMap<PeerMailbox>()

while (isActive) {
val (kind, peerMailbox) = peerEvents.receive()
when (kind) {
PeerEvent.Kind.ADDITION -> forwardJobs.add(peerMailbox) { dispatchRequestsReplies(peerMailbox) }
PeerEvent.Kind.REMOVAL -> forwardJobs.remove(peerMailbox)
else -> {}
}
}
}
}

private fun CoroutineScope.dispatchRequestsReplies(peerMailbox: PeerMailbox) = launch {
launch {
while (isActive) {
val request = sendChannel.receive()
logger.d { "Dispatching request $request to $peerMailbox" }
peerMailbox.sendChannel.send(CommandOrMessage(request))
}
}
launch {
try {
while (isActive) {
val reply = peerMailbox.receiveChannel.receive().messageOrThrow()
logger.d { "Dispatching reply $reply from $peerMailbox" }
receiveChannel.send(reply)
}
} catch (e: ClosedReceiveChannelException) {
// Coroutine's cancellation happened while suspending on receive
// and the receiveChannel of the peerMailbox has already been closed
}
}
}
override val handler = setupHandler(DealerSocketHandler())

override var conflate: Boolean
get() = TODO("Not yet implemented")
set(value) {}

override var routingId: ByteArray? by options::routingId
override var routingId: ByteString? by options::routingId

override var probeRouter: Boolean
get() = TODO("Not yet implemented")
Expand All @@ -115,3 +77,22 @@ internal class CIODealerSocket(
private val validPeerSocketTypes = setOf(Type.REP, Type.ROUTER)
}
}

internal class DealerSocketHandler : SocketHandler {
private val mailboxes = CircularQueue<PeerMailbox>()

override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
mailboxes.update(peerEvents.receive())
}
}

override suspend fun send(message: Message) {
mailboxes.sendToFirstAvailable(message)
}

override suspend fun receive(): Message {
val (_, message) = mailboxes.receiveFromFirst()
return message
}
}
77 changes: 35 additions & 42 deletions kzmq-cio/src/commonMain/kotlin/org/zeromq/CIOPairSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.zeromq

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.zeromq.internal.*
Expand Down Expand Up @@ -36,7 +37,7 @@ import org.zeromq.internal.*
* socket SHALL destroy its double queue and SHALL discard any messages it contains.
* 6. SHOULD constrain incoming and outgoing queue sizes to a runtime-configurable limit.
*
* B. For processing incoming messages:
* B. For processing outgoing messages:
* 1. SHALL consider its peer as available only when it has a outgoing queue that is not full.
* 2. SHALL block on sending, or return a suitable error, when it has no available peer.
* 3. SHALL not accept further messages when it has no available peer.
Expand All @@ -51,56 +52,48 @@ internal class CIOPairSocket(
) : CIOSocket(engine, Type.PAIR), CIOReceiveSocket, CIOSendSocket, PairSocket {

override val validPeerTypes: Set<Type> get() = validPeerSocketTypes
override val handler = setupHandler(PairSocketHandler())

override val receiveChannel = Channel<Message>()
override val sendChannel = Channel<Message>()

init {
setHandler {
var forwardJob: Job? = null

while (isActive) {
val (kind, peerMailbox) = peerEvents.receive()
when (kind) {
PeerEvent.Kind.ADDITION -> {
// FIXME what should we do if it already has a peer?
if (forwardJob != null) continue

forwardJob = forwardJob(peerMailbox)
}

PeerEvent.Kind.REMOVAL -> {
if (forwardJob == null) continue
companion object {
private val validPeerSocketTypes = setOf(Type.PAIR)
}
}

forwardJob.cancel()
forwardJob = null
}
internal class PairSocketHandler : SocketHandler {
private val mailbox = atomic<PeerMailbox?>(null)

else -> {}
}
}
private suspend fun awaitCurrentPeer() {
var counter = 0
while (mailbox.value == null) {
if (counter++ < 100) println("in awaitCurrentPeer: ${mailbox.value}")
yield()
}
}

private fun CoroutineScope.forwardJob(mailbox: PeerMailbox) = launch {
launch {
while (isActive) {
val message = sendChannel.receive()
logger.v { "Sending $message to $mailbox" }
mailbox.sendChannel.send(CommandOrMessage(message))
}
}
launch {
while (isActive) {
val commandOrMessage = mailbox.receiveChannel.receive()
val message = commandOrMessage.messageOrThrow()
logger.v { "Receiving $message from $mailbox" }
receiveChannel.send(message)
override suspend fun handle(peerEvents: ReceiveChannel<PeerEvent>) = coroutineScope {
while (isActive) {
val (kind, peerMailbox) = peerEvents.receive()
when (kind) {
PeerEvent.Kind.ADDITION -> mailbox.value = peerMailbox
PeerEvent.Kind.REMOVAL -> mailbox.value = null
else -> {}
}
}
}

companion object {
private val validPeerSocketTypes = setOf(Type.PAIR)
override suspend fun send(message: Message) {
awaitCurrentPeer()
val mailbox = mailbox.value!!
logger.v { "Sending $message to $mailbox" }
mailbox.sendChannel.send(CommandOrMessage(message))
}

override suspend fun receive(): Message {
awaitCurrentPeer()
val mailbox = mailbox.value!!
val commandOrMessage = mailbox.receiveChannel.receive()
val message = commandOrMessage.messageOrThrow()
logger.v { "Receiving $message from $mailbox" }
return message
}
}
Loading
Loading