From f996e9df2b0e81709197c97f091fab6f1079c0d2 Mon Sep 17 00:00:00 2001 From: Dmitry Tretyakov Date: Wed, 31 Jan 2024 00:36:57 +0100 Subject: [PATCH] KTOR-5199 re-run multi perform loop to schedule requests --- .../ktor/client/engine/curl/CurlProcessor.kt | 22 +++++++++++++--- .../curl/internal/CurlMultiApiHandler.kt | 11 ++++++-- .../engine/curl/test/CurlWebSocketTests.kt | 25 +++++++++++++++++++ 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt index 0b0fede3259..605863ba2ce 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt @@ -18,6 +18,12 @@ internal class RequestContainer( val completionHandler: CompletableDeferred ) +/** + * A class responsible for processing requests asynchronously. + * + * It holds a dispatcher interacting with curl multi interface API, + * which requires API calls from single thread. + */ internal class CurlProcessor(coroutineContext: CoroutineContext) { @OptIn(InternalAPI::class) private val curlDispatcher: CloseableCoroutineDispatcher = @@ -28,6 +34,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { private val curlScope = CoroutineScope(coroutineContext + curlDispatcher) private val requestQueue: Channel = Channel(Channel.UNLIMITED) + private val requestCounter = atomic(0L) private val curlProtocols by lazy { getCurlProtocols() } init { @@ -48,8 +55,9 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { } val result = CompletableDeferred() - requestQueue.send(RequestContainer(request, result)) - curlApi!!.wakeup() + nextRequest { + requestQueue.send(RequestContainer(request, result)) + } return result.await() } @@ -59,7 +67,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { val api = curlApi!! while (!requestQueue.isClosedForReceive) { drainRequestQueue(api) - api.perform() + api.perform(requestCounter) } } } @@ -91,6 +99,8 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { if (!closed.compareAndSet(false, true)) return requestQueue.close() + nextRequest() + GlobalScope.launch(curlDispatcher) { curlScope.coroutineContext[Job]!!.join() curlApi!!.close() @@ -105,4 +115,10 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { curlApi!!.cancelRequest(easyHandle, cause) } } + + private inline fun nextRequest(body: (Long) -> Unit = {}) = try { + body(requestCounter.incrementAndGet()) + } finally { + curlApi!!.wakeup() + } } diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt index 840c58c30ea..4c6410c8aac 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt @@ -8,6 +8,7 @@ import io.ktor.client.plugins.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* import io.ktor.utils.io.locks.* +import kotlinx.atomicfu.* import kotlinx.cinterop.* import kotlinx.coroutines.* import kotlinx.io.* @@ -25,6 +26,11 @@ private class RequestHolder @OptIn(ExperimentalForeignApi::class) constructor( } } +/** + * Handles requests using libcurl with multi interface. + * + * @see Multi interface overview + */ @OptIn(InternalAPI::class) internal class CurlMultiApiHandler : Closeable { @OptIn(ExperimentalForeignApi::class) @@ -130,11 +136,12 @@ internal class CurlMultiApiHandler : Closeable { } @OptIn(ExperimentalForeignApi::class) - internal fun perform() { + internal fun perform(counter: AtomicLong) { if (activeHandles.isEmpty()) return memScoped { val transfersRunning = alloc() + val requestId = counter.value do { synchronized(easyHandlesToUnpauseLock) { var handle = easyHandlesToUnpause.removeFirstOrNull() @@ -150,7 +157,7 @@ internal class CurlMultiApiHandler : Closeable { if (transfersRunning.value < activeHandles.size) { handleCompleted() } - } while (transfersRunning.value != 0) + } while (transfersRunning.value != 0 && requestId == counter.value) } } diff --git a/ktor-client/ktor-client-curl/desktop/test/io/ktor/client/engine/curl/test/CurlWebSocketTests.kt b/ktor-client/ktor-client-curl/desktop/test/io/ktor/client/engine/curl/test/CurlWebSocketTests.kt index b2bb0745f44..5be6e178315 100644 --- a/ktor-client/ktor-client-curl/desktop/test/io/ktor/client/engine/curl/test/CurlWebSocketTests.kt +++ b/ktor-client/ktor-client-curl/desktop/test/io/ktor/client/engine/curl/test/CurlWebSocketTests.kt @@ -7,6 +7,7 @@ package io.ktor.client.engine.curl.test import io.ktor.client.* import io.ktor.client.engine.curl.* import io.ktor.client.plugins.websocket.* +import io.ktor.client.request.* import io.ktor.websocket.* import kotlinx.coroutines.* import kotlinx.serialization.json.* @@ -14,6 +15,7 @@ import kotlin.test.* class CurlWebSocketTests { + private val TEST_SERVER: String = "http://127.0.0.1:8080" private val TEST_WEBSOCKET_SERVER: String = "ws://127.0.0.1:8080" @Test @@ -75,4 +77,27 @@ class CurlWebSocketTests { } } } + + @Test + fun testParallelSessions() { + val client = HttpClient(Curl) { + install(WebSockets) + } + + runBlocking { + val websocketInitialized = CompletableDeferred() + + launch { + client.webSocket("$TEST_WEBSOCKET_SERVER/websockets/echo") { + websocketInitialized.complete(true) + delay(20) + } + } + + websocketInitialized.await() + + val response = client.get(TEST_SERVER) + assertEquals(200, response.status.value) + } + } }