Skip to content

Commit

Permalink
Enable Bidirectional communication with custom server objects (#4226)
Browse files Browse the repository at this point in the history
Replaces the ObjectService.FetchObject gRPC call with one that
supports multiple messages from the server to the client, and supports
the client continuing to interact with the object beyond simply
fetching it. This patch includes changes to the Java API for ObjectType
plugins, while the Python API has already been released. Plugins are
still able to be "fetch-only", meaning that they do not interact with
the client beyond an initial message, and all existing plugins have
been updated in this way. This is still experimental, and may undergo
future breaking API changes.

Also introduces a Python client for this API, allowing python clients
to interact with objects on the server. This is also experimental, and
subject to change.

Fixes #3583
Co-authored-by: Matthew Runyon <matthewrunyon@deephaven.io>
Co-authored-by: jianfengmao <jianfengmao@deephaven.io>
Co-authored-by: Devin Smith <devinsmith@deephaven.io>
  • Loading branch information
niloc132 committed Aug 10, 2023
1 parent fe791f6 commit b6628d0
Show file tree
Hide file tree
Showing 72 changed files with 6,998 additions and 547 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ workspaces/
devopsLocal/
pyclient/venv
pyclient/dist
pyclient/*.egg-info
.venv/
*.egg-info/

jenkins/failures

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -87,7 +86,7 @@ public PythonDeephavenSession(
scope = pythonEvaluator.getScope();
executionContext.getQueryLibrary().importClass(org.jpy.PyObject.class);
try (final SafeCloseable ignored = executionContext.open()) {
module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
module = (PythonScriptSessionModule) PyModule.importModule("deephaven_internal.script_session")
.createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class);
}
scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);
Expand Down Expand Up @@ -116,7 +115,7 @@ public PythonDeephavenSession(
evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
try (final SafeCloseable ignored = executionContext.open()) {
module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
module = (PythonScriptSessionModule) PyModule.importModule("deephaven_internal.script_session")
.createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class);
}
scriptFinder = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.deephaven.auth.ServiceAuthWiring;
import io.deephaven.proto.backplane.grpc.FetchObjectRequest;
import io.deephaven.proto.backplane.grpc.ObjectServiceGrpc;
import io.deephaven.proto.backplane.grpc.StreamRequest;
import io.grpc.ServerServiceDefinition;

/**
Expand All @@ -30,6 +31,12 @@ default ServerServiceDefinition intercept(ObjectServiceGrpc.ObjectServiceImplBas

serviceBuilder.addMethod(ServiceAuthWiring.intercept(
service, "FetchObject", null, this::onMessageReceivedFetchObject));
serviceBuilder.addMethod(ServiceAuthWiring.intercept(
service, "MessageStream", this::onCallStartedMessageStream, this::onMessageReceivedMessageStream));
serviceBuilder.addMethod(ServiceAuthWiring.intercept(
service, "OpenMessageStream", null, this::onMessageReceivedOpenMessageStream));
serviceBuilder.addMethod(ServiceAuthWiring.intercept(
service, "NextMessageStream", null, this::onMessageReceivedNextMessageStream));

return serviceBuilder.build();
}
Expand All @@ -43,14 +50,77 @@ default ServerServiceDefinition intercept(ObjectServiceGrpc.ObjectServiceImplBas
*/
void onMessageReceivedFetchObject(AuthContext authContext, FetchObjectRequest request);

/**
* Authorize a request to open a client-streaming rpc MessageStream.
*
* @param authContext the authentication context of the request
* @throws io.grpc.StatusRuntimeException if the user is not authorized to invoke MessageStream
*/
void onCallStartedMessageStream(AuthContext authContext);

/**
* Authorize a request to MessageStream.
*
* @param authContext the authentication context of the request
* @param request the request to authorize
* @throws io.grpc.StatusRuntimeException if the user is not authorized to invoke MessageStream
*/
void onMessageReceivedMessageStream(AuthContext authContext, StreamRequest request);

/**
* Authorize a request to OpenMessageStream.
*
* @param authContext the authentication context of the request
* @param request the request to authorize
* @throws io.grpc.StatusRuntimeException if the user is not authorized to invoke OpenMessageStream
*/
void onMessageReceivedOpenMessageStream(AuthContext authContext, StreamRequest request);

/**
* Authorize a request to NextMessageStream.
*
* @param authContext the authentication context of the request
* @param request the request to authorize
* @throws io.grpc.StatusRuntimeException if the user is not authorized to invoke NextMessageStream
*/
void onMessageReceivedNextMessageStream(AuthContext authContext, StreamRequest request);

class AllowAll implements ObjectServiceAuthWiring {
public void onMessageReceivedFetchObject(AuthContext authContext, FetchObjectRequest request) {}

public void onCallStartedMessageStream(AuthContext authContext) {}

public void onMessageReceivedMessageStream(AuthContext authContext, StreamRequest request) {}

public void onMessageReceivedOpenMessageStream(AuthContext authContext,
StreamRequest request) {}

public void onMessageReceivedNextMessageStream(AuthContext authContext,
StreamRequest request) {}
}

class DenyAll implements ObjectServiceAuthWiring {
public void onMessageReceivedFetchObject(AuthContext authContext, FetchObjectRequest request) {
ServiceAuthWiring.operationNotAllowed();
}

public void onCallStartedMessageStream(AuthContext authContext) {
ServiceAuthWiring.operationNotAllowed();
}

public void onMessageReceivedMessageStream(AuthContext authContext, StreamRequest request) {
ServiceAuthWiring.operationNotAllowed();
}

public void onMessageReceivedOpenMessageStream(AuthContext authContext,
StreamRequest request) {
ServiceAuthWiring.operationNotAllowed();
}

public void onMessageReceivedNextMessageStream(AuthContext authContext,
StreamRequest request) {
ServiceAuthWiring.operationNotAllowed();
}
}

class TestUseOnly implements ObjectServiceAuthWiring {
Expand All @@ -61,5 +131,31 @@ public void onMessageReceivedFetchObject(AuthContext authContext, FetchObjectReq
delegate.onMessageReceivedFetchObject(authContext, request);
}
}

public void onCallStartedMessageStream(AuthContext authContext) {
if (delegate != null) {
delegate.onCallStartedMessageStream(authContext);
}
}

public void onMessageReceivedMessageStream(AuthContext authContext, StreamRequest request) {
if (delegate != null) {
delegate.onMessageReceivedMessageStream(authContext, request);
}
}

public void onMessageReceivedOpenMessageStream(AuthContext authContext,
StreamRequest request) {
if (delegate != null) {
delegate.onMessageReceivedOpenMessageStream(authContext, request);
}
}

public void onMessageReceivedNextMessageStream(AuthContext authContext,
StreamRequest request) {
if (delegate != null) {
delegate.onMessageReceivedNextMessageStream(authContext, request);
}
}
}
}
111 changes: 111 additions & 0 deletions cpp-client/deephaven/client/proto/deephaven/proto/object.grpc.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b6628d0

Please sign in to comment.