Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: DelayedStream should start() real stream immediately #7750

Merged
merged 3 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -238,7 +239,13 @@ public final void shutdownNow(Status status) {
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
Runnable runnable = stream.setStream(new FailingClientStream(status, RpcProgress.REFUSED));
if (runnable != null) {
// Drain in-line instead of using an executor as failing stream just throws everything
// away. This is essentially the same behavior as DelayedStream.cancel() but can be done
// before stream.start().
runnable.run();
}
}
syncContext.execute(savedReportTransportTerminated);
}
Expand Down Expand Up @@ -294,12 +301,10 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
Expand Down Expand Up @@ -346,7 +351,8 @@ private PendingStream(PickSubchannelArgs args) {
this.args = args;
}

private void createRealStream(ClientTransport transport) {
/** Runnable may be null. */
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -355,7 +361,7 @@ private void createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
setStream(realStream);
return setStream(realStream);
}

@Override
Expand Down
126 changes: 78 additions & 48 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -59,38 +60,35 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
// No need to synchronize; start() synchronization provides a happens-before
private List<Runnable> preStartPendingCalls = new ArrayList<>();

@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxInboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}

@Override
public void setMaxOutboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxOutboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}

@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
Expand All @@ -115,21 +113,41 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Transfers all pending and future requests and mutations to the given stream.
* Transfers all pending and future requests and mutations to the given stream. Method will return
* quickly, but if the returned Runnable is non-null it must be called to complete the process.
* The Runnable may take a while to execute.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
final void setStream(ClientStream stream) {
// When this method returns, start() has been called on realStream or passThrough is guaranteed to
// be true
@CheckReturnValue
final Runnable setStream(ClientStream stream) {
ClientStreamListener savedListener;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
return null;
}
setRealStream(checkNotNull(stream, "stream"));
savedListener = listener;
if (savedListener == null) {
assert pendingCalls.isEmpty();
pendingCalls = null;
passThrough = true;
}
}
if (savedListener == null) {
return null;
} else {
internalStart(savedListener);
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}

drainPendingCalls();
}

/**
Expand Down Expand Up @@ -177,6 +195,7 @@ private void drainPendingCalls() {
* only if {@code runnable} is thread-safe.
*/
private void delayOrExecute(Runnable runnable) {
checkState(listener != null, "May only be called after start");
synchronized (this) {
if (!passThrough) {
pendingCalls.add(runnable);
Expand All @@ -190,7 +209,7 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
Expand All @@ -200,18 +219,19 @@ public void run() {

@Override
public void start(ClientStreamListener listener) {
checkNotNull(listener, "listener");
checkState(this.listener == null, "already started");

Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
this.listener = listener;
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
Expand All @@ -220,16 +240,20 @@ public void start(ClientStreamListener listener) {
}

if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
internalStart(listener);
} // else internalStart() will be called by setStream
}

/**
* Starts stream without synchronization. {@code listener} should be same instance as {@link
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
for (Runnable runnable : preStartPendingCalls) {
runnable.run();
}
preStartPendingCalls = null;
realStream.start(listener);
}

@Override
Expand All @@ -247,6 +271,7 @@ public Attributes getAttributes() {

@Override
public void writeMessage(final InputStream message) {
checkState(listener != null, "May only be called after start");
checkNotNull(message, "message");
if (passThrough) {
realStream.writeMessage(message);
Expand All @@ -262,6 +287,7 @@ public void run() {

@Override
public void flush() {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.flush();
} else {
Expand All @@ -277,16 +303,14 @@ public void run() {
// When this method returns, passThrough is guaranteed to be true
@Override
public void cancel(final Status reason) {
checkState(listener != null, "May only be called after start");
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
checkNotNull(reason, "reason");
boolean delegateToRealStream = true;
ClientStreamListener listenerToClose = null;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called
if (realStream == null) {
setRealStream(NoopClientStream.INSTANCE);
delegateToRealStream = false;
// If listener == null, then start() will later call listener with 'error'
listenerToClose = listener;
error = reason;
}
}
Expand All @@ -298,10 +322,9 @@ public void run() {
}
});
} else {
if (listenerToClose != null) {
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
// Note that listener is a DelayedStreamListener
listener.closed(reason, new Metadata());
}
}

Expand All @@ -314,6 +337,7 @@ private void setRealStream(ClientStream realStream) {

@Override
public void halfClose() {
checkState(listener != null, "May only be called after start");
delayOrExecute(new Runnable() {
@Override
public void run() {
Expand All @@ -324,6 +348,7 @@ public void run() {

@Override
public void request(final int numMessages) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.request(numMessages);
} else {
Expand All @@ -338,7 +363,8 @@ public void run() {

@Override
public void optimizeForDirectExecutor() {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.optimizeForDirectExecutor();
Expand All @@ -348,8 +374,9 @@ public void run() {

@Override
public void setCompressor(final Compressor compressor) {
checkState(listener == null, "May only be called before start");
checkNotNull(compressor, "compressor");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setCompressor(compressor);
Expand All @@ -359,7 +386,8 @@ public void run() {

@Override
public void setFullStreamDecompression(final boolean fullStreamDecompression) {
delayOrExecute(
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(
new Runnable() {
@Override
public void run() {
Expand All @@ -370,8 +398,9 @@ public void run() {

@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkState(listener == null, "May only be called before start");
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
Expand All @@ -390,6 +419,7 @@ public boolean isReady() {

@Override
public void setMessageCompression(final boolean enable) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.setMessageCompression(enable);
} else {
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) {
// returnStream() has been called before me, thus delayedStream must have been
// created.
checkState(delayedStream != null, "delayedStream is null");
delayedStream.setStream(stream);
Runnable slow = delayedStream.setStream(stream);
if (slow != null) {
// TODO(ejona): run this on a separate thread
slow.run();
}
}

/**
Expand Down
Loading