-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
check pending stream completion at delayed transport lifecycle #7720
Conversation
core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Outdated
Show resolved
Hide resolved
core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Outdated
Show resolved
Hide resolved
@@ -66,6 +66,23 @@ | |||
@Nonnull | |||
@GuardedBy("lock") | |||
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>(); | |||
@GuardedBy("lock") | |||
private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>(); | |||
private Runnable pollForStreamTransferCompletion = new Runnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it final
?
@@ -132,6 +135,24 @@ final void setStream(ClientStream stream) { | |||
drainPendingCalls(); | |||
} | |||
|
|||
protected boolean isStreamTransferCompleted() { | |||
return realStreamStarted.getCount() == 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that getCount()
is typically used for debugging and testing purposes. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html#getCount()
Better avoid it in the main source if possible.
return; | ||
} | ||
pendingStreams.removeAll(toRemove); | ||
pendingStreams.removeAll(newlyCreated); | ||
toCheckCompletionStreams.addAll(newlyCreated); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer making toCheckCompletionStreams
be either 'non-final and immutable' or 'final and mutable', but not 'non-final' and 'mutable'.
@VisibleForTesting | ||
final int getUncommittedStreamsCount() { | ||
synchronized (lock) { | ||
return toCheckCompletionStreams.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using the same name for toCheckCompletionStreams
and uncommittedStreams
?
public void run() { | ||
ArrayList<PendingStream> savedToCheckCompletionStreams; | ||
synchronized (lock) { | ||
savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need copy the list, just saving the reference should be sufficient.
synchronized (lock) { | ||
lastPicker = picker; | ||
lastPickerVersion++; | ||
if (picker == null || !hasPendingStreams()) { | ||
if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think changing this line is not necessary. Regardless of hasUncommittedStreams
, if return
then stream.createRealStream()
or drain()
will not be called and will not cause trouble.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is because it needs to take care another case: when shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination. (Similar to cancel() takes care of the last item and then trigger termination callback). This is different in shudownNow() which would take care of waiting uncommitted streams and then finalize termination.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hen shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination.
uncommittedStreams
can be considered as existing RPCs, so shutdown()
should not terminate them. uncommittedStreams
will complete transfer by themselves, and they don't rely on a second reprocess()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown()
never terminates then existing RPCs. Do you mean we await() during shutdown()?
It seems it's hard to avoid that. Say, if there are both pendingStreams
and uncommittedStreams
when shutdown is called, so shutdown has to return. Then during next reprocess(), the newly created stream has not been drained , which would cause pendingStreams
empty but uncommittedstream
still has items in it, and we need a way to drain it. It looks reprocess() is that place, it would trigger in the next call or after idle timer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should await() during shutdown(), because there is no "next" reprocess()
after shutdown, so we can not rely on reprocess()
for existing uncommittedStreams
.
Another way is introducing an abstract method DelayedStream.onTransferComplete()
, and implementing DelayedClientTransport.PendingStream.onTransferComplete()
to managed decrement of `uncommittedStreams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Indeed it appears that idle timeout is permanently cancelled immediately after shutdown is called so there is no next reprocess()
. It looks like just awaitTransferCompletion()
during shutdown is not enough, moreover, I believe generally we are not supposed to await() during shutdown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
* Provides the place to define actions at the point when transfer is done. | ||
* Call this method to trigger those transfer completion activities. No-op by default. | ||
*/ | ||
public void onTransferComplete() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: public
redundant.
core/src/main/java/io/grpc/internal/TransferableClientStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
boolean justRemovedAnElement = pendingStreams.remove(this); | ||
if (!hasPendingStreams() && justRemovedAnElement && reportTransportTerminated != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to check reportTransportTerminated != null
after the change. Previously if reportTransportTerminated == null
, pendingStream
is guaranteed reset to empty and justRemovedAnElement
will never happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that in time order: 1. super.cancel() 2.onTransferComplete(), reportTransportTerminated=null
3. this lock block might reportNotInUse
, which won't happen previously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think the check for reportTransportTerminated != null
is unnecessary and the (!hasPendingStreams() && justRemovedAnElement)
is the canonical invariant for reportTransportNotInUse
. But ether way seems working anyway.
…#7720) add onTransferComplete() at delayedStream and wait for all pending streams to complete transfer when shutting down delayedClientTransport
To fix #6283
Problem
The issue was that delayedClientTransport
reprocess()
schedules workcreateRealStream()
but does not track whether it is committed, i.e. the real stream done set to the delayed stream which completes the delegation. Because it does not track the completion, shutdown prematurely goes through and then executor was destroyed.The inflight work that has been scheduled happen to use the same executor, causing
RejectedExecutionException
Reproduce
The problem can be reproduced by putting a pause at
createRealStream()
, and then run a client call.grpc-java/core/src/main/java/io/grpc/internal/DelayedStream.java
Line 140 in ddaf1c8