From a673f45ba6de670a0fa1d4e0e06e4e6d9fb2a26f Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Mon, 15 Apr 2024 08:37:21 +0100 Subject: [PATCH] Move RealCall and RealConnection to loom safe locks (#8290) --- .../main/kotlin/okhttp3/TestValueFactory.kt | 3 +- okhttp/src/main/kotlin/okhttp3/Dispatcher.kt | 76 +++-- .../internal/connection/ConnectPlan.kt | 3 +- .../okhttp3/internal/connection/RealCall.kt | 31 +- .../internal/connection/RealConnection.kt | 56 ++-- .../internal/connection/RealConnectionPool.kt | 33 +- .../internal/connection/RealRoutePlanner.kt | 5 +- .../okhttp3/internal/http2/Http2Connection.kt | 119 ++++---- .../okhttp3/internal/http2/Http2Stream.kt | 157 +++++----- .../okhttp3/internal/http2/Http2Writer.kt | 286 +++++++++--------- .../internal/connection/ConnectionPoolTest.kt | 5 +- 11 files changed, 419 insertions(+), 355 deletions(-) diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt index bbc348fa8ab0..adaf9b99420d 100644 --- a/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt +++ b/okhttp-testing-support/src/main/kotlin/okhttp3/TestValueFactory.kt @@ -31,6 +31,7 @@ import javax.net.SocketFactory import javax.net.ssl.HostnameVerifier import javax.net.ssl.HttpsURLConnection import javax.net.ssl.SSLSocketFactory +import kotlin.concurrent.withLock import okhttp3.internal.RecordingOkAuthenticator import okhttp3.internal.concurrent.TaskFaker import okhttp3.internal.concurrent.TaskRunner @@ -93,7 +94,7 @@ class TestValueFactory : Closeable { socket = Socket(), idleAtNs = idleAtNanos, ) - synchronized(result) { pool.put(result) } + result.lock.withLock { pool.put(result) } return result } diff --git a/okhttp/src/main/kotlin/okhttp3/Dispatcher.kt b/okhttp/src/main/kotlin/okhttp3/Dispatcher.kt index e869053c1982..0494446a60ad 100644 --- a/okhttp/src/main/kotlin/okhttp3/Dispatcher.kt +++ b/okhttp/src/main/kotlin/okhttp3/Dispatcher.kt @@ -22,7 +22,9 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.SynchronousQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -import okhttp3.internal.assertThreadDoesntHoldLock +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import okhttp3.internal.assertNotHeld import okhttp3.internal.connection.RealCall import okhttp3.internal.connection.RealCall.AsyncCall import okhttp3.internal.okHttpName @@ -36,6 +38,8 @@ import okhttp3.internal.threadFactory * concurrently. */ class Dispatcher() { + internal val lock: ReentrantLock = ReentrantLock() + /** * The maximum number of requests to execute concurrently. Above this requests queue in memory, * waiting for the running calls to complete. @@ -43,10 +47,11 @@ class Dispatcher() { * If more than [maxRequests] requests are in flight when this is invoked, those requests will * remain in flight. */ - @get:Synchronized var maxRequests = 64 + var maxRequests = 64 + get() = lock.withLock { field } set(maxRequests) { require(maxRequests >= 1) { "max < 1: $maxRequests" } - synchronized(this) { + lock.withLock { field = maxRequests } promoteAndExecute() @@ -62,10 +67,11 @@ class Dispatcher() { * * WebSocket connections to hosts **do not** count against this limit. */ - @get:Synchronized var maxRequestsPerHost = 5 + var maxRequestsPerHost = 5 + get() = lock.withLock { field } set(maxRequestsPerHost) { require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" } - synchronized(this) { + lock.withLock { field = maxRequestsPerHost } promoteAndExecute() @@ -82,29 +88,31 @@ class Dispatcher() { * This means that if you are doing synchronous calls the network layer will not truly be idle * until every returned [Response] has been closed. */ - @set:Synchronized - @get:Synchronized var idleCallback: Runnable? = null + get() = lock.withLock { field } + set(value) { + lock.withLock { field = value } + } private var executorServiceOrNull: ExecutorService? = null - @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService - get() { - if (executorServiceOrNull == null) { - executorServiceOrNull = - ThreadPoolExecutor( - 0, - Int.MAX_VALUE, - 60, - TimeUnit.SECONDS, - SynchronousQueue(), - threadFactory("$okHttpName Dispatcher", false), - ) + get() = + lock.withLock { + if (executorServiceOrNull == null) { + executorServiceOrNull = + ThreadPoolExecutor( + 0, + Int.MAX_VALUE, + 60, + TimeUnit.SECONDS, + SynchronousQueue(), + threadFactory("$okHttpName Dispatcher", false), + ) + } + return executorServiceOrNull!! } - return executorServiceOrNull!! - } /** Ready async calls in the order they'll be run. */ private val readyAsyncCalls = ArrayDeque() @@ -120,7 +128,7 @@ class Dispatcher() { } internal fun enqueue(call: AsyncCall) { - synchronized(this) { + lock.withLock { readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to @@ -147,15 +155,17 @@ class Dispatcher() { * Cancel all calls currently enqueued or executing. Includes calls executed both * [synchronously][Call.execute] and [asynchronously][Call.enqueue]. */ - @Synchronized fun cancelAll() { - for (call in readyAsyncCalls) { - call.call.cancel() - } - for (call in runningAsyncCalls) { - call.call.cancel() - } - for (call in runningSyncCalls) { - call.cancel() + fun cancelAll() { + lock.withLock { + for (call in readyAsyncCalls) { + call.call.cancel() + } + for (call in runningAsyncCalls) { + call.call.cancel() + } + for (call in runningSyncCalls) { + call.cancel() + } } } @@ -167,11 +177,11 @@ class Dispatcher() { * @return true if the dispatcher is currently running calls. */ private fun promoteAndExecute(): Boolean { - this.assertThreadDoesntHoldLock() + lock.assertNotHeld() val executableCalls = mutableListOf() val isRunning: Boolean - synchronized(this) { + lock.withLock { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt index a5c31f9cf03b..b4b5133e07a9 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt @@ -26,6 +26,7 @@ import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket +import kotlin.concurrent.withLock import okhttp3.CertificatePinner import okhttp3.ConnectionSpec import okhttp3.Handshake @@ -503,7 +504,7 @@ class ConnectPlan( val pooled3 = routePlanner.planReusePooledConnection(this, routes) if (pooled3 != null) return pooled3.connection - synchronized(connection) { + connection.lock.withLock { connectionPool.put(connection) user.acquireConnectionNoEvents(connection) } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt index 2f7be64ba69e..20e55f74461d 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealCall.kt @@ -25,6 +25,8 @@ import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import okhttp3.Call import okhttp3.Callback import okhttp3.EventListener @@ -32,8 +34,9 @@ import okhttp3.Interceptor import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.Response +import okhttp3.internal.assertHeld +import okhttp3.internal.assertNotHeld import okhttp3.internal.assertThreadDoesntHoldLock -import okhttp3.internal.assertThreadHoldsLock import okhttp3.internal.cache.CacheInterceptor import okhttp3.internal.closeQuietly import okhttp3.internal.http.BridgeInterceptor @@ -60,6 +63,8 @@ class RealCall( val originalRequest: Request, val forWebSocket: Boolean, ) : Call, Cloneable { + internal val lock: ReentrantLock = ReentrantLock() + private val connectionPool: RealConnectionPool = client.connectionPool.delegate internal val eventListener: EventListener = client.eventListenerFactory.create(this) @@ -95,7 +100,7 @@ class RealCall( internal var interceptorScopedExchange: Exchange? = null private set - // These properties are guarded by this. They are typically only accessed by the thread executing + // These properties are guarded by lock. They are typically only accessed by the thread executing // the call, but they may be accessed by other threads for duplex requests. /** True if this call still has a request body open. */ @@ -231,7 +236,7 @@ class RealCall( ) { check(interceptorScopedExchange == null) - synchronized(this) { + lock.withLock { check(!responseBodyOpen) { "cannot make a new request because the previous response is still open: " + "please call response.close()" @@ -265,7 +270,7 @@ class RealCall( /** Finds a new or pooled connection to carry a forthcoming request and response. */ internal fun initExchange(chain: RealInterceptorChain): Exchange { - synchronized(this) { + lock.withLock { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) @@ -277,7 +282,7 @@ class RealCall( val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result - synchronized(this) { + lock.withLock { this.requestBodyOpen = true this.responseBodyOpen = true } @@ -287,7 +292,7 @@ class RealCall( } fun acquireConnectionNoEvents(connection: RealConnection) { - connection.assertThreadHoldsLock() + connection.lock.assertHeld() check(this.connection == null) this.connection = connection @@ -312,7 +317,7 @@ class RealCall( var bothStreamsDone = false var callDone = false - synchronized(this) { + lock.withLock { if (requestDone && requestBodyOpen || responseDone && responseBodyOpen) { if (requestDone) requestBodyOpen = false if (responseDone) responseBodyOpen = false @@ -335,7 +340,7 @@ class RealCall( internal fun noMoreExchanges(e: IOException?): IOException? { var callDone = false - synchronized(this) { + lock.withLock { if (expectMoreExchanges) { expectMoreExchanges = false callDone = !requestBodyOpen && !responseBodyOpen @@ -362,13 +367,13 @@ class RealCall( * additional context. Otherwise [e] is returned as-is. */ private fun callDone(e: E): E { - assertThreadDoesntHoldLock() + lock.assertNotHeld() val connection = this.connection if (connection != null) { - connection.assertThreadDoesntHoldLock() + connection.lock.assertNotHeld() val toClose: Socket? = - synchronized(connection) { + connection.lock.withLock { // Sets this.connection to null. releaseConnectionNoEvents() } @@ -399,7 +404,7 @@ class RealCall( */ internal fun releaseConnectionNoEvents(): Socket? { val connection = this.connection!! - connection.assertThreadHoldsLock() + connection.lock.assertHeld() val calls = connection.calls val index = calls.indexOfFirst { it.get() == this@RealCall } @@ -443,7 +448,7 @@ class RealCall( * This is usually due to either an exception or a retry. */ internal fun exitNetworkInterceptorExchange(closeExchange: Boolean) { - synchronized(this) { + lock.withLock { check(expectMoreExchanges) { "released" } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt index f379a4e29efa..abcbb90da1e9 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt @@ -23,8 +23,10 @@ import java.net.Socket import java.net.SocketException import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLSocket +import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.Connection import okhttp3.ConnectionListener @@ -33,8 +35,8 @@ import okhttp3.HttpUrl import okhttp3.OkHttpClient import okhttp3.Protocol import okhttp3.Route -import okhttp3.internal.assertThreadDoesntHoldLock -import okhttp3.internal.assertThreadHoldsLock +import okhttp3.internal.assertHeld +import okhttp3.internal.assertNotHeld import okhttp3.internal.closeQuietly import okhttp3.internal.concurrent.TaskRunner import okhttp3.internal.http.ExchangeCodec @@ -80,7 +82,9 @@ class RealConnection( ) : Http2Connection.Listener(), Connection, ExchangeCodec.Carrier { private var http2Connection: Http2Connection? = null - // These properties are guarded by this. + internal val lock: ReentrantLock = ReentrantLock() + + // These properties are guarded by lock. /** * If true, no new exchanges can be created on this connection. It is necessary to set this to @@ -129,19 +133,23 @@ class RealConnection( /** Prevent further exchanges from being created on this connection. */ override fun noNewExchanges() { - synchronized(this) { + lock.withLock { noNewExchanges = true } connectionListener.noNewExchanges(this) } /** Prevent this connection from being used for hosts other than the one in [route]. */ - @Synchronized internal fun noCoalescedConnections() { - noCoalescedConnections = true + internal fun noCoalescedConnections() { + lock.withLock { + noCoalescedConnections = true + } } - @Synchronized internal fun incrementSuccessCount() { - successCount++ + internal fun incrementSuccessCount() { + lock.withLock { + successCount++ + } } @Throws(IOException::class) @@ -179,7 +187,7 @@ class RealConnection( address: Address, routes: List?, ): Boolean { - assertThreadHoldsLock() + lock.assertHeld() // If this connection is not accepting new exchanges, we're done. if (calls.size >= allocationLimit || noNewExchanges) return false @@ -232,7 +240,7 @@ class RealConnection( } private fun supportsUrl(url: HttpUrl): Boolean { - assertThreadHoldsLock() + lock.assertHeld() val routeUrl = route.address.url @@ -308,7 +316,7 @@ class RealConnection( /** Returns true if this connection is ready to host new streams. */ fun isHealthy(doExtensiveChecks: Boolean): Boolean { - assertThreadDoesntHoldLock() + lock.assertNotHeld() val nowNs = System.nanoTime() @@ -326,7 +334,7 @@ class RealConnection( return http2Connection.isHealthy(nowNs) } - val idleDurationNs = synchronized(this) { nowNs - idleAtNs } + val idleDurationNs = lock.withLock { nowNs - idleAtNs } if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { return socket.isHealthy(source) } @@ -341,19 +349,21 @@ class RealConnection( } /** When settings are received, adjust the allocation limit. */ - @Synchronized override fun onSettings( + override fun onSettings( connection: Http2Connection, settings: Settings, ) { - val oldLimit = allocationLimit - allocationLimit = settings.getMaxConcurrentStreams() - - if (allocationLimit < oldLimit) { - // We might need new connections to keep policies satisfied - connectionPool.scheduleOpener(route.address) - } else if (allocationLimit > oldLimit) { - // We might no longer need some connections - connectionPool.scheduleCloser() + lock.withLock { + val oldLimit = allocationLimit + allocationLimit = settings.getMaxConcurrentStreams() + + if (allocationLimit < oldLimit) { + // We might need new connections to keep policies satisfied + connectionPool.scheduleOpener(route.address) + } else if (allocationLimit > oldLimit) { + // We might no longer need some connections + connectionPool.scheduleCloser() + } } } @@ -387,7 +397,7 @@ class RealConnection( e: IOException?, ) { var noNewExchangesEvent = false - synchronized(this) { + lock.withLock { if (e is StreamResetException) { when { e.errorCode == ErrorCode.REFUSED_STREAM -> { diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt index c4d376c92f11..b154ea493641 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnectionPool.kt @@ -21,11 +21,12 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.ConnectionListener import okhttp3.ConnectionPool import okhttp3.Route -import okhttp3.internal.assertThreadHoldsLock +import okhttp3.internal.assertHeld import okhttp3.internal.closeQuietly import okhttp3.internal.concurrent.Task import okhttp3.internal.concurrent.TaskQueue @@ -80,7 +81,7 @@ class RealConnectionPool( fun idleConnectionCount(): Int { return connections.count { - synchronized(it) { it.calls.isEmpty() } + it.lock.withLock { it.calls.isEmpty() } } } @@ -110,7 +111,7 @@ class RealConnectionPool( for (connection in connections) { // In the first synchronized block, acquire the connection if it can satisfy this call. val acquired = - synchronized(connection) { + connection.lock.withLock { when { requireMultiplexed && !connection.isMultiplexed -> false !connection.isEligible(address, routes) -> false @@ -129,7 +130,7 @@ class RealConnectionPool( // the hook to close this connection if it's no longer in use. val noNewExchangesEvent: Boolean val toClose: Socket? = - synchronized(connection) { + connection.lock.withLock { noNewExchangesEvent = !connection.noNewExchanges connection.noNewExchanges = true connectionUser.releaseConnectionNoEvents() @@ -145,7 +146,7 @@ class RealConnectionPool( } fun put(connection: RealConnection) { - connection.assertThreadHoldsLock() + connection.lock.assertHeld() connections.add(connection) // connection.queueEvent { connectionListener.connectEnd(connection) } @@ -157,7 +158,7 @@ class RealConnectionPool( * removed from the pool and should be closed. */ fun connectionBecameIdle(connection: RealConnection): Boolean { - connection.assertThreadHoldsLock() + connection.lock.assertHeld() return if (connection.noNewExchanges || maxIdleConnections == 0) { connection.noNewExchanges = true @@ -176,13 +177,13 @@ class RealConnectionPool( while (i.hasNext()) { val connection = i.next() val socketToClose = - synchronized(connection) { + connection.lock.withLock { if (connection.calls.isEmpty()) { i.remove() connection.noNewExchanges = true - return@synchronized connection.socket() + return@withLock connection.socket() } else { - return@synchronized null + return@withLock null } } if (socketToClose != null) { @@ -214,7 +215,7 @@ class RealConnectionPool( } for (connection in connections) { val addressState = addressStates[connection.route.address] ?: continue - synchronized(connection) { + connection.lock.withLock { addressState.concurrentCallCapacity += connection.allocationLimit } } @@ -237,11 +238,11 @@ class RealConnectionPool( var inUseConnectionCount = 0 var evictableConnectionCount = 0 for (connection in connections) { - synchronized(connection) { + connection.lock.withLock { // If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++ - return@synchronized + return@withLock } val idleAtNs = connection.idleAtNs @@ -285,7 +286,7 @@ class RealConnectionPool( when { toEvict != null -> { // We've chosen a connection to evict. Confirm it's still okay to be evicted, then close it. - synchronized(toEvict) { + toEvict.lock.withLock { if (toEvict.calls.isNotEmpty()) return 0L // No longer idle. if (toEvict.idleAtNs != toEvictIdleAtNs) return 0L // No longer oldest. toEvict.noNewExchanges = true @@ -336,7 +337,7 @@ class RealConnectionPool( connection: RealConnection, now: Long, ): Int { - connection.assertThreadHoldsLock() + connection.lock.assertHeld() val references = connection.calls var i = 0 @@ -415,7 +416,7 @@ class RealConnectionPool( var concurrentCallCapacity = 0 for (connection in connections) { if (state.address != connection.route.address) continue - synchronized(connection) { + connection.lock.withLock { concurrentCallCapacity += connection.allocationLimit } @@ -430,7 +431,7 @@ class RealConnectionPool( // RealRoutePlanner will add the connection to the pool itself, other RoutePlanners may not // TODO: make all RoutePlanners consistent in this behavior if (connection !in connections) { - synchronized(connection) { put(connection) } + connection.lock.withLock { put(connection) } } return 0L // run again immediately to create more connections if needed diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt index 3684a19da56c..e87b12908b24 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/RealRoutePlanner.kt @@ -19,6 +19,7 @@ import java.io.IOException import java.net.HttpURLConnection import java.net.Socket import java.net.UnknownServiceException +import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.ConnectionSpec import okhttp3.HttpUrl @@ -94,7 +95,7 @@ class RealRoutePlanner( val healthy = candidate.isHealthy(connectionUser.doExtensiveHealthChecks()) var noNewExchangesEvent = false val toClose: Socket? = - synchronized(candidate) { + candidate.lock.withLock { when { !healthy -> { noNewExchangesEvent = !candidate.noNewExchanges @@ -319,7 +320,7 @@ class RealRoutePlanner( * connections. */ private fun retryRoute(connection: RealConnection): Route? { - return synchronized(connection) { + return connection.lock.withLock { when { connection.routeFailureCount != 0 -> null diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt index 686568d3d0fd..5505a46d1f61 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -20,6 +20,9 @@ import java.io.IOException import java.io.InterruptedIOException import java.net.Socket import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import okhttp3.internal.EMPTY_BYTE_ARRAY import okhttp3.internal.EMPTY_HEADERS import okhttp3.internal.assertThreadDoesntHoldLock @@ -29,13 +32,11 @@ import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE import okhttp3.internal.http2.flowcontrol.WindowCounter import okhttp3.internal.ignoreIoExceptions -import okhttp3.internal.notifyAll import okhttp3.internal.okHttpName import okhttp3.internal.peerName import okhttp3.internal.platform.Platform import okhttp3.internal.platform.Platform.Companion.INFO import okhttp3.internal.toHeaders -import okhttp3.internal.wait import okio.Buffer import okio.BufferedSink import okio.BufferedSource @@ -54,7 +55,10 @@ import okio.source */ @Suppress("NAME_SHADOWING") class Http2Connection internal constructor(builder: Builder) : Closeable { - // Internal state of this connection is guarded by 'this'. No blocking operations may be + internal val lock: ReentrantLock = ReentrantLock() + internal val condition: Condition = lock.newCondition() + + // Internal state of this connection is guarded by 'lock'. No blocking operations may be // performed while holding this lock! // // Socket writes are guarded by frameWriter. @@ -149,12 +153,12 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { val pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos(builder.pingIntervalMillis.toLong()) writerQueue.schedule("$connectionName ping", pingIntervalNanos) { val failDueToMissingPong = - synchronized(this@Http2Connection) { + lock.withLock { if (intervalPongsReceived < intervalPingsSent) { - return@synchronized true + return@withLock true } else { intervalPingsSent++ - return@synchronized false + return@withLock false } } if (failDueToMissingPong) { @@ -171,30 +175,31 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { /** * Returns the number of [open streams][Http2Stream.isOpen] on this connection. */ - @Synchronized - fun openStreamCount(): Int = streams.size + fun openStreamCount(): Int = lock.withLock { streams.size } - @Synchronized - fun getStream(id: Int): Http2Stream? = streams[id] + fun getStream(id: Int): Http2Stream? = lock.withLock { streams[id] } - @Synchronized internal fun removeStream(streamId: Int): Http2Stream? { - val stream = streams.remove(streamId) + lock.withLock { + val stream = streams.remove(streamId) - // The removed stream may be blocked on a connection-wide window update. - notifyAll() + // The removed stream may be blocked on a connection-wide window update. + condition.signalAll() - return stream + return stream + } } - @Synchronized internal fun updateConnectionFlowControl(read: Long) { - readBytes.update(total = read) - val readBytesToAcknowledge = readBytes.unacknowledged - if (readBytesToAcknowledge >= okHttpSettings.initialWindowSize / 2) { - writeWindowUpdateLater(0, readBytesToAcknowledge) - readBytes.update(acknowledged = readBytesToAcknowledge) + internal fun updateConnectionFlowControl(read: Long) { + lock.withLock { + readBytes.update(total = read) + val readBytesToAcknowledge = readBytes.unacknowledged + if (readBytesToAcknowledge >= okHttpSettings.initialWindowSize / 2) { + writeWindowUpdateLater(0, readBytesToAcknowledge) + readBytes.update(acknowledged = readBytesToAcknowledge) + } + flowControlListener.receivingConnectionWindowChanged(readBytes) } - flowControlListener.receivingConnectionWindowChanged(readBytes) } /** @@ -240,8 +245,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { val stream: Http2Stream val streamId: Int - synchronized(writer) { - synchronized(this) { + writer.lock.withLock { + lock.withLock { if (nextStreamId > Int.MAX_VALUE / 2) { shutdown(REFUSED_STREAM) } @@ -311,7 +316,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { var byteCount = byteCount while (byteCount > 0L) { var toWrite: Int - synchronized(this@Http2Connection) { + lock.withLock { try { while (writeBytesTotal >= writeBytesMaximum) { // Before blocking, confirm that the stream we're writing is still open. It's possible @@ -319,7 +324,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { if (!streams.containsKey(streamId)) { throw IOException("stream closed") } - this@Http2Connection.wait() // Wait until we receive a WINDOW_UPDATE. + condition.await() // Wait until we receive a WINDOW_UPDATE. } } catch (e: InterruptedException) { Thread.currentThread().interrupt() // Retain interrupted status. @@ -392,7 +397,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { /** For testing: sends a ping to be awaited with [awaitPong]. */ @Throws(InterruptedException::class) fun writePing() { - synchronized(this) { + lock.withLock { awaitPingsSent++ } @@ -401,11 +406,12 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { } /** For testing: awaits a pong. */ - @Synchronized @Throws(InterruptedException::class) fun awaitPong() { - while (awaitPongsReceived < awaitPingsSent) { - wait() + lock.withLock { + while (awaitPongsReceived < awaitPingsSent) { + condition.await() + } } } @@ -421,9 +427,9 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { */ @Throws(IOException::class) fun shutdown(statusCode: ErrorCode) { - synchronized(writer) { + writer.lock.withLock { val lastGoodStreamId: Int - synchronized(this) { + lock.withLock { if (isShutdown) { return } @@ -456,7 +462,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { } var streamsToClose: Array? = null - synchronized(this) { + lock.withLock { if (streams.isNotEmpty()) { streamsToClose = streams.values.toTypedArray() streams.clear() @@ -516,8 +522,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { /** Merges [settings] into this peer's settings and sends them to the remote peer. */ @Throws(IOException::class) fun setSettings(settings: Settings) { - synchronized(writer) { - synchronized(this) { + writer.lock.withLock { + lock.withLock { if (isShutdown) { throw ConnectionShutdownException() } @@ -527,14 +533,15 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { } } - @Synchronized fun isHealthy(nowNs: Long): Boolean { - if (isShutdown) return false + lock.withLock { + if (isShutdown) return false - // A degraded pong is overdue. - if (degradedPongsReceived < degradedPingsSent && nowNs >= degradedPongDeadlineNs) return false + // A degraded pong is overdue. + if (degradedPongsReceived < degradedPingsSent && nowNs >= degradedPongDeadlineNs) return false - return true + return true + } } /** @@ -553,7 +560,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { * The deadline is currently hardcoded. We may make this configurable in the future! */ internal fun sendDegradedPingLater() { - synchronized(this) { + lock.withLock { if (degradedPongsReceived < degradedPingsSent) return // Already awaiting a degraded pong. degradedPingsSent++ degradedPongDeadlineNs = System.nanoTime() + DEGRADED_PONG_TIMEOUT_NS @@ -682,7 +689,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { return } val stream: Http2Stream? - synchronized(this@Http2Connection) { + lock.withLock { stream = getStream(streamId) if (stream == null) { @@ -761,8 +768,8 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { var delta: Long var streamsToNotify: Array? var newPeerSettings: Settings - synchronized(writer) { - synchronized(this@Http2Connection) { + writer.lock.withLock { + lock.withLock { val previousPeerSettings = peerSettings newPeerSettings = if (clearPrevious) { @@ -796,7 +803,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { } if (streamsToNotify != null) { for (stream in streamsToNotify!!) { - synchronized(stream) { + stream.lock.withLock { stream.addBytesToWriteWindow(delta) } } @@ -813,7 +820,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { payload2: Int, ) { if (ack) { - synchronized(this@Http2Connection) { + lock.withLock { when (payload1) { INTERVAL_PING -> { intervalPongsReceived++ @@ -823,7 +830,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { } AWAIT_PING -> { awaitPongsReceived++ - this@Http2Connection.notifyAll() + condition.signalAll() } else -> { // Ignore an unexpected pong. @@ -849,7 +856,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { // Copy the streams first. We don't want to hold a lock when we call receiveRstStream(). val streamsCopy: Array - synchronized(this@Http2Connection) { + lock.withLock { streamsCopy = streams.values.toTypedArray() isShutdown = true } @@ -868,14 +875,14 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { windowSizeIncrement: Long, ) { if (streamId == 0) { - synchronized(this@Http2Connection) { + lock.withLock { writeBytesMaximum += windowSizeIncrement - this@Http2Connection.notifyAll() + condition.signalAll() } } else { val stream = getStream(streamId) if (stream != null) { - synchronized(stream) { + stream.lock.withLock { stream.addBytesToWriteWindow(windowSizeIncrement) } } @@ -918,7 +925,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { streamId: Int, requestHeaders: List
, ) { - synchronized(this) { + lock.withLock { if (streamId in currentPushRequests) { writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR) return @@ -930,7 +937,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { ignoreIoExceptions { if (cancel) { writer.rstStream(streamId, ErrorCode.CANCEL) - synchronized(this@Http2Connection) { + lock.withLock { currentPushRequests.remove(streamId) } } @@ -948,7 +955,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { ignoreIoExceptions { if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL) if (cancel || inFinished) { - synchronized(this@Http2Connection) { + lock.withLock { currentPushRequests.remove(streamId) } } @@ -975,7 +982,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { val cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished) if (cancel) writer.rstStream(streamId, ErrorCode.CANCEL) if (cancel || inFinished) { - synchronized(this@Http2Connection) { + lock.withLock { currentPushRequests.remove(streamId) } } @@ -989,7 +996,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { ) { pushQueue.execute("$connectionName[$streamId] onReset") { pushObserver.onReset(streamId, errorCode) - synchronized(this@Http2Connection) { + lock.withLock { currentPushRequests.remove(streamId) } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt index dd1fa1e49584..32a419561505 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -20,13 +20,14 @@ import java.io.IOException import java.io.InterruptedIOException import java.net.SocketTimeoutException import java.util.ArrayDeque +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import okhttp3.Headers import okhttp3.internal.EMPTY_HEADERS -import okhttp3.internal.assertThreadDoesntHoldLock +import okhttp3.internal.assertNotHeld import okhttp3.internal.http2.flowcontrol.WindowCounter -import okhttp3.internal.notifyAll import okhttp3.internal.toHeaderList -import okhttp3.internal.wait import okio.AsyncTimeout import okio.Buffer import okio.BufferedSource @@ -43,7 +44,10 @@ class Http2Stream internal constructor( inFinished: Boolean, headers: Headers?, ) { - // Internal state is guarded by this. No long-running or potentially blocking operations are + internal val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + + // Internal state is guarded by lock. No long-running or potentially blocking operations are // performed while the lock is held. /** The bytes consumed and acknowledged by the stream. */ @@ -82,7 +86,8 @@ class Http2Stream internal constructor( * If there are multiple reasons to abnormally close this stream (such as both peers closing it * near-simultaneously) then this is the first reason known to this peer. */ - @get:Synchronized internal var errorCode: ErrorCode? = null + internal var errorCode: ErrorCode? = null + get() = lock.withLock { field } /** The exception that explains [errorCode]. Null if no exception was provided. */ internal var errorException: IOException? = null @@ -106,17 +111,19 @@ class Http2Stream internal constructor( * not open. This is because input data is buffered. */ val isOpen: Boolean - @Synchronized get() { - if (errorCode != null) { - return false - } - if ((source.finished || source.closed) && - (sink.finished || sink.closed) && - hasResponseHeaders - ) { - return false + get() { + lock.withLock { + if (errorCode != null) { + return false + } + if ((source.finished || source.closed) && + (sink.finished || sink.closed) && + hasResponseHeaders + ) { + return false + } + return true } - return true } /** Returns true if this stream was created by this peer. */ @@ -135,42 +142,44 @@ class Http2Stream internal constructor( * This is true after a `Expect-Continue` request, false for duplex requests, and false for * all other requests. */ - @Synchronized @Throws(IOException::class) fun takeHeaders(callerIsIdle: Boolean = false): Headers { - while (headersQueue.isEmpty() && errorCode == null) { - val doReadTimeout = callerIsIdle || doReadTimeout() - if (doReadTimeout) { - readTimeout.enter() - } - try { - waitForIo() - } finally { + lock.withLock { + while (headersQueue.isEmpty() && errorCode == null) { + val doReadTimeout = callerIsIdle || doReadTimeout() if (doReadTimeout) { - readTimeout.exitAndThrowIfTimedOut() + readTimeout.enter() + } + try { + waitForIo() + } finally { + if (doReadTimeout) { + readTimeout.exitAndThrowIfTimedOut() + } } } + if (headersQueue.isNotEmpty()) { + return headersQueue.removeFirst() + } + throw errorException ?: StreamResetException(errorCode!!) } - if (headersQueue.isNotEmpty()) { - return headersQueue.removeFirst() - } - throw errorException ?: StreamResetException(errorCode!!) } /** * Returns the trailers. It is only safe to call this once the source stream has been completely * exhausted. */ - @Synchronized @Throws(IOException::class) fun trailers(): Headers { - if (source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) { - return source.trailers ?: EMPTY_HEADERS - } - if (errorCode != null) { - throw errorException ?: StreamResetException(errorCode!!) + lock.withLock { + if (source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) { + return source.trailers ?: EMPTY_HEADERS + } + if (errorCode != null) { + throw errorException ?: StreamResetException(errorCode!!) + } + throw IllegalStateException("too early; can't read the trailers yet") } - throw IllegalStateException("too early; can't read the trailers yet") } /** @@ -187,21 +196,21 @@ class Http2Stream internal constructor( outFinished: Boolean, flushHeaders: Boolean, ) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() var flushHeaders = flushHeaders - synchronized(this) { + lock.withLock { this.hasResponseHeaders = true if (outFinished) { this.sink.finished = true - this@Http2Stream.notifyAll() // Because doReadTimeout() may have changed. + condition.signalAll() // Because doReadTimeout() may have changed. } } // Only DATA frames are subject to flow-control. Transmit the HEADER frame if the connection // flow-control window is fully depleted. if (!flushHeaders) { - synchronized(connection) { + lock.withLock { flushHeaders = (connection.writeBytesTotal >= connection.writeBytesMaximum) } } @@ -214,7 +223,7 @@ class Http2Stream internal constructor( } fun enqueueTrailers(trailers: Headers) { - synchronized(this) { + lock.withLock { check(!sink.finished) { "already finished" } require(trailers.size != 0) { "trailers.size() == 0" } this.sink.trailers = trailers @@ -235,7 +244,7 @@ class Http2Stream internal constructor( * not yet been sent. */ fun getSink(): Sink { - synchronized(this) { + lock.withLock { check(hasResponseHeaders || isLocallyInitiated) { "reply before requesting the sink" } @@ -273,15 +282,15 @@ class Http2Stream internal constructor( errorCode: ErrorCode, errorException: IOException?, ): Boolean { - this.assertThreadDoesntHoldLock() + lock.assertNotHeld() - synchronized(this) { + lock.withLock { if (this.errorCode != null) { return false } this.errorCode = errorCode this.errorException = errorException - notifyAll() + condition.signalAll() if (source.finished && sink.finished) { return false } @@ -295,7 +304,7 @@ class Http2Stream internal constructor( source: BufferedSource, length: Int, ) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() this.source.receive(source, length.toLong()) } @@ -305,10 +314,10 @@ class Http2Stream internal constructor( headers: Headers, inFinished: Boolean, ) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() val open: Boolean - synchronized(this) { + lock.withLock { if (!hasResponseHeaders || headers[Header.RESPONSE_STATUS_UTF8] != null || headers[Header.TARGET_METHOD_UTF8] != null @@ -322,17 +331,19 @@ class Http2Stream internal constructor( this.source.finished = true } open = isOpen - notifyAll() + condition.signalAll() } if (!open) { connection.removeStream(id) } } - @Synchronized fun receiveRstStream(errorCode: ErrorCode) { - if (this.errorCode == null) { - this.errorCode = errorCode - notifyAll() + fun receiveRstStream(errorCode: ErrorCode) { + lock.withLock { + if (this.errorCode == null) { + this.errorCode = errorCode + condition.signalAll() + } } } @@ -389,7 +400,7 @@ class Http2Stream internal constructor( // 1. Decide what to do in a synchronized block. - synchronized(this@Http2Stream) { + lock.withLock { val doReadTimeout = doReadTimeout() if (doReadTimeout) { readTimeout.enter() @@ -452,7 +463,7 @@ class Http2Stream internal constructor( } private fun updateConnectionFlowControl(read: Long) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() connection.updateConnectionFlowControl(read) } @@ -466,14 +477,14 @@ class Http2Stream internal constructor( source: BufferedSource, byteCount: Long, ) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() var remainingByteCount = byteCount while (remainingByteCount > 0L) { val finished: Boolean val flowControlError: Boolean - synchronized(this@Http2Stream) { + lock.withLock { finished = this.finished flowControlError = remainingByteCount + readBuffer.size > maxByteCount } @@ -499,14 +510,14 @@ class Http2Stream internal constructor( // Move the received data to the read buffer to the reader can read it. If this source has // been closed since this read began we must discard the incoming data and tell the // connection we've done so. - synchronized(this@Http2Stream) { + lock.withLock { if (closed) { receiveBuffer.clear() } else { val wasEmpty = readBuffer.size == 0L readBuffer.writeAll(receiveBuffer) if (wasEmpty) { - this@Http2Stream.notifyAll() + condition.signalAll() } } } @@ -527,11 +538,11 @@ class Http2Stream internal constructor( @Throws(IOException::class) override fun close() { val bytesDiscarded: Long - synchronized(this@Http2Stream) { + lock.withLock { closed = true bytesDiscarded = readBuffer.size readBuffer.clear() - this@Http2Stream.notifyAll() // TODO(jwilson): Unnecessary? + condition.signalAll() // TODO(jwilson): Unnecessary? } if (bytesDiscarded > 0L) { updateConnectionFlowControl(bytesDiscarded) @@ -542,11 +553,11 @@ class Http2Stream internal constructor( @Throws(IOException::class) internal fun cancelStreamIfNecessary() { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() val open: Boolean val cancel: Boolean - synchronized(this) { + lock.withLock { cancel = !source.finished && source.closed && (sink.finished || sink.closed) open = isOpen } @@ -581,7 +592,7 @@ class Http2Stream internal constructor( source: Buffer, byteCount: Long, ) { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() sendBuffer.write(source, byteCount) while (sendBuffer.size >= EMIT_BUFFER_SIZE) { @@ -597,7 +608,7 @@ class Http2Stream internal constructor( private fun emitFrame(outFinishedOnLastFrame: Boolean) { val toWrite: Long val outFinished: Boolean - synchronized(this@Http2Stream) { + lock.withLock { writeTimeout.enter() try { while (writeBytesTotal >= writeBytesMaximum && @@ -627,9 +638,9 @@ class Http2Stream internal constructor( @Throws(IOException::class) override fun flush() { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() - synchronized(this@Http2Stream) { + lock.withLock { checkOutNotClosed() } // TODO(jwilson): flush the connection?! @@ -643,10 +654,10 @@ class Http2Stream internal constructor( @Throws(IOException::class) override fun close() { - this@Http2Stream.assertThreadDoesntHoldLock() + lock.assertNotHeld() val outFinished: Boolean - synchronized(this@Http2Stream) { + lock.withLock { if (closed) return outFinished = errorCode == null } @@ -675,9 +686,9 @@ class Http2Stream internal constructor( } } } - synchronized(this@Http2Stream) { + lock.withLock { closed = true - this@Http2Stream.notifyAll() // Because doReadTimeout() may have changed. + condition.signalAll() // Because doReadTimeout() may have changed. } connection.flush() cancelStreamIfNecessary() @@ -692,7 +703,7 @@ class Http2Stream internal constructor( fun addBytesToWriteWindow(delta: Long) { writeBytesMaximum += delta if (delta > 0L) { - this@Http2Stream.notifyAll() + condition.signalAll() } } @@ -712,7 +723,7 @@ class Http2Stream internal constructor( @Throws(InterruptedIOException::class) internal fun waitForIo() { try { - wait() + condition.await() } catch (_: InterruptedException) { Thread.currentThread().interrupt() // Retain interrupted status. throw InterruptedIOException() diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Writer.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Writer.kt index ff72536e9b49..496757427193 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Writer.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Writer.kt @@ -17,8 +17,10 @@ package okhttp3.internal.http2 import java.io.Closeable import java.io.IOException +import java.util.concurrent.locks.ReentrantLock import java.util.logging.Level.FINE import java.util.logging.Logger +import kotlin.concurrent.withLock import okhttp3.internal.format import okhttp3.internal.http2.Http2.CONNECTION_PREFACE import okhttp3.internal.http2.Http2.FLAG_ACK @@ -47,39 +49,43 @@ class Http2Writer( private val sink: BufferedSink, private val client: Boolean, ) : Closeable { + internal val lock: ReentrantLock = ReentrantLock() + private val hpackBuffer: Buffer = Buffer() private var maxFrameSize: Int = INITIAL_MAX_FRAME_SIZE private var closed: Boolean = false val hpackWriter: Hpack.Writer = Hpack.Writer(out = hpackBuffer) - @Synchronized @Throws(IOException::class) fun connectionPreface() { - if (closed) throw IOException("closed") - if (!client) return // Nothing to write; servers don't send connection headers! - if (logger.isLoggable(FINE)) { - logger.fine(format(">> CONNECTION ${CONNECTION_PREFACE.hex()}")) + lock.withLock { + if (closed) throw IOException("closed") + if (!client) return // Nothing to write; servers don't send connection headers! + if (logger.isLoggable(FINE)) { + logger.fine(format(">> CONNECTION ${CONNECTION_PREFACE.hex()}")) + } + sink.write(CONNECTION_PREFACE) + sink.flush() } - sink.write(CONNECTION_PREFACE) - sink.flush() } /** Applies `peerSettings` and then sends a settings ACK. */ - @Synchronized @Throws(IOException::class) fun applyAndAckSettings(peerSettings: Settings) { - if (closed) throw IOException("closed") - this.maxFrameSize = peerSettings.getMaxFrameSize(maxFrameSize) - if (peerSettings.headerTableSize != -1) { - hpackWriter.resizeHeaderTable(peerSettings.headerTableSize) + lock.withLock { + if (closed) throw IOException("closed") + this.maxFrameSize = peerSettings.getMaxFrameSize(maxFrameSize) + if (peerSettings.headerTableSize != -1) { + hpackWriter.resizeHeaderTable(peerSettings.headerTableSize) + } + frameHeader( + streamId = 0, + length = 0, + type = TYPE_SETTINGS, + flags = FLAG_ACK, + ) + sink.flush() } - frameHeader( - streamId = 0, - length = 0, - type = TYPE_SETTINGS, - flags = FLAG_ACK, - ) - sink.flush() } /** @@ -94,54 +100,57 @@ class Http2Writer( * @param promisedStreamId server-initiated stream ID. Must be an even number. * @param requestHeaders minimally includes `:method`, `:scheme`, `:authority`, and `:path`. */ - @Synchronized @Throws(IOException::class) fun pushPromise( streamId: Int, promisedStreamId: Int, requestHeaders: List
, ) { - if (closed) throw IOException("closed") - hpackWriter.writeHeaders(requestHeaders) + lock.withLock { + if (closed) throw IOException("closed") + hpackWriter.writeHeaders(requestHeaders) - val byteCount = hpackBuffer.size - val length = minOf(maxFrameSize - 4L, byteCount).toInt() - frameHeader( - streamId = streamId, - length = length + 4, - type = TYPE_PUSH_PROMISE, - flags = if (byteCount == length.toLong()) FLAG_END_HEADERS else 0, - ) - sink.writeInt(promisedStreamId and 0x7fffffff) - sink.write(hpackBuffer, length.toLong()) + val byteCount = hpackBuffer.size + val length = minOf(maxFrameSize - 4L, byteCount).toInt() + frameHeader( + streamId = streamId, + length = length + 4, + type = TYPE_PUSH_PROMISE, + flags = if (byteCount == length.toLong()) FLAG_END_HEADERS else 0, + ) + sink.writeInt(promisedStreamId and 0x7fffffff) + sink.write(hpackBuffer, length.toLong()) - if (byteCount > length) writeContinuationFrames(streamId, byteCount - length) + if (byteCount > length) writeContinuationFrames(streamId, byteCount - length) + } } - @Synchronized @Throws(IOException::class) fun flush() { - if (closed) throw IOException("closed") - sink.flush() + lock.withLock { + if (closed) throw IOException("closed") + sink.flush() + } } - @Synchronized @Throws(IOException::class) fun rstStream( streamId: Int, errorCode: ErrorCode, ) { - if (closed) throw IOException("closed") - require(errorCode.httpCode != -1) + lock.withLock { + if (closed) throw IOException("closed") + require(errorCode.httpCode != -1) - frameHeader( - streamId = streamId, - length = 4, - type = TYPE_RST_STREAM, - flags = FLAG_NONE, - ) - sink.writeInt(errorCode.httpCode) - sink.flush() + frameHeader( + streamId = streamId, + length = 4, + type = TYPE_RST_STREAM, + flags = FLAG_NONE, + ) + sink.writeInt(errorCode.httpCode) + sink.flush() + } } /** The maximum size of bytes that may be sent in a single call to [data]. */ @@ -154,7 +163,6 @@ class Http2Writer( * @param source the buffer to draw bytes from. May be null if byteCount is 0. * @param byteCount must be between 0 and the minimum of `source.length` and [maxDataLength]. */ - @Synchronized @Throws(IOException::class) fun data( outFinished: Boolean, @@ -162,10 +170,12 @@ class Http2Writer( source: Buffer?, byteCount: Int, ) { - if (closed) throw IOException("closed") - var flags = FLAG_NONE - if (outFinished) flags = flags or FLAG_END_STREAM - dataFrame(streamId, flags, source, byteCount) + lock.withLock { + if (closed) throw IOException("closed") + var flags = FLAG_NONE + if (outFinished) flags = flags or FLAG_END_STREAM + dataFrame(streamId, flags, source, byteCount) + } } @Throws(IOException::class) @@ -187,51 +197,53 @@ class Http2Writer( } /** Write okhttp's settings to the peer. */ - @Synchronized @Throws(IOException::class) fun settings(settings: Settings) { - if (closed) throw IOException("closed") - frameHeader( - streamId = 0, - length = settings.size() * 6, - type = TYPE_SETTINGS, - flags = FLAG_NONE, - ) - for (i in 0 until Settings.COUNT) { - if (!settings.isSet(i)) continue - val id = - when (i) { - 4 -> 3 // SETTINGS_MAX_CONCURRENT_STREAMS renumbered. - 7 -> 4 // SETTINGS_INITIAL_WINDOW_SIZE renumbered. - else -> i - } - sink.writeShort(id) - sink.writeInt(settings[i]) + lock.withLock { + if (closed) throw IOException("closed") + frameHeader( + streamId = 0, + length = settings.size() * 6, + type = TYPE_SETTINGS, + flags = FLAG_NONE, + ) + for (i in 0 until Settings.COUNT) { + if (!settings.isSet(i)) continue + val id = + when (i) { + 4 -> 3 // SETTINGS_MAX_CONCURRENT_STREAMS renumbered. + 7 -> 4 // SETTINGS_INITIAL_WINDOW_SIZE renumbered. + else -> i + } + sink.writeShort(id) + sink.writeInt(settings[i]) + } + sink.flush() } - sink.flush() } /** * Send a connection-level ping to the peer. `ack` indicates this is a reply. The data in * `payload1` and `payload2` opaque binary, and there are no rules on the content. */ - @Synchronized @Throws(IOException::class) fun ping( ack: Boolean, payload1: Int, payload2: Int, ) { - if (closed) throw IOException("closed") - frameHeader( - streamId = 0, - length = 8, - type = TYPE_PING, - flags = if (ack) FLAG_ACK else FLAG_NONE, - ) - sink.writeInt(payload1) - sink.writeInt(payload2) - sink.flush() + lock.withLock { + if (closed) throw IOException("closed") + frameHeader( + streamId = 0, + length = 8, + type = TYPE_PING, + flags = if (ack) FLAG_ACK else FLAG_NONE, + ) + sink.writeInt(payload1) + sink.writeInt(payload2) + sink.flush() + } } /** @@ -242,61 +254,63 @@ class Http2Writer( * @param errorCode reason for closing the connection. * @param debugData only valid for HTTP/2; opaque debug data to send. */ - @Synchronized @Throws(IOException::class) fun goAway( lastGoodStreamId: Int, errorCode: ErrorCode, debugData: ByteArray, ) { - if (closed) throw IOException("closed") - require(errorCode.httpCode != -1) { "errorCode.httpCode == -1" } - frameHeader( - streamId = 0, - length = 8 + debugData.size, - type = TYPE_GOAWAY, - flags = FLAG_NONE, - ) - sink.writeInt(lastGoodStreamId) - sink.writeInt(errorCode.httpCode) - if (debugData.isNotEmpty()) { - sink.write(debugData) + lock.withLock { + if (closed) throw IOException("closed") + require(errorCode.httpCode != -1) { "errorCode.httpCode == -1" } + frameHeader( + streamId = 0, + length = 8 + debugData.size, + type = TYPE_GOAWAY, + flags = FLAG_NONE, + ) + sink.writeInt(lastGoodStreamId) + sink.writeInt(errorCode.httpCode) + if (debugData.isNotEmpty()) { + sink.write(debugData) + } + sink.flush() } - sink.flush() } /** * Inform peer that an additional `windowSizeIncrement` bytes can be sent on `streamId`, or the * connection if `streamId` is zero. */ - @Synchronized @Throws(IOException::class) fun windowUpdate( streamId: Int, windowSizeIncrement: Long, ) { - if (closed) throw IOException("closed") - require(windowSizeIncrement != 0L && windowSizeIncrement <= 0x7fffffffL) { - "windowSizeIncrement == 0 || windowSizeIncrement > 0x7fffffffL: $windowSizeIncrement" - } - if (logger.isLoggable(FINE)) { - logger.fine( - frameLogWindowUpdate( - inbound = false, - streamId = streamId, - length = 4, - windowSizeIncrement = windowSizeIncrement, - ), + lock.withLock { + if (closed) throw IOException("closed") + require(windowSizeIncrement != 0L && windowSizeIncrement <= 0x7fffffffL) { + "windowSizeIncrement == 0 || windowSizeIncrement > 0x7fffffffL: $windowSizeIncrement" + } + if (logger.isLoggable(FINE)) { + logger.fine( + frameLogWindowUpdate( + inbound = false, + streamId = streamId, + length = 4, + windowSizeIncrement = windowSizeIncrement, + ), + ) + } + frameHeader( + streamId = streamId, + length = 4, + type = TYPE_WINDOW_UPDATE, + flags = FLAG_NONE, ) + sink.writeInt(windowSizeIncrement.toInt()) + sink.flush() } - frameHeader( - streamId = streamId, - length = 4, - type = TYPE_WINDOW_UPDATE, - flags = FLAG_NONE, - ) - sink.writeInt(windowSizeIncrement.toInt()) - sink.flush() } @Throws(IOException::class) @@ -317,11 +331,12 @@ class Http2Writer( sink.writeInt(streamId and 0x7fffffff) } - @Synchronized @Throws(IOException::class) override fun close() { - closed = true - sink.close() + lock.withLock { + closed = true + sink.close() + } } @Throws(IOException::class) @@ -343,29 +358,30 @@ class Http2Writer( } } - @Synchronized @Throws(IOException::class) fun headers( outFinished: Boolean, streamId: Int, headerBlock: List
, ) { - if (closed) throw IOException("closed") - hpackWriter.writeHeaders(headerBlock) + lock.withLock { + if (closed) throw IOException("closed") + hpackWriter.writeHeaders(headerBlock) - val byteCount = hpackBuffer.size - val length = minOf(maxFrameSize.toLong(), byteCount) - var flags = if (byteCount == length) FLAG_END_HEADERS else 0 - if (outFinished) flags = flags or FLAG_END_STREAM - frameHeader( - streamId = streamId, - length = length.toInt(), - type = TYPE_HEADERS, - flags = flags, - ) - sink.write(hpackBuffer, length) + val byteCount = hpackBuffer.size + val length = minOf(maxFrameSize.toLong(), byteCount) + var flags = if (byteCount == length) FLAG_END_HEADERS else 0 + if (outFinished) flags = flags or FLAG_END_STREAM + frameHeader( + streamId = streamId, + length = length.toInt(), + type = TYPE_HEADERS, + flags = flags, + ) + sink.write(hpackBuffer, length) - if (byteCount > length) writeContinuationFrames(streamId, byteCount - length) + if (byteCount > length) writeContinuationFrames(streamId, byteCount - length) + } } companion object { diff --git a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt index 240816047d82..8e58158d6e6e 100644 --- a/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/connection/ConnectionPoolTest.kt @@ -21,6 +21,7 @@ import assertk.assertions.isEqualTo import assertk.assertions.isFalse import assertk.assertions.isNotEmpty import assertk.assertions.isTrue +import kotlin.concurrent.withLock import okhttp3.Address import okhttp3.ConnectionPool import okhttp3.FakeRoutePlanner @@ -96,7 +97,7 @@ class ConnectionPoolTest { .build() val call = client.newCall(Request(addressA.url)) as RealCall call.enterNetworkInterceptorExchange(call.request(), true, factory.newChain(call)) - synchronized(c1) { call.acquireConnectionNoEvents(c1) } + c1.lock.withLock { call.acquireConnectionNoEvents(c1) } // Running at time 50, the pool returns that nothing can be evicted until time 150. assertThat(pool.closeConnections(50L)).isEqualTo(100L) @@ -342,6 +343,6 @@ class ConnectionPoolTest { .build() val call = client.newCall(Request(connection.route().address.url)) as RealCall call.enterNetworkInterceptorExchange(call.request(), true, factory.newChain(call)) - synchronized(connection) { call.acquireConnectionNoEvents(connection) } + connection.lock.withLock { call.acquireConnectionNoEvents(connection) } } }