Skip to content

Commit

Permalink
check pending stream completion at delayed transport lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Dec 10, 2020
1 parent 4be68f3 commit ac99972
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 26 deletions.
103 changes: 78 additions & 25 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ final class DelayedClientTransport implements ManagedClientTransport {
@Nonnull
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
@GuardedBy("lock")
private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>();
private Runnable pollForStreamTransferComplete = new Runnable() {
@Override
public void run() {
ArrayList<PendingStream> savedToCheckCompletionStreams;
synchronized (lock) {
savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams);
if (!toCheckCompletionStreams.isEmpty()) {
toCheckCompletionStreams = Collections.emptyList();
}
}
for (final PendingStream stream : savedToCheckCompletionStreams) {
stream.awaitStreamTransferCompletion();
}
}
};

/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
Expand Down Expand Up @@ -211,7 +228,7 @@ public void run() {
listener.transportShutdown(status);
}
});
if (!hasPendingStreams() && reportTransportTerminated != null) {
if (!hasPendingStreams() && !hasUncommittedStreams() && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand All @@ -227,19 +244,27 @@ public void run() {
public final void shutdownNow(Status status) {
shutdown(status);
Collection<PendingStream> savedPendingStreams;
Collection<PendingStream> savedToCheckCompletionStreams;
Runnable savedReportTransportTerminated;
synchronized (lock) {
savedPendingStreams = pendingStreams;
savedToCheckCompletionStreams = toCheckCompletionStreams;
savedReportTransportTerminated = reportTransportTerminated;
reportTransportTerminated = null;
if (!pendingStreams.isEmpty()) {
pendingStreams = Collections.emptyList();
}
if (!toCheckCompletionStreams.isEmpty()) {
toCheckCompletionStreams = Collections.emptyList();
}
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
}
for (PendingStream stream : savedToCheckCompletionStreams) {
stream.awaitStreamTransferCompletion();
}
syncContext.execute(savedReportTransportTerminated);
}
// If savedReportTransportTerminated == null, transportTerminated() has already been called in
Expand All @@ -252,13 +277,26 @@ public final boolean hasPendingStreams() {
}
}

public final boolean hasUncommittedStreams() {
synchronized (lock) {
return !toCheckCompletionStreams.isEmpty();
}
}

@VisibleForTesting
final int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams.size();
}
}

@VisibleForTesting
final int getUncommittedStreamCount() {
synchronized (lock) {
return toCheckCompletionStreams.size();
}
}

/**
* Use the picker to try picking a transport for every pending stream, proceed the stream if the
* pick is successful, otherwise keep it pending.
Expand All @@ -270,48 +308,61 @@ final int getPendingStreamsCount() {
* <p>This method <strong>must not</strong> be called concurrently with itself.
*/
final void reprocess(@Nullable SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
ArrayList<PendingStream> toCreateRealStream;
ArrayList<PendingStream> toCheckCompletion;
synchronized (lock) {
lastPicker = picker;
lastPickerVersion++;
if (picker == null || !hasPendingStreams()) {
if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) {
return;
}
toProcess = new ArrayList<>(pendingStreams);
toCreateRealStream = new ArrayList<>(pendingStreams);
toCheckCompletion = new ArrayList<>(toCheckCompletionStreams);
}
ArrayList<PendingStream> toRemove = new ArrayList<>();
ArrayList<PendingStream> newlyCreated = new ArrayList<>();

for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {

if (picker != null) {
for (final PendingStream stream : toCreateRealStream) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
toRemove.add(stream);
} // else: stay pending
newlyCreated.add(stream);
} // else: stay pending
}
}
toCheckCompletion.addAll(newlyCreated);
ArrayList<PendingStream> completed = new ArrayList<>();
for (final PendingStream stream : toCheckCompletion) {
if (stream.isStreamTransferCompleted()) {
completed.add(stream);
}
}

synchronized (lock) {
// Between this synchronized and the previous one:
// - Streams may have been cancelled, which may turn pendingStreams into emptiness.
// - shutdown() may be called, which may turn pendingStreams into null.
if (!hasPendingStreams()) {
// - shutdownNow() may be called, which may turn pendingStreams into emptiness.
if (!hasPendingStreams() && !hasUncommittedStreams()) {
return;
}
pendingStreams.removeAll(toRemove);
pendingStreams.removeAll(newlyCreated);
toCheckCompletionStreams.addAll(newlyCreated);
toCheckCompletionStreams.removeAll(completed);
// Because delayed transport is long-lived, we take this opportunity to down-size the
// hashmap.
if (pendingStreams.isEmpty()) {
Expand All @@ -325,6 +376,7 @@ public void run() {
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(pollForStreamTransferComplete);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand Down Expand Up @@ -367,6 +419,7 @@ public void cancel(Status reason) {
if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) {
syncContext.executeLater(pollForStreamTransferComplete);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -59,6 +61,7 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
private final CountDownLatch realStreamStarted = new CountDownLatch(1);

@Override
public void setMaxInboundMessageSize(final int maxSize) {
Expand Down Expand Up @@ -132,6 +135,24 @@ final void setStream(ClientStream stream) {
drainPendingCalls();
}

protected boolean isStreamTransferCompleted() {
return realStreamStarted.getCount() == 0;
}

protected void awaitStreamTransferCompletion() {
// Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or
// shutdown. Not waiting transfer completed may cause pending calls orphaned.
boolean delegationComplete;
try {
delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
delegationComplete = false;
}
if (!delegationComplete) {
Thread.currentThread().interrupt();
}
}

/**
* Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
* multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
Expand Down Expand Up @@ -221,12 +242,14 @@ public void start(ClientStreamListener listener) {

if (savedPassThrough) {
realStream.start(listener);
realStreamStarted.countDown();
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
realStreamStarted.countDown();
}
});
}
Expand Down Expand Up @@ -302,7 +325,11 @@ public void run() {
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
if (!isStreamTransferCompleted()) {
realStreamStarted.countDown();
}
}
awaitStreamTransferCompletion();
}

@GuardedBy("this")
Expand Down
Loading

0 comments on commit ac99972

Please sign in to comment.