Skip to content

Commit

Permalink
Setup ExecutionContext in Barrage Client Examples (#4461)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Sep 8, 2023
1 parent 6e8a740 commit a697614
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
Expand All @@ -23,10 +24,20 @@ public interface ThreadInitializationFactory {
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {

// TODO (https://github.com/deephaven/deephaven-core/issues/4040):
// Currently the default property file is shared between both the java client and the server. This
// means that client-side usage will attempt to load the thread.initialization property intended for
// the server which is not available on the class path.
if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) {
return null;
}

throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration", e);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableList());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import io.deephaven.client.impl.BarrageSessionFactory;
import io.deephaven.client.impl.BarrageSubcomponent.Builder;
import io.deephaven.client.impl.DaggerDeephavenBarrageRoot;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.grpc.ManagedChannel;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -36,6 +39,22 @@ public final Void call() throws Exception {
Runtime.getRuntime()
.addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel)));

// Note that a DEFAULT update graph is required for engine operation. Users may wish to create additional update
// graphs for their own purposes, but the DEFAULT must be created first.
final PeriodicUpdateGraph updateGraph =
PeriodicUpdateGraph.newBuilder("DEFAULT").existingOrBuild();

// Prepare this thread for client-side Deephaven engine. We don't intend to create any subtables, so we'll use
// an empty query scope, an empty query library, a poisoned query compiler, and the update graph we just made.
// Note that it's a good habit to mark the most basic execution context as systemic, so that it's not
// accidentally used in contexts when the calling-code should be providing their own context.
final ExecutionContext executionContext = ExecutionContext.newBuilder()
.markSystemic()
.emptyQueryScope()
.newQueryLibrary()
.setUpdateGraph(updateGraph)
.build();

final Builder builder = DaggerDeephavenBarrageRoot.create().factoryBuilder()
.managedChannel(managedChannel)
.scheduler(scheduler)
Expand All @@ -46,7 +65,7 @@ public final Void call() throws Exception {
final BarrageSessionFactory barrageFactory = builder.build();
final BarrageSession deephavenSession = barrageFactory.newBarrageSession();
try {
try {
try (final SafeCloseable ignored = executionContext.open()) {
execute(deephavenSession);
} finally {
deephavenSession.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected void execute(final BarrageSession client) throws Exception {
System.out.println("Table info: rows = " + subscriptionTable.size() + ", cols = " +
DataAccessHelpers.getColumns(subscriptionTable).length);
TableTools.show(subscriptionTable);
System.out.println("");
System.out.println();

subscriptionTable.addUpdateListener(listener = new InstrumentedTableUpdateListener("example-listener") {
@ReferentialIntegrity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,15 @@ public void onNext(final BarrageMessage barrageMessage) {

@Override
public void onError(final Throwable t) {
log.error().append(BarrageSubscriptionImpl.this)
.append(": Error detected in subscription: ")
.append(t).endl();

final Listener listener = resultTable;
if (!connected || listener == null) {
return;
}

log.error().append(BarrageSubscriptionImpl.this)
.append(": Error detected in subscription: ")
.append(t).endl();

listener.handleBarrageError(t);
handleDisconnect();
}
Expand Down

0 comments on commit a697614

Please sign in to comment.