diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 11171e94c52..dc4bce0145a 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -69,23 +69,8 @@ final class DelayedClientTransport implements ManagedClientTransport { @GuardedBy("lock") private Collection pendingStreams = new LinkedHashSet<>(); - /** - * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered - * terminated. - */ - @GuardedBy("lock") - private Status shutdownStatus; - - /** - * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved - * to idle. - */ - @GuardedBy("lock") - @Nullable - private SubchannelPicker lastPicker; - - @GuardedBy("lock") - private long lastPickerVersion; + /** Immutable state needed for picking. 'lock' must be held for writing. */ + private volatile PickerState pickerState = new PickerState(null, null); /** * Creates a new delayed transport. @@ -139,33 +124,30 @@ public final ClientStream newStream( try { PickSubchannelArgs args = new PickSubchannelArgsImpl( method, headers, callOptions, new PickDetailsConsumerImpl(tracers)); - SubchannelPicker picker = null; - long pickerVersion = -1; + PickerState state = pickerState; while (true) { - synchronized (lock) { - if (shutdownStatus != null) { - return new FailingClientStream(shutdownStatus, tracers); - } - if (lastPicker == null) { - return createPendingStream(args, tracers); - } - // Check for second time through the loop, and whether anything changed - if (picker != null && pickerVersion == lastPickerVersion) { - return createPendingStream(args, tracers); - } - picker = lastPicker; - pickerVersion = lastPickerVersion; + if (state.shutdownStatus != null) { + return new FailingClientStream(state.shutdownStatus, tracers); } - PickResult pickResult = picker.pickSubchannel(args); - ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, - callOptions.isWaitForReady()); - if (transport != null) { - return transport.newStream( - args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(), - tracers); + if (state.lastPicker != null) { + PickResult pickResult = state.lastPicker.pickSubchannel(args); + ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, + callOptions.isWaitForReady()); + if (transport != null) { + return transport.newStream( + args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(), + tracers); + } } // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible // race with reprocess()), we will buffer it. Otherwise, will try with the new picker. + synchronized (lock) { + PickerState newerState = pickerState; + if (state == newerState) { + return createPendingStream(args, tracers); + } + state = newerState; + } } } finally { syncContext.drain(); @@ -210,10 +192,10 @@ public ListenableFuture getStats() { @Override public final void shutdown(final Status status) { synchronized (lock) { - if (shutdownStatus != null) { + if (pickerState.shutdownStatus != null) { return; } - shutdownStatus = status; + pickerState = pickerState.withShutdownStatus(status); syncContext.executeLater(new Runnable() { @Override public void run() { @@ -288,8 +270,7 @@ final int getPendingStreamsCount() { final void reprocess(@Nullable SubchannelPicker picker) { ArrayList toProcess; synchronized (lock) { - lastPicker = picker; - lastPickerVersion++; + pickerState = pickerState.withPicker(picker); if (picker == null || !hasPendingStreams()) { return; } @@ -338,7 +319,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { // (which would shutdown the transports and LoadBalancer) because the gap should be shorter // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). syncContext.executeLater(reportTransportNotInUse); - if (shutdownStatus != null && reportTransportTerminated != null) { + if (pickerState.shutdownStatus != null && reportTransportTerminated != null) { syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -384,7 +365,7 @@ public void cancel(Status reason) { boolean justRemovedAnElement = pendingStreams.remove(this); if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); - if (shutdownStatus != null) { + if (pickerState.shutdownStatus != null) { syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -409,4 +390,32 @@ public void appendTimeoutInsight(InsightBuilder insight) { super.appendTimeoutInsight(insight); } } + + static final class PickerState { + /** + * The last picker that {@link #reprocess} has used. May be set to null when the channel has + * moved to idle. + */ + @Nullable + final SubchannelPicker lastPicker; + /** + * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered + * terminated. + */ + @Nullable + final Status shutdownStatus; + + private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) { + this.lastPicker = lastPicker; + this.shutdownStatus = shutdownStatus; + } + + public PickerState withPicker(SubchannelPicker newPicker) { + return new PickerState(newPicker, this.shutdownStatus); + } + + public PickerState withShutdownStatus(Status newShutdownStatus) { + return new PickerState(this.lastPicker, newShutdownStatus); + } + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 177124cb850..09ca4684bd5 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -471,57 +471,20 @@ private void refreshNameResolution() { private final class ChannelStreamProvider implements ClientStreamProvider { volatile Throttle throttle; - private ClientTransport getTransport(PickSubchannelArgs args) { - SubchannelPicker pickerCopy = subchannelPicker; - if (shutdown.get()) { - // If channel is shut down, delayedTransport is also shut down which will fail the stream - // properly. - return delayedTransport; - } - if (pickerCopy == null) { - final class ExitIdleModeForTransport implements Runnable { - @Override - public void run() { - exitIdleMode(); - } - } - - syncContext.execute(new ExitIdleModeForTransport()); - return delayedTransport; - } - // There is no need to reschedule the idle timer here. - // - // pickerCopy != null, which means idle timer has not expired when this method starts. - // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer - // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after - // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it. - // - // In most cases the idle timer is scheduled to fire after the transport has created the - // stream, which would have reported in-use state to the channel that would have cancelled - // the idle timer. - PickResult pickResult = pickerCopy.pickSubchannel(args); - ClientTransport transport = GrpcUtil.getTransportFromPickResult( - pickResult, args.getCallOptions().isWaitForReady()); - if (transport != null) { - return transport; - } - return delayedTransport; - } - @Override public ClientStream newStream( final MethodDescriptor method, final CallOptions callOptions, final Metadata headers, final Context context) { + // There is no need to reschedule the idle timer here. If the channel isn't shut down, either + // the delayed transport or a real transport will go in-use and cancel the idle timer. if (!retryEnabled) { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( callOptions, headers, 0, /* isTransparentRetry= */ false); - ClientTransport transport = getTransport(new PickSubchannelArgsImpl( - method, headers, callOptions, new PickDetailsConsumerImpl(tracers))); Context origContext = context.attach(); try { - return transport.newStream(method, headers, callOptions, tracers); + return delayedTransport.newStream(method, headers, callOptions, tracers); } finally { context.detach(origContext); } @@ -562,11 +525,9 @@ ClientStream newSubstream( CallOptions newOptions = callOptions.withStreamTracerFactory(factory); ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( newOptions, newHeaders, previousAttempts, isTransparentRetry); - ClientTransport transport = getTransport(new PickSubchannelArgsImpl( - method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers))); Context origContext = context.attach(); try { - return transport.newStream(method, newHeaders, newOptions, tracers); + return delayedTransport.newStream(method, newHeaders, newOptions, tracers); } finally { context.detach(origContext); }