Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExportObject: Add onSuccess Callback for use in TableService#batch #4772

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.arrow.flight.impl.Flight;
Expand Down Expand Up @@ -564,7 +565,7 @@ public final static class ExportObject<T> extends LivenessArtifact {

/** used to identify and propagate error details */
private String errorId;
private String dependentHandle;
private String failedDependencyLogIdentity;
private Exception caughtException;

/**
Expand Down Expand Up @@ -604,7 +605,7 @@ private ExportObject(final T result) {
this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless";

if (result == null) {
assignErrorId();
maybeAssignErrorId();
state = ExportNotification.State.FAILED;
} else {
state = ExportNotification.State.EXPORTED;
Expand Down Expand Up @@ -663,11 +664,16 @@ private synchronized void setWork(
hasHadWorkSet = true;
this.requiresSerialQueue = requiresSerialQueue;

if (isExportStateTerminal(this.state)) {
// nothing to do because dependency already failed; hooray??
if (isExportStateTerminal(state)) {
// The following scenarios cause us to get into this state:
// - this export object was released/cancelled
// - the session expiration propagated to this export object
// Note that already failed dependencies will be handled in the onResolveOne method below.

// since this is the first we know of the errorHandler, it could not have been invoked yet
if (errorHandler != null) {
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
assignErrorId();
errorHandler.onError(state, errorId, null, null);
maybeAssignErrorId();
errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity);
}
return;
}
Expand Down Expand Up @@ -784,6 +790,9 @@ private synchronized void setState(final ExportNotification.State state) {
|| isExportStateTerminal(this.state)) {
throw new IllegalStateException("cannot change state if export is already in terminal state");
}
if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) {
throw new IllegalStateException("export object state changes must advance toward a terminal state");
}
this.state = state;

// Send an export notification before possibly notifying children of our state change.
Expand All @@ -801,9 +810,7 @@ private synchronized void setState(final ExportNotification.State state) {
}

if (isExportStateFailure(state) && errorHandler != null) {
if (errorId == null) {
assignErrorId();
}
maybeAssignErrorId();
try {
final Exception toReport;
if (caughtException != null && errorTransformer != null) {
Expand All @@ -812,20 +819,25 @@ private synchronized void setState(final ExportNotification.State state) {
toReport = caughtException;
}

errorHandler.onError(state, errorId, toReport, dependentHandle);
} catch (final Exception err) {
errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity);
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting failure: ").append(err).endl();
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject failure", err);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
}

final boolean isNowExported = state == ExportNotification.State.EXPORTED;
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
if (isNowExported && successHandler != null) {
try {
successHandler.accept(result);
} catch (final Exception err) {
} catch (final Throwable err) {
// this is a serious error; crash the jvm to ensure that we don't miss it
log.error().append("Unexpected error while reporting success: ").append(err).endl();
nbauernfeind marked this conversation as resolved.
Show resolved Hide resolved
ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync(
"Unexpected error while reporting ExportObject success", err);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
successHandler = null;
}

if (isNowExported || isExportStateTerminal(state)) {
Expand Down Expand Up @@ -881,8 +893,8 @@ private void onResolveOne(@Nullable final ExportObject<?> parent) {
break;
}

assignErrorId();
dependentHandle = parent.logIdentity;
maybeAssignErrorId();
failedDependencyLogIdentity = parent.logIdentity;
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
.endl();
Expand Down Expand Up @@ -933,38 +945,41 @@ private void doExport() {
|| session.isExpired()
|| capturedExport == null
|| !tryRetainReference()) {
if (errorHandler != null) {
// fulfill promise to client that we will notify them whether we succeed or fail
assignErrorId();
errorHandler.onError(ExportNotification.State.CANCELLED, errorId, null, null);
if (!isExportStateTerminal(state)) {
setState(ExportNotification.State.CANCELLED);
} else if (errorHandler != null) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted("in terminal state but error handler is not null");
}
return;
}
dropReference();
setState(ExportNotification.State.RUNNING);
}

T localResult = null;
boolean shouldLog = false;
int evaluationNumber = -1;
QueryProcessingResults queryProcessingResults = null;
try (final SafeCloseable ignored1 = session.executionContext.open()) {
try (final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try (final SafeCloseable ignored1 = session.executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
try {
queryProcessingResults = new QueryProcessingResults(
QueryPerformanceRecorder.getInstance());

evaluationNumber = QueryPerformanceRecorder.getInstance()
.startQuery("session=" + session.sessionId + ",exportId=" + logIdentity);

try {
setResult(capturedExport.call());
localResult = capturedExport.call();
} finally {
shouldLog = QueryPerformanceRecorder.getInstance().endQuery();
}
} catch (final Exception err) {
caughtException = err;
synchronized (this) {
if (!isExportStateTerminal(state)) {
assignErrorId();
maybeAssignErrorId();
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(err).endl();
}
Expand Down Expand Up @@ -1005,11 +1020,16 @@ private void doExport() {
log.error().append("Failed to log query performance data: ").append(e).endl();
}
}
if (caughtException == null) {
setResult(localResult);
}
}
}

private void assignErrorId() {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
private void maybeAssignErrorId() {
if (errorId == null) {
errorId = UuidCreator.toString(UuidCreator.getRandomBased());
}
}

/**
Expand Down Expand Up @@ -1098,8 +1118,8 @@ private synchronized ExportNotification makeExportNotification() {
if (errorId != null) {
builder.setContext(errorId);
}
if (dependentHandle != null) {
builder.setDependentHandle(dependentHandle);
if (failedDependencyLogIdentity != null) {
builder.setDependentHandle(failedDependencyLogIdentity);
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.AssertionFailure;
import io.deephaven.engine.context.TestExecutionContext;
import io.deephaven.engine.testutil.testcase.FakeProcessEnvironment;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.liveness.LivenessArtifact;
Expand All @@ -17,6 +18,7 @@
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.util.SafeCloseable;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.process.ProcessEnvironment;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -52,6 +54,7 @@ public class SessionStateTest {
private TestControlledScheduler scheduler;
private SessionState session;
private int nextExportId;
private ProcessEnvironment oldProcessEnvironment;

@Before
public void setup() {
Expand All @@ -64,10 +67,19 @@ public void setup() {
session.initializeExpiration(new SessionService.TokenExpiration(UUID.randomUUID(),
DateTimeUtils.epochMillis(DateTimeUtils.epochNanosToInstant(Long.MAX_VALUE)), session));
nextExportId = 1;

oldProcessEnvironment = ProcessEnvironment.tryGet();
ProcessEnvironment.set(FakeProcessEnvironment.INSTANCE, true);
}

@After
public void teardown() {
if (oldProcessEnvironment == null) {
ProcessEnvironment.clear();
} else {
ProcessEnvironment.set(oldProcessEnvironment, true);
}

LivenessScopeStack.pop(livenessScope);
livenessScope.release();
livenessScope = null;
Expand Down Expand Up @@ -228,12 +240,43 @@ public void testThrowInErrorHandler() {
Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(success.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED);
scheduler.runUntilQueueEmpty();
boolean caught = false;
try {
scheduler.runUntilQueueEmpty();
} catch (final FakeProcessEnvironment.FakeFatalException ignored) {
caught = true;
}
Assert.eqTrue(caught, "caught");
Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(success.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.FAILED);
}

@Test
public void testThrowInSuccessHandler() {
final MutableBoolean failed = new MutableBoolean();
final MutableBoolean submitted = new MutableBoolean();
final SessionState.ExportObject<Object> exportObj = session.newExport(nextExportId++)
.onErrorHandler(err -> failed.setTrue())
.onSuccess(ignored -> {
throw new RuntimeException("on success exception");
}).submit(submitted::setTrue);
Assert.eqFalse(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(failed.booleanValue(), "success.booleanValue()");
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.QUEUED);
boolean caught = false;
try {
scheduler.runUntilQueueEmpty();
} catch (final FakeProcessEnvironment.FakeFatalException ignored) {
caught = true;
}
Assert.eqTrue(caught, "caught");
Assert.eqTrue(submitted.booleanValue(), "submitted.booleanValue()");
Assert.eqFalse(failed.booleanValue(), "success.booleanValue()");
// although we will want the jvm to exit -- we expect that the export to be successful
Assert.eq(exportObj.getState(), "exportObj.getState()", ExportNotification.State.EXPORTED);
}

@Test
public void testCancelBeforeDefined() {
final SessionState.ExportObject<Object> exportObj = session.getExport(nextExportId);
Expand Down
Loading