Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel subscribeToLogs and messageStream RPCs when the Session closes #5433

Merged
merged 5 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public void subscribeToLogs(
GrpcUtil.safelyError(responseObserver, Code.FAILED_PRECONDITION, "Remote console disabled");
return;
}
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
final LogsClient client =
new LogsClient(request, (ServerCallStreamObserver<LogSubscriptionData>) responseObserver);
client.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ public void onCompleted() {
@Override
public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> responseObserver) {
SessionState session = sessionService.getCurrentSession();
// Session close logic implicitly handled in
// io.deephaven.server.session.SessionServiceGrpcImpl.SessionServiceInterceptor
return new SendMessageObserver(session, responseObserver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.*;
import io.deephaven.proto.backplane.script.grpc.ConsoleServiceGrpc;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingRunnable;
import io.grpc.Context;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
Expand All @@ -36,9 +36,11 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.Closeable;
import java.lang.Object;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class SessionServiceGrpcImpl extends SessionServiceGrpc.SessionServiceImplBase {
Expand Down Expand Up @@ -310,10 +312,19 @@ private void addHeaders(final Metadata md) {

@Singleton
public static class SessionServiceInterceptor implements ServerInterceptor {
private static final Status AUTHENTICATION_DETAILS_INVALID =
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");

// We can't use just io.grpc.MethodDescriptor (unless we chose provide and inject the named method descriptors),
// some of our methods are overridden from stock gRPC; for example,
// io.deephaven.server.object.ObjectServiceGrpcBinding.bindService.
// The goal should be to migrate all of the existing RPC Session close management logic to here if possible.
private static final Set<String> CANCEL_RPC_ON_SESSION_CLOSE = Set.of(
ConsoleServiceGrpc.getSubscribeToLogsMethod().getFullMethodName(),
ObjectServiceGrpc.getMessageStreamMethod().getFullMethodName());

private final SessionService service;
private final SessionService.ErrorTransformer errorTransformer;
private static final Status authenticationDetailsInvalid =
Status.UNAUTHENTICATED.withDescription("Authentication details invalid");

@Inject
public SessionServiceInterceptor(
Expand Down Expand Up @@ -344,12 +355,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
try {
session = service.getSessionForAuthToken(token);
} catch (AuthenticationException e) {
try {
call.close(authenticationDetailsInvalid, new Metadata());
} catch (IllegalStateException ignored) {
// could be thrown if the call was already closed. As an interceptor, we can't throw,
// so ignoring this and just returning the no-op listener.
}
// As an interceptor, we can't throw, so ignoring this and just returning the no-op listener.
safeClose(call, AUTHENTICATION_DETAILS_INVALID, new Metadata(), false);
return new ServerCall.Listener<>() {};
}
}
Expand All @@ -363,33 +370,61 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re

final MutableObject<SessionServiceCallListener<ReqT, RespT>> listener = new MutableObject<>();
rpcWrapper(serverCall, context, finalSession, errorTransformer, () -> listener.setValue(
new SessionServiceCallListener<>(serverCallHandler.startCall(serverCall, metadata), serverCall,
context, finalSession, errorTransformer)));
listener(serverCall, metadata, serverCallHandler, context, finalSession)));
if (listener.getValue() == null) {
return new ServerCall.Listener<>() {};
}
return listener.getValue();
}

private <ReqT, RespT> @NotNull SessionServiceCallListener<ReqT, RespT> listener(
InterceptedCall<ReqT, RespT> serverCall,
Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler,
Context context,
SessionState session) {
return new SessionServiceCallListener<>(
serverCallHandler.startCall(serverCall, metadata),
serverCall,
context,
session,
errorTransformer,
CANCEL_RPC_ON_SESSION_CLOSE.contains(serverCall.getMethodDescriptor().getFullMethodName()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we did !serverCall.getMethodDescriptor().getType().serverSendsOneMessage()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm definitely in favor of adding more RPCs into auto cancel; that said, I'm trying to be cautious about changing existing behavior without further testing and validation.

I think we actually want this to be try for all RPCs, even ones where the server sends one message. As a real example,

  /*
   * Receive a best-effort message on-exit indicating why this server is exiting. Reception of this message cannot be
   * guaranteed.
   */
  rpc TerminationNotification(TerminationNotificationRequest) returns (TerminationNotificationResponse) {}

is a long-living RPC where the server only sends one message; I would want this to be auto cancelled as well. (Right now, it does have its own cancellation logic, but I think it could be automated here as well.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that makes sense. That's kind of annoying; maintaining a white list is fragile, maintaining a black list would be annoying for "power users" who inject additional gRPC services.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we could safely move to auto cancel for every RPC attached to a Session (with a bit of due diligence to consider downstream effects). I don't think it's an unreasonable position for additional gRPC services that are implemented and are bound to a Session to inherit this behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#5534 created for follow-up to this point.

}
}

private static class SessionServiceCallListener<ReqT, RespT> extends
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> implements Closeable {
private static final Status SESSION_CLOSED = Status.CANCELLED.withDescription("Session closed");

private final ServerCall<ReqT, RespT> call;
private final Context context;
private final SessionState session;
private final SessionService.ErrorTransformer errorTransformer;
private final boolean autoCancelOnSessionClose;

public SessionServiceCallListener(
SessionServiceCallListener(
ServerCall.Listener<ReqT> delegate,
ServerCall<ReqT, RespT> call,
Context context,
SessionState session,
SessionService.ErrorTransformer errorTransformer) {
SessionService.ErrorTransformer errorTransformer,
boolean autoCancelOnSessionClose) {
super(delegate);
this.call = call;
this.context = context;
this.session = session;
this.errorTransformer = errorTransformer;
this.autoCancelOnSessionClose = autoCancelOnSessionClose;
if (autoCancelOnSessionClose && session != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might actually want to error if no session exists and only allow server-streaming rpc's if there is a session to attach to them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, to add a safety check in the code path to here - I expect that assumption is already met today by our impls, but I can't guarantee it. I don't think that immediately absolves our server impls from still being a bit explicit (impls still need to be explicit and call io.deephaven.server.session.SessionService#getCurrentSession to verify). It might be worthwhile to capture these expectations as annotations at the RPCs layer to make our server impls simpler. (I would expect that an RPC would need an annotation to opt-out; assume Session is necessary by default.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a commit that accomplishes this extra safety check, but I think it would be best as an immediate follow-up PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up issue, #5535

session.addOnCloseCallback(this);
}
}

@Override
public void close() {
// session.addOnCloseCallback
safeClose(call, SESSION_CLOSED, new Metadata(), false);
}

@Override
Expand All @@ -405,11 +440,17 @@ public void onHalfClose() {
@Override
public void onCancel() {
rpcWrapper(call, context, session, errorTransformer, super::onCancel);
if (autoCancelOnSessionClose && session != null) {
session.removeOnCloseCallback(this);
}
}

@Override
public void onComplete() {
rpcWrapper(call, context, session, errorTransformer, super::onComplete);
if (autoCancelOnSessionClose && session != null) {
session.removeOnCloseCallback(this);
}
}

@Override
Expand All @@ -432,34 +473,44 @@ private static <ReqT, RespT> void rpcWrapper(
@NotNull final Context context,
@Nullable final SessionState session,
@NotNull final SessionService.ErrorTransformer errorTransformer,
@NotNull final ThrowingRunnable<InterruptedException> lambda) {
@NotNull final Runnable lambda) {
Context previous = context.attach();
// note: we'll open the execution context here so that it may be used by the error transformer
try (final SafeCloseable ignored1 = session == null ? null : session.getExecutionContext().open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
lambda.run();
} catch (final InterruptedException err) {
Thread.currentThread().interrupt();
closeWithError(call, errorTransformer.transform(err));
} catch (final Throwable err) {
closeWithError(call, errorTransformer.transform(err));
} catch (final RuntimeException err) {
safeClose(call, errorTransformer.transform(err));
} catch (final Error error) {
// Indicates a very serious failure; debateable whether we should even try to send close.
safeClose(call, Status.INTERNAL, new Metadata(), false);
throw error;
} finally {
context.detach(previous);
}
}
}

private static <ReqT, RespT> void closeWithError(
@NotNull final ServerCall<ReqT, RespT> call,
private static void safeClose(
@NotNull final ServerCall<?, ?> call,
@NotNull final StatusRuntimeException err) {
Metadata metadata = Status.trailersFromThrowable(err);
if (metadata == null) {
metadata = new Metadata();
}
safeClose(call, Status.fromThrowable(err), metadata, true);
}

private static void safeClose(ServerCall<?, ?> call, Status status, Metadata trailers, boolean logOnError) {
try {
Metadata metadata = Status.trailersFromThrowable(err);
if (metadata == null) {
metadata = new Metadata();
call.close(status, trailers);
} catch (IllegalStateException e) {
// IllegalStateException is explicitly documented as thrown if the call is already closed. It might be nice
// if there was a more explicit exception type, but this should suffice. We _could_ try and check the text
// "call already closed", but that is an undocumented implementation detail we should probably not rely on.
if (logOnError && log.isDebugEnabled()) {
log.debug().append("call.close error: ").append(e).endl();
}
call.close(Status.fromThrowable(err), metadata);
} catch (final Exception unexpectedErr) {
log.debug().append("Unanticipated gRPC Error: ").append(unexpectedErr).endl();
}
}
}
Loading
Loading