From aab530afab8e91cfd861ed28f04ad4a4f9399367 Mon Sep 17 00:00:00 2001 From: Leonid Stashevsky Date: Fri, 2 Sep 2022 10:15:25 +0200 Subject: [PATCH] KTOR-2036 Fix CIO connection limit (#3140) * KTOR-2036 Fix CIO connection limit --- .../ktor/client/engine/cio/CIORequestTest.kt | 11 ++++++++-- .../io/ktor/client/engine/cio/CIOEngine.kt | 7 ++++++- .../client/engine/cio/ConnectionFactory.kt | 20 +++++++++++++------ .../src/io/ktor/client/engine/cio/Endpoint.kt | 5 +++-- .../io/ktor/client/engine/mock/MockUtils.kt | 2 +- 5 files changed, 33 insertions(+), 12 deletions(-) diff --git a/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt b/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt index 6d1b962a79b..0bdca5a9317 100644 --- a/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt +++ b/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt @@ -5,6 +5,7 @@ package io.ktor.client.engine.cio import io.ktor.client.call.* +import io.ktor.client.network.sockets.* import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.* @@ -147,13 +148,19 @@ class CIORequestTest : TestWithKtor() { } test { client -> + var fail: Throwable? = null for (i in 0..1000) { try { client.get("http://something.wrong").body() - } catch (cause: UnresolvedAddressException) { - // ignore + } catch (cause: Throwable) { + fail = cause } } + + assertNotNull(fail) + if (fail !is ConnectTimeoutException && fail !is UnresolvedAddressException) { + fail("Expected ConnectTimeoutException or UnresolvedAddressException, got $fail", fail) + } } } } diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt index a2aa49ea077..0f4ba1c93d1 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt @@ -33,7 +33,11 @@ internal class CIOEngine( private val selectorManager: SelectorManager by lazy { SelectorManager(dispatcher) } - private val connectionFactory = ConnectionFactory(selectorManager, config.maxConnectionsCount) + private val connectionFactory = ConnectionFactory( + selectorManager, + config.maxConnectionsCount, + config.endpoint.maxConnectionsPerRoute + ) private val requestsJob: CoroutineContext @@ -42,6 +46,7 @@ internal class CIOEngine( private val proxy: ProxyConfig? = when (val type = config.proxy?.type) { ProxyType.SOCKS, null -> null + ProxyType.HTTP -> config.proxy else -> throw IllegalStateException("CIO engine does not currently support $type proxies.") } diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt index 3b392745349..dd2c3139dc0 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt @@ -6,29 +6,37 @@ package io.ktor.client.engine.cio import io.ktor.network.selector.* import io.ktor.network.sockets.* +import io.ktor.util.collections.* import kotlinx.coroutines.sync.* internal class ConnectionFactory( private val selector: SelectorManager, - maxConnectionsCount: Int + connectionsLimit: Int, + private val addressConnectionsLimit: Int ) { - private val semaphore = Semaphore(maxConnectionsCount) + private val limit = Semaphore(connectionsLimit) + private val addressLimit = ConcurrentMap() suspend fun connect( address: InetSocketAddress, configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {} ): Socket { - semaphore.acquire() + limit.acquire() + val addressSemaphore = addressLimit.computeIfAbsent(address) { Semaphore(addressConnectionsLimit) } + addressSemaphore.acquire() + return try { aSocket(selector).tcpNoDelay().tcp().connect(address, configuration) } catch (cause: Throwable) { // a failure or cancellation - semaphore.release() + addressSemaphore.release() + limit.release() throw cause } } - fun release() { - semaphore.release() + fun release(address: InetSocketAddress) { + addressLimit[address]!!.release() + limit.release() } } diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt index c6e95c800f9..22cbe2401ee 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt @@ -188,7 +188,7 @@ internal class Endpoint( } catch (_: Throwable) { } - connectionFactory.release() + connectionFactory.release(address) throw cause } } @@ -229,7 +229,8 @@ internal class Endpoint( } private fun releaseConnection() { - connectionFactory.release() + val address = InetSocketAddress(host, port) + connectionFactory.release(address) connections.decrementAndGet() } diff --git a/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt b/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt index c96501da33e..c7efda3aead 100644 --- a/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt +++ b/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt @@ -31,8 +31,8 @@ public suspend fun OutgoingContent.toByteArray(): ByteArray = when (this) { else -> ByteArray(0) } -@OptIn(DelicateCoroutinesApi::class) @Suppress("KDocMissingDocumentation") +@OptIn(DelicateCoroutinesApi::class) public suspend fun OutgoingContent.toByteReadPacket(): ByteReadPacket = when (this) { is OutgoingContent.ByteArrayContent -> ByteReadPacket(bytes()) is OutgoingContent.ReadChannelContent -> readFrom().readRemaining()