Skip to content

Commit

Permalink
Move rpcWrapper to gRPC Interceptor; Open Session's ExecutionContext (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored Mar 10, 2023
1 parent 36facaf commit 34e353f
Show file tree
Hide file tree
Showing 45 changed files with 1,664 additions and 1,575 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.io.streams.ByteBufferInputStream;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
Expand Down Expand Up @@ -157,7 +158,7 @@ private void signalCompletion(final Condition completedCondition) {

protected void parseSchema(final Schema header) {
if (resultTable != null) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported");
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported");
}

final BarrageUtil.ConvertedArrowSchema result = BarrageUtil.convertArrowSchema(header);
Expand Down Expand Up @@ -218,7 +219,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
}

if (acd.data.get(0).size() != numRowsAdded) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"Inconsistent num records per column: " + numRowsAdded + " != " + acd.data.get(0).size());
}
acd.type = columnTypes[ci];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.flight.util.MessageHelper;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.time.DateTime;
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.util.ColumnFormatting;
Expand Down Expand Up @@ -293,7 +294,7 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
return long.class;
}
}
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of intType(signed=" + intType.getIsSigned() + ", bitWidth=" + intType.getBitWidth() + ")");
case Bool:
return boolean.class;
Expand All @@ -303,7 +304,7 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
if (maybeConvertForTimeUnit(durationUnit, result, i)) {
return long.class;
}
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of durationType(unit=" + durationUnit.toString() + ")");
case Timestamp:
final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType;
Expand All @@ -314,7 +315,7 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
return DateTime.class;
}
}
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of timestampType(Timezone=" + tz +
", Unit=" + timestampUnit.toString() + ")");
case FloatingPoint:
Expand All @@ -326,13 +327,13 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
return double.class;
case HALF:
default:
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of floatingPointType(Precision=" + floatingPointType.getPrecision().toString() + ")");
}
case Utf8:
return java.lang.String.class;
default:
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of type " + arrowType.getTypeID().toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,17 @@

import io.deephaven.io.logger.Logger;
import com.google.rpc.Code;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.util.FunctionalInterfaces;
import io.deephaven.util.SafeCloseable;
import io.deephaven.internal.log.LoggerFactory;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;

public class GrpcUtil {
private static final Logger log = LoggerFactory.getLogger(GrpcUtil.class);

/**
* Utility to avoid errors escaping to the stream, to make sure the server log and client both see the message if
* there is an error, and if the error was not meant to propagate to a gRPC client, obfuscates it.
*
* @param log the current class's logger
* @param response the responseStream used to send messages to the client
* @param lambda the code to safely execute
* @param <T> some IOException type, so that we can handle IO errors as well as runtime exceptions.
*/
public static <T extends IOException> void rpcWrapper(final Logger log, final StreamObserver<?> response,
final FunctionalInterfaces.ThrowingRunnable<T> lambda) {
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
lambda.run();
} catch (final StatusRuntimeException err) {
if (err.getStatus().equals(Status.UNAUTHENTICATED)) {
log.info().append("ignoring unauthenticated request").endl();
} else {
log.error().append(err).endl();
}
safelyError(response, err);
} catch (final RuntimeException | IOException err) {
safelyError(response, securelyWrapError(log, err));
}
}

/**
* Utility to avoid errors escaping to the stream, to make sure the server log and client both see the message if
* there is an error, and if the error was not meant to propagate to a gRPC client, obfuscates it.
*
* @param log the current class's logger
* @param response the responseStream used to send messages to the client
* @param lambda the code to safely execute
* @param <T> the type of the value to be returned
* @return the result of the lambda
*/
public static <T> T rpcWrapper(final Logger log, final StreamObserver<?> response, final Callable<T> lambda) {
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
return lambda.call();
} catch (final StatusRuntimeException err) {
if (err.getStatus().equals(Status.UNAUTHENTICATED)) {
log.info().append("ignoring unauthenticated request").endl();
} else {
log.error().append(err).endl();
}
safelyError(response, err);
} catch (final InterruptedException err) {
Thread.currentThread().interrupt();
safelyError(response, securelyWrapError(log, err, Code.UNAVAILABLE));
} catch (final Throwable err) {
safelyError(response, securelyWrapError(log, err));
}
return null;
}

public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err) {
return securelyWrapError(log, err, Code.INVALID_ARGUMENT);
}
Expand All @@ -87,11 +28,7 @@ public static StatusRuntimeException securelyWrapError(final Logger log, final T

final UUID errorId = UUID.randomUUID();
log.error().append("Internal Error '").append(errorId.toString()).append("' ").append(err).endl();
return statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'");
}

public static StatusRuntimeException statusRuntimeException(final Code statusCode, final String details) {
return Exceptions.statusRuntimeException(statusCode, details);
return Exceptions.statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'");
}

/**
Expand Down Expand Up @@ -155,7 +92,7 @@ public static void safelyComplete(StreamObserver<?> observer) {
* stream.
*/
public static void safelyError(final StreamObserver<?> observer, final Code statusCode, final String msg) {
safelyError(observer, statusRuntimeException(statusCode, msg));
safelyError(observer, Exceptions.statusRuntimeException(statusCode, msg));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import io.deephaven.appmode.StaticClassApplication;
import io.deephaven.engine.util.GroovyDeephavenSession;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.integrations.python.PythonDeephavenSession;
import io.deephaven.proto.util.Exceptions;

import java.nio.file.Path;
import java.util.List;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void visit(DynamicApplication<?> advanced) {

@Override
public void visit(QSTApplication qst) {
throw GrpcUtil.statusRuntimeException(Code.UNIMPLEMENTED, "See deephaven-core#1080; support qst application");
throw Exceptions.statusRuntimeException(Code.UNIMPLEMENTED, "See deephaven-core#1080; support qst application");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.deephaven.server.util.Scheduler;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -125,22 +126,21 @@ private synchronized void propagateUpdates() {
}

@Override
public synchronized void listFields(ListFieldsRequest request,
StreamObserver<FieldsChangeUpdate> responseObserver) {
GrpcUtil.rpcWrapper(log, responseObserver, () -> {
final SessionState session = sessionService.getCurrentSession();
final Subscription subscription = new Subscription(session, responseObserver);

final FieldsChangeUpdate.Builder responseBuilder = FieldsChangeUpdate.newBuilder();
for (FieldInfo fieldInfo : known.values()) {
responseBuilder.addCreated(fieldInfo);
}
if (subscription.send(responseBuilder.build())) {
subscriptions.add(subscription);
} else {
subscription.onCancel();
}
});
public synchronized void listFields(
@NotNull final ListFieldsRequest request,
@NotNull final StreamObserver<FieldsChangeUpdate> responseObserver) {
final SessionState session = sessionService.getCurrentSession();
final Subscription subscription = new Subscription(session, responseObserver);

final FieldsChangeUpdate.Builder responseBuilder = FieldsChangeUpdate.newBuilder();
for (FieldInfo fieldInfo : known.values()) {
responseBuilder.addCreated(fieldInfo);
}
if (subscription.send(responseBuilder.build())) {
subscriptions.add(subscription);
} else {
subscription.onCancel();
}
}

synchronized void remove(Subscription sub) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.appmode.Field;
import io.deephaven.base.string.EncodingInfo;
import io.deephaven.engine.table.Table;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.flight.util.TicketRouterHelper;
import io.deephaven.proto.util.ApplicationTicketHelper;
Expand Down Expand Up @@ -74,13 +73,13 @@ public <T> SessionState.ExportObject<T> resolve(

private <T> SessionState.ExportObject<T> resolve(final AppFieldId id, final String logId) {
if (id.app == null) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id)
+ "' does not belong to an application");
}
final Field<Object> field = id.app.getField(id.fieldName);
if (field == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' not found");
}
Object value = authTransformation.transform(field.value());
Expand All @@ -93,23 +92,23 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
final @Nullable SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
final AppFieldId id = appFieldIdFor(descriptor, logId);
if (id.app == null) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': field does not belong to an application");
}

final Flight.FlightInfo info;
synchronized (id.app) {
Field<?> field = id.app.getField(id.fieldName);
if (field == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' not found");
}
Object value = field.value();
if (value instanceof Table) {
value = authTransformation.transform(value);
info = TicketRouter.getFlightInfo((Table) value, descriptor, flightTicketForName(id.app, id.fieldName));
} else {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': field '" + getLogNameFor(id) + "' is not a flight");
}
}
Expand All @@ -120,14 +119,14 @@ public SessionState.ExportObject<Flight.FlightInfo> flightInfoFor(
@Override
public <T> SessionState.ExportBuilder<T> publish(
SessionState session, ByteBuffer ticket, final String logId) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not publish '" + logId + "': application tickets cannot be published to");
}

@Override
public <T> SessionState.ExportBuilder<T> publish(
final SessionState session, final Flight.FlightDescriptor descriptor, final String logId) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not publish '" + logId + "': application flight descriptors cannot be published to");
}

Expand Down Expand Up @@ -210,7 +209,7 @@ private AppFieldId appFieldIdFor(final ByteBuffer ticket, final String logId) {
try {
ticketAsString = decoder.decode(ticket).toString();
} catch (CharacterCodingException e) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': failed to decode: " + e.getMessage());
} finally {
ticket.position(initialPosition);
Expand All @@ -221,15 +220,15 @@ private AppFieldId appFieldIdFor(final ByteBuffer ticket, final String logId) {
final int endOfAppId = ticketAsString.indexOf('/', endOfRoute + 1);
final int endOfFieldSegment = ticketAsString.indexOf('/', endOfAppId + 1);
if (endOfAppId == -1 || endOfFieldSegment == -1) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': ticket does conform to expected format");
}
final String appId = ticketAsString.substring(endOfRoute + 1, endOfAppId);
final String fieldName = ticketAsString.substring(endOfFieldSegment + 1);

final ApplicationState app = applicationMap.get(appId);
if (app == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': no application exists with the identifier: " + appId);
}

Expand All @@ -243,26 +242,26 @@ private AppFieldId appFieldIdFor(final Flight.FlightDescriptor descriptor, final
}

if (descriptor.getType() != Flight.FlightDescriptor.DescriptorType.PATH) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': only flight paths are supported");
}

// current structure: a/app_id/f/field_name
if (descriptor.getPathCount() != 4) {
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION,
throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Could not resolve '" + logId + "': unexpected path length (found: "
+ TicketRouterHelper.getLogNameFor(descriptor) + ", expected: 4)");
}

final String appId = descriptor.getPath(1);
final ApplicationState app = applicationMap.get(appId);
if (app == null) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': no application exists with the identifier: " + appId);
}

if (!descriptor.getPath(2).equals(ApplicationTicketHelper.FIELD_PATH_SEGMENT)) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND,
throw Exceptions.statusRuntimeException(Code.NOT_FOUND,
"Could not resolve '" + logId + "': path is not an application field");
}

Expand Down
Loading

0 comments on commit 34e353f

Please sign in to comment.