From a697614f7c1ce67f1d57233221bca2002d2143da Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Fri, 8 Sep 2023 15:31:08 -0600 Subject: [PATCH] Setup ExecutionContext in Barrage Client Examples (#4461) --- .../thread/ThreadInitializationFactory.java | 11 ++++++++++ .../examples/BarrageClientExampleBase.java | 21 ++++++++++++++++++- .../client/examples/SubscribeExampleBase.java | 2 +- .../client/impl/BarrageSubscriptionImpl.java | 9 ++++---- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index dddb9c380fc..56a5436adb7 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -5,6 +5,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -23,10 +24,20 @@ public interface ThreadInitializationFactory { return clazz.getDeclaredConstructor().newInstance(); } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + + // TODO (https://github.com/deephaven/deephaven-core/issues/4040): + // Currently the default property file is shared between both the java client and the server. This + // means that client-side usage will attempt to load the thread.initialization property intended for + // the server which is not available on the class path. + if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) { + return null; + } + throw new IllegalArgumentException( "Error instantiating initializer " + type + ", please check configuration", e); } }) + .filter(Objects::nonNull) .collect(Collectors.toUnmodifiableList()); /** diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java index 54bbe6c6190..f1ee1e040aa 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java @@ -7,6 +7,9 @@ import io.deephaven.client.impl.BarrageSessionFactory; import io.deephaven.client.impl.BarrageSubcomponent.Builder; import io.deephaven.client.impl.DaggerDeephavenBarrageRoot; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.SafeCloseable; import io.grpc.ManagedChannel; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -36,6 +39,22 @@ public final Void call() throws Exception { Runtime.getRuntime() .addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel))); + // Note that a DEFAULT update graph is required for engine operation. Users may wish to create additional update + // graphs for their own purposes, but the DEFAULT must be created first. + final PeriodicUpdateGraph updateGraph = + PeriodicUpdateGraph.newBuilder("DEFAULT").existingOrBuild(); + + // Prepare this thread for client-side Deephaven engine. We don't intend to create any subtables, so we'll use + // an empty query scope, an empty query library, a poisoned query compiler, and the update graph we just made. + // Note that it's a good habit to mark the most basic execution context as systemic, so that it's not + // accidentally used in contexts when the calling-code should be providing their own context. + final ExecutionContext executionContext = ExecutionContext.newBuilder() + .markSystemic() + .emptyQueryScope() + .newQueryLibrary() + .setUpdateGraph(updateGraph) + .build(); + final Builder builder = DaggerDeephavenBarrageRoot.create().factoryBuilder() .managedChannel(managedChannel) .scheduler(scheduler) @@ -46,7 +65,7 @@ public final Void call() throws Exception { final BarrageSessionFactory barrageFactory = builder.build(); final BarrageSession deephavenSession = barrageFactory.newBarrageSession(); try { - try { + try (final SafeCloseable ignored = executionContext.open()) { execute(deephavenSession); } finally { deephavenSession.close(); diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java index 8e39ec11c9a..1b31a428ceb 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java @@ -72,7 +72,7 @@ protected void execute(final BarrageSession client) throws Exception { System.out.println("Table info: rows = " + subscriptionTable.size() + ", cols = " + DataAccessHelpers.getColumns(subscriptionTable).length); TableTools.show(subscriptionTable); - System.out.println(""); + System.out.println(); subscriptionTable.addUpdateListener(listener = new InstrumentedTableUpdateListener("example-listener") { @ReferentialIntegrity diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index c3bcb2f2522..694b7fb7c46 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -148,14 +148,15 @@ public void onNext(final BarrageMessage barrageMessage) { @Override public void onError(final Throwable t) { - log.error().append(BarrageSubscriptionImpl.this) - .append(": Error detected in subscription: ") - .append(t).endl(); - final Listener listener = resultTable; if (!connected || listener == null) { return; } + + log.error().append(BarrageSubscriptionImpl.this) + .append(": Error detected in subscription: ") + .append(t).endl(); + listener.handleBarrageError(t); handleDisconnect(); }