-
Notifications
You must be signed in to change notification settings - Fork 9.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow Addresses to have a max calls per connection #8386
base: master
Are you sure you want to change the base?
Changes from all commits
5c15058
776239b
04774b8
8c4a64e
a4befc3
96345cf
6edc2f1
5724e9d
6e9b44c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS | |
import java.util.concurrent.locks.ReentrantLock | ||
import javax.net.ssl.SSLPeerUnverifiedException | ||
import javax.net.ssl.SSLSocket | ||
import kotlin.concurrent.withLock | ||
import kotlin.math.min | ||
import okhttp3.Address | ||
import okhttp3.Connection | ||
import okhttp3.ConnectionListener | ||
|
@@ -119,6 +119,8 @@ class RealConnection( | |
internal var allocationLimit = 1 | ||
private set | ||
|
||
private var lastMaxConcurrentStreamsFromSettings: Int? = null | ||
|
||
/** Current calls carried by this connection. */ | ||
val calls = mutableListOf<Reference<RealCall>>() | ||
|
||
|
@@ -176,7 +178,8 @@ class RealConnection( | |
.flowControlListener(flowControlListener) | ||
.build() | ||
this.http2Connection = http2Connection | ||
this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams() | ||
this.lastMaxConcurrentStreamsFromSettings = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams() | ||
recalculateAllocationLimit() | ||
http2Connection.start() | ||
} | ||
|
||
|
@@ -335,7 +338,7 @@ class RealConnection( | |
return http2Connection.isHealthy(nowNs) | ||
} | ||
|
||
val idleDurationNs = lock.withLock { nowNs - idleAtNs } | ||
val idleDurationNs = this.withLock { nowNs - idleAtNs } | ||
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { | ||
return socket.isHealthy(source) | ||
} | ||
|
@@ -354,9 +357,21 @@ class RealConnection( | |
connection: Http2Connection, | ||
settings: Settings, | ||
) { | ||
lock.withLock { | ||
this.withLock { | ||
this.lastMaxConcurrentStreamsFromSettings = settings.getMaxConcurrentStreams() | ||
recalculateAllocationLimit() | ||
} | ||
} | ||
|
||
/** | ||
* Resets the [allocationLimit] field based on any settings which may have been applied | ||
* Needed to allow for policy changes to adjust the limit, similarly to the change | ||
* made during settings changes | ||
*/ | ||
internal fun recalculateAllocationLimit() { | ||
this.withLock { | ||
val oldLimit = allocationLimit | ||
allocationLimit = settings.getMaxConcurrentStreams() | ||
allocationLimit = getMaximumAllocationLimit() | ||
|
||
if (allocationLimit < oldLimit) { | ||
// We might need new connections to keep policies satisfied | ||
|
@@ -368,6 +383,17 @@ class RealConnection( | |
} | ||
} | ||
|
||
private fun getMaximumAllocationLimit(): Int { | ||
// if we have not negotiated a max per streams yet, don't check for the policy override | ||
val negotiatedMaxCurrentStreams = lastMaxConcurrentStreamsFromSettings ?: return 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this just defaulted to 1 for Http/1 and Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams() for Http/2? should it be a lateinit and start sets to one or the other? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe?
Would it be better to have the null and early return or check the policy every time, even if we're not in HTTP/2 land? |
||
|
||
val maxPolicyValue = | ||
connectionPool.getMaximumCallsPerConnection(route.address) | ||
?: Int.MAX_VALUE | ||
|
||
return min(maxPolicyValue, negotiatedMaxCurrentStreams) | ||
} | ||
|
||
override fun handshake(): Handshake? = handshake | ||
|
||
/** Track a bad route in the route database. Other routes will be attempted first. */ | ||
|
@@ -398,7 +424,7 @@ class RealConnection( | |
e: IOException?, | ||
) { | ||
var noNewExchangesEvent = false | ||
lock.withLock { | ||
this.withLock { | ||
if (e is StreamResetException) { | ||
when { | ||
e.errorCode == ErrorCode.REFUSED_STREAM -> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer need when the references from
lock.withLock
were changed tothis.withLock
lock.withLock
so same concurrent method used one step further.