From a69328ddfe648875c4ef9bf30f609fb0a8c4eed5 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 7 Nov 2022 14:02:28 -0600 Subject: [PATCH 01/11] Remove unused threadfactory types --- .../ExpandingThreadPoolExecutorFactory.java | 117 ------------------ .../engine/util/DaemonThreadFactory.java | 19 --- 2 files changed, 136 deletions(-) delete mode 100644 Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java diff --git a/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java b/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java deleted file mode 100644 index 19305619d32..00000000000 --- a/Util/src/main/java/io/deephaven/util/ExpandingThreadPoolExecutorFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.util; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.base.log.LogOutputAppendable; -import io.deephaven.io.logger.Logger; - -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Creates a ThreadPoolExecutor which can then be used to submit or execute tasks. This is intended for cases where a - * relatively small number of threads can handle most circumstances, but occasional abnormal events may exceed - * expectations. The executor has the following characteristics: - * - * If the executor has been shut down, any excess events will be discarded. - * - * To create one of these executors, use {@link ExpandingThreadPoolExecutorFactory#createThreadPoolExecutor}. - */ -public class ExpandingThreadPoolExecutorFactory { - - // Stop anything from creating one of these - only use the createThreadPoolExecutor method - private ExpandingThreadPoolExecutorFactory() {} - - /** - * Class to handle rejection events from a ThreadPoolExecutor by creating a new Thread to run the task, unless the - * executor has been shut down, in which case the task is discarded. - */ - private static class RejectedExecutionPolicy implements RejectedExecutionHandler, LogOutputAppendable { - final Logger log; - final String executorName; - final String threadName; - private final AtomicInteger executorThreadNumber; - - /** - * Creates a {@code RejectedExecutionPolicy}. - * - * @param log a Logger - * @param executorName a name to be used when logging thread startup messages - * @param threadName the name prefix for new threads - */ - private RejectedExecutionPolicy(final Logger log, - final String executorName, - final String threadName, - final AtomicInteger executorThreadNumber) { - this.log = log; - this.executorName = executorName; - this.threadName = threadName; - this.executorThreadNumber = executorThreadNumber; - } - - /** - * Executes task r in a new thread, unless the executor has been shut down, in which case the task is discarded. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - if (!e.isShutdown()) { - final String newThreadName = threadName + executorThreadNumber.getAndIncrement(); - log.warn().append("Executor has run out of threads for ").append(this).append(", creating new thread ") - .append(newThreadName).endl(); - newDaemonThread(r, newThreadName).start(); - } - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("executor ").append(executorName); - } - } - - private static Thread newDaemonThread(Runnable r, final String name) { - final Thread t = new Thread(r, name); - t.setDaemon(true); - return t; - } - - /** - * Create a {@link ThreadPoolExecutor} with the characteristics defined in - * {@link ExpandingThreadPoolExecutorFactory}. - * - * @param log a Logger to log messages - * @param corePoolSize the core pool size (the executor will use this value for the initial core and maximum pool - * sizes) - * @param keepAliveMinutes the number of minutes to keep alive core threads - * @param executorName the name of the executor, used when logging dynamic thread creation - * @param poolThreadNamePrefix the prefix for thread pool threads - * @param dynamicThreadNamePrefix the prefix for dynamic (overflow) threads created when the maximum number of pool - * threads is exceeded - */ - public static ThreadPoolExecutor createThreadPoolExecutor(final Logger log, - final int corePoolSize, - final int keepAliveMinutes, - final String executorName, - final String poolThreadNamePrefix, - final String dynamicThreadNamePrefix) { - final AtomicInteger executorThreadNumber = new AtomicInteger(1); - return new ThreadPoolExecutor(corePoolSize, - corePoolSize, - keepAliveMinutes, - TimeUnit.MINUTES, - new SynchronousQueue<>(), - r -> newDaemonThread(r, poolThreadNamePrefix + executorThreadNumber.getAndIncrement()), - new RejectedExecutionPolicy(log, executorName, dynamicThreadNamePrefix, executorThreadNumber)); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java b/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java deleted file mode 100644 index abbe6f5b307..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/util/DaemonThreadFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.engine.util; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - -public class DaemonThreadFactory implements ThreadFactory { - - private final ThreadFactory wrappedFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(Runnable r) { - Thread t = wrappedFactory.newThread(r); - t.setDaemon(true); - return t; - } -} From 27b1c2236807e975c66b077ecba3459087827cf0 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 7 Nov 2022 14:03:25 -0600 Subject: [PATCH 02/11] Expand usage of NamingThreadFactory, make the usual value for daemon default --- .../util/thread/NamingThreadFactory.java | 32 +++++++++++++++---- .../engine/table/impl/SparseSelect.java | 2 +- .../util/ExecutorTableDataRefreshService.java | 13 ++------ .../updategraph/UpdateGraphProcessor.java | 3 +- .../runner/DeephavenApiServerModule.java | 2 +- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java index fbd9b059855..525624f9f44 100644 --- a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java @@ -10,20 +10,38 @@ public class NamingThreadFactory implements ThreadFactory { private final AtomicInteger threadCounter = new AtomicInteger(0); - public final String name; - private final Class clazz; - private boolean daemon; + private final String name; + private final Class clazz; + private final boolean daemon; private final ThreadGroup threadGroup; - public NamingThreadFactory(final Class clazz, final String name) { - this(clazz, name, false); + /** + * Creates a thread factory using the provided class and name as part of the thread name. All created threads will be daemon threads. + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + */ + public NamingThreadFactory(final Class clazz, final String name) { + this(clazz, name, true); } - public NamingThreadFactory(final Class clazz, final String name, boolean daemon) { + /** + * Creates a thread factory using the provided class and name as part of the thread name. + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + * @param daemon true to make each thread a daemon thread + */ + public NamingThreadFactory(final Class clazz, final String name, boolean daemon) { this(null, clazz, name, daemon); } - public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) { + /** + * Creates a thread factory using the provided class and name as part of the thread name. + * @param threadGroup a thread group to add each thread to + * @param clazz a class to use when naming each thread + * @param name a name component to add after the class name when naming each thread + * @param daemon true to make each thread a daemon thread + */ + public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) { this.threadGroup = threadGroup; this.clazz = clazz; this.name = name; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java index 76dcc5a98ff..35cb6df1a58 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java @@ -61,7 +61,7 @@ public class SparseSelect { private final static ExecutorService executor = SPARSE_SELECT_THREADS == 1 ? null : Executors.newFixedThreadPool(SPARSE_SELECT_THREADS, - new NamingThreadFactory(SparseSelect.class, "copyThread", true)); + new NamingThreadFactory(SparseSelect.class, "copyThread")); private SparseSelect() {} // static use only diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java index 12d3e010ac5..a36b49e7ef2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/util/ExecutorTableDataRefreshService.java @@ -12,6 +12,7 @@ import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider; import io.deephaven.engine.table.impl.locations.impl.SubscriptionAggregator; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.util.thread.NamingThreadFactory; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Future; @@ -31,8 +32,6 @@ public class ExecutorTableDataRefreshService implements TableDataRefreshService private final long tableLocationProviderRefreshIntervalMillis; private final long tableLocationRefreshIntervalMillis; - private final AtomicInteger threadCount = new AtomicInteger(0); - private final ScheduledThreadPoolExecutor scheduler; private final Value providerSubscriptions; @@ -50,8 +49,9 @@ public ExecutorTableDataRefreshService(@NotNull final String name, this.tableLocationRefreshIntervalMillis = Require.gtZero(tableLocationRefreshIntervalMillis, "tableLocationRefreshIntervalMillis"); + NamingThreadFactory threadFactory = new NamingThreadFactory(TableDataRefreshService.class, "refreshThread"); scheduler = - new ScheduledThreadPoolExecutor(threadPoolSize, this::makeThread, new ThreadPoolExecutor.AbortPolicy()); + new ScheduledThreadPoolExecutor(threadPoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy()); scheduler.setRemoveOnCancelPolicy(true); providerSubscriptions = Stats.makeItem(NAME_PREFIX + name, "providerSubscriptions", Counter.FACTORY).getValue(); @@ -62,13 +62,6 @@ public ExecutorTableDataRefreshService(@NotNull final String name, .makeItem(NAME_PREFIX + name, "locationSubscriptionRefreshDurationNanos", State.FACTORY).getValue(); } - private Thread makeThread(final Runnable runnable) { - final Thread thread = - new Thread(runnable, NAME_PREFIX + name + "-refreshThread-" + threadCount.incrementAndGet()); - thread.setDaemon(true); - return thread; - } - @Override public void submitOneTimeAsyncTask(@NotNull final Runnable task) { scheduler.submit(task); diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index 162920faa0f..03875bda141 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -8,7 +8,6 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.reference.SimpleReference; import io.deephaven.base.verify.Assert; -import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.engine.context.ExecutionContext; @@ -1847,7 +1846,7 @@ private ExecutorService makeUnitTestRefreshExecutor() { private class UnitTestRefreshThreadFactory extends NamingThreadFactory { private UnitTestRefreshThreadFactory() { - super(UpdateGraphProcessor.class, "unitTestRefresh", true); + super(UpdateGraphProcessor.class, "unitTestRefresh"); } @Override diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 133b48f14ce..b030d39cd77 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -155,7 +155,7 @@ public static UpdateGraphProcessor provideUpdateGraphProcessor() { private static class ThreadFactory extends NamingThreadFactory { public ThreadFactory(final String name) { - super(DeephavenApiServer.class, name, true); + super(DeephavenApiServer.class, name); } @Override From c7be4df53bd32efb531f19e615bed915b5b746f2 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 7 Nov 2022 15:59:02 -0600 Subject: [PATCH 03/11] Move the default script/console lang to config as source of truth --- props/configs/src/main/resources/dh-defaults.prop | 1 + .../server/runner/DeephavenApiServerModule.java | 12 +++--------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index 9be44bf3343..644fbf883ea 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -45,6 +45,7 @@ default.processEnvironmentFactory=io.deephaven.util.process.DefaultProcessEnviro OperationInitializationThreadPool.threads=1 +deephaven.console.type=python AuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index b030d39cd77..66e42334e3e 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -83,16 +83,10 @@ static Set primeInterceptors() { @Provides @Singleton public ScriptSession provideScriptSession(Map> scriptTypes) { - final String DEEPHAVEN_CONSOLE_TYPE = "deephaven.console.type"; - boolean configuredConsole = Configuration.getInstance().hasProperty(DEEPHAVEN_CONSOLE_TYPE); + // Check which script language is configured + String scriptSessionType = Configuration.getInstance().getProperty("deephaven.console.type"); - if (!configuredConsole && scriptTypes.size() == 1) { - // if there is only one; use it - return scriptTypes.values().iterator().next().get(); - } - - // otherwise, assume we want python... - String scriptSessionType = Configuration.getInstance().getStringWithDefault(DEEPHAVEN_CONSOLE_TYPE, "python"); + // Emit an error if the selected language isn't provided if (!scriptTypes.containsKey(scriptSessionType)) { throw new IllegalArgumentException("Console type not found: " + scriptSessionType); } From a55c57f3c5d5ce6740d354d7d9f6157b3d12730d Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 7 Nov 2022 16:03:06 -0600 Subject: [PATCH 04/11] Wrap thread initialization with optional extra wiring, including py debug --- .../util/thread/NamingThreadFactory.java | 6 ++- .../thread/ThreadInitializationFactory.java | 44 ++++++++++++++++++ .../OperationInitializationThreadPool.java | 5 ++- .../updategraph/UpdateGraphProcessor.java | 20 ++++----- .../src/main/resources/dh-defaults.prop | 4 ++ py/server/deephaven/__init__.py | 31 +++++++++++++ .../console/python/DebuggingInitializer.java | 45 +++++++++++++++++++ .../runner/DeephavenApiServerModule.java | 3 +- 8 files changed, 143 insertions(+), 15 deletions(-) create mode 100644 Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java create mode 100644 server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java diff --git a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java index 525624f9f44..910c28a38c1 100644 --- a/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/NamingThreadFactory.java @@ -16,7 +16,9 @@ public class NamingThreadFactory implements ThreadFactory { private final ThreadGroup threadGroup; /** - * Creates a thread factory using the provided class and name as part of the thread name. All created threads will be daemon threads. + * Creates a thread factory using the provided class and name as part of the thread name. All created threads will + * be daemon threads. + * * @param clazz a class to use when naming each thread * @param name a name component to add after the class name when naming each thread */ @@ -26,6 +28,7 @@ public NamingThreadFactory(final Class clazz, final String name) { /** * Creates a thread factory using the provided class and name as part of the thread name. + * * @param clazz a class to use when naming each thread * @param name a name component to add after the class name when naming each thread * @param daemon true to make each thread a daemon thread @@ -36,6 +39,7 @@ public NamingThreadFactory(final Class clazz, final String name, boolean daem /** * Creates a thread factory using the provided class and name as part of the thread name. + * * @param threadGroup a thread group to add each thread to * @param clazz a class to use when naming each thread * @param name a name component to add after the class name when naming each thread diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java new file mode 100644 index 00000000000..60eef034a26 --- /dev/null +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -0,0 +1,44 @@ +package io.deephaven.util.thread; + +import io.deephaven.configuration.Configuration; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. + */ +public interface ThreadInitializationFactory { + /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = + Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); + /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) + .map(type -> { + try { + // noinspection unchecked + Class clazz = + (Class) Class.forName(type); + return clazz.getDeclaredConstructor().newInstance(); + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException + | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException( + "Error instantiating initializer " + type + ", please check configuration"); + } + }) + .collect(Collectors.toUnmodifiableList()); + + /** + * Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run + * by a new thread. + */ + static Runnable wrapRunnable(Runnable runnable) { + Runnable acc = runnable; + for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) { + acc = INITIALIZER.createInitializer(acc); + } + return acc; + } + + Runnable createInitializer(Runnable runnable); +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index e967bcab2fd..97e722f3a08 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -6,6 +6,7 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; import io.deephaven.util.thread.NamingThreadFactory; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import java.util.concurrent.ExecutorService; @@ -39,11 +40,11 @@ public static boolean isInitializationThread() { true) { @Override public Thread newThread(@NotNull Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); r.run(); - }); + })); } }; executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory); diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index 03875bda141..31df615be8d 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -34,6 +34,7 @@ import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadDump; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -265,17 +266,14 @@ public synchronized void take(final AccumulatedCycleStats out) { notificationProcessor = makeNotificationProcessor(); jvmIntrospectionContext = new JvmIntrospectionContext(); - refreshThread = new Thread("UpdateGraphProcessor." + name() + ".refreshThread") { - @Override - public void run() { - configureRefreshThread(); - // noinspection InfiniteLoopStatement - while (true) { - Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); - refreshTablesAndFlushNotifications(); - } + refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { + configureRefreshThread(); + // noinspection InfiniteLoopStatement + while (true) { + Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); + refreshTablesAndFlushNotifications(); } - }; + }), "UpdateGraphProcessor." + name() + ".refreshThread"); refreshThread.setDaemon(true); final int updateThreads = @@ -1813,7 +1811,7 @@ private UpdateGraphProcessorThreadFactory(@NotNull final ThreadGroup threadGroup public Thread newThread(@NotNull final Runnable r) { return super.newThread(() -> { configureRefreshThread(); - r.run(); + ThreadInitializationFactory.wrapRunnable(r).run(); }); } } diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index 644fbf883ea..dc1479102e5 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -59,3 +59,7 @@ client.configuration.list=java.version,deephaven.version,barrage.version, # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper + + +# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory +thread.initialization=io.deephaven.server.console.python.DebuggingInitializer \ No newline at end of file diff --git a/py/server/deephaven/__init__.py b/py/server/deephaven/__init__.py index 03c7094cac8..487579a6723 100644 --- a/py/server/deephaven/__init__.py +++ b/py/server/deephaven/__init__.py @@ -24,3 +24,34 @@ from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table from .replay import TableReplayer from ._gc import garbage_collect + +import threading + +def create_thread_entry(thread_name): + """ + Helper to call from the JVM into python to set up py thread state exactly once per jvm thread, and support debugging + """ + # First, ensure that this Java thread has a python _DummyThread instance registered, which will have the same + # lifetime as the pythreadstate (and so, the tracing). This ensures that if debugging is enabled after this thread + # was created, it will correctly be able to trace this thread. + thread = threading.current_thread() + + # Assign the java thread name to the python thread + thread.name = 'java-' + thread_name + # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. + try: + # TODO can we conditionally import the other ones, try each module until we find a working one? + import pydevd + + # We don't want to be the first one to call settrace(), so check to see if setup completed on another thread before attemption it here + if pydevd.SetupHolder.setup is not None: + pydevd.settrace(suspend=False) + except ImportError: + # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread + # above should be sufficient + pass + + # Return a def to Java with a particular name that will call back into the Java stack + def JavaThread(runnable): + runnable.run() + return JavaThread diff --git a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java new file mode 100644 index 00000000000..046b1a08de3 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java @@ -0,0 +1,45 @@ +package io.deephaven.server.console.python; + +import io.deephaven.configuration.Configuration; +import io.deephaven.util.thread.ThreadInitializationFactory; +import org.jpy.PyLib; +import org.jpy.PyModule; +import org.jpy.PyObject; + +import java.io.Closeable; + +/** + * If python is configured as the script language for this server, ensures that threads which may invoke python code + * will be able to be debugged. If python is disabled, this does nothing. + */ +public class DebuggingInitializer implements ThreadInitializationFactory { + @Override + public Runnable createInitializer(Runnable runnable) { + if (!"python".equals(Configuration.getInstance().getStringWithDefault("deephaven.console.type", null))) { + // python not enabled, don't accidentally start it + return runnable; + } + DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven") + .createProxy(PyLib.CallableKind.FUNCTION, DeephavenModule.class); + + + return () -> { + // First call in to create a custom function that has the same name as the Java thread (plus a prefix) + PyObject runnableResult = py_deephaven.create_thread_entry(Thread.currentThread().getName()); + // runnable.run(); + // Invoke that function directly from Java, so that we have only this one initial frame + runnableResult.call("__call__", runnable); + }; + } + + interface DeephavenModule extends Closeable { + /** + * Creates a new function that will initialize a thread in python, including creating a simple frame + * + * @param threadName the name of the java thread + * @return a callable PyObject + */ + PyObject create_thread_entry(String threadName); + } + +} diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 66e42334e3e..2d4bfc38425 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -29,6 +29,7 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.thread.NamingThreadFactory; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.BindableService; import io.grpc.ServerInterceptor; import org.jetbrains.annotations.NotNull; @@ -156,7 +157,7 @@ public ThreadFactory(final String name) { public Thread newThread(final @NotNull Runnable r) { return super.newThread(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); - r.run(); + ThreadInitializationFactory.wrapRunnable(r).run(); }); } } From ad531d8fb67e1cb93381f6797bb5716c538c8bb8 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 22 Nov 2022 15:49:14 -0600 Subject: [PATCH 05/11] Move java+py threading to its own file, start deephaven module later --- .../updategraph/UpdateGraphProcessor.java | 20 +++++++----- py/server/deephaven/__init__.py | 31 ------------------- py/server/deephaven_internal/java_threads.py | 30 ++++++++++++++++++ .../console/python/DebuggingInitializer.java | 3 +- 4 files changed, 44 insertions(+), 40 deletions(-) create mode 100644 py/server/deephaven_internal/java_threads.py diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index 31df615be8d..3b8bd8a2f16 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -266,14 +266,20 @@ public synchronized void take(final AccumulatedCycleStats out) { notificationProcessor = makeNotificationProcessor(); jvmIntrospectionContext = new JvmIntrospectionContext(); - refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { - configureRefreshThread(); - // noinspection InfiniteLoopStatement - while (true) { - Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); - refreshTablesAndFlushNotifications(); + refreshThread = new Thread(() -> { + try { + configureRefreshThread(); + // noinspection InfiniteLoopStatement + ThreadInitializationFactory.wrapRunnable(() -> { + while (true) { + Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); + refreshTablesAndFlushNotifications(); + } + }).run(); + } catch (Throwable e) { + e.printStackTrace(); } - }), "UpdateGraphProcessor." + name() + ".refreshThread"); + }, "UpdateGraphProcessor." + name() + ".refreshThread"); refreshThread.setDaemon(true); final int updateThreads = diff --git a/py/server/deephaven/__init__.py b/py/server/deephaven/__init__.py index 487579a6723..03c7094cac8 100644 --- a/py/server/deephaven/__init__.py +++ b/py/server/deephaven/__init__.py @@ -24,34 +24,3 @@ from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table from .replay import TableReplayer from ._gc import garbage_collect - -import threading - -def create_thread_entry(thread_name): - """ - Helper to call from the JVM into python to set up py thread state exactly once per jvm thread, and support debugging - """ - # First, ensure that this Java thread has a python _DummyThread instance registered, which will have the same - # lifetime as the pythreadstate (and so, the tracing). This ensures that if debugging is enabled after this thread - # was created, it will correctly be able to trace this thread. - thread = threading.current_thread() - - # Assign the java thread name to the python thread - thread.name = 'java-' + thread_name - # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. - try: - # TODO can we conditionally import the other ones, try each module until we find a working one? - import pydevd - - # We don't want to be the first one to call settrace(), so check to see if setup completed on another thread before attemption it here - if pydevd.SetupHolder.setup is not None: - pydevd.settrace(suspend=False) - except ImportError: - # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread - # above should be sufficient - pass - - # Return a def to Java with a particular name that will call back into the Java stack - def JavaThread(runnable): - runnable.run() - return JavaThread diff --git a/py/server/deephaven_internal/java_threads.py b/py/server/deephaven_internal/java_threads.py new file mode 100644 index 00000000000..651cbbc416c --- /dev/null +++ b/py/server/deephaven_internal/java_threads.py @@ -0,0 +1,30 @@ +import threading + +def create_thread_entry(thread_name): + """ + Helper to call from the JVM into python to set up py thread state exactly once per jvm thread, and support debugging + """ + # First, ensure that this Java thread has a python _DummyThread instance registered, which will have the same + # lifetime as the pythreadstate (and so, the tracing). This ensures that if debugging is enabled after this thread + # was created, it will correctly be able to trace this thread. + thread = threading.current_thread() + + # Assign the java thread name to the python thread + thread.name = 'java-' + thread_name + # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. + try: + # TODO can we conditionally import the other ones, try each module until we find a working one? + import pydevd + + # We don't want to be the first one to call settrace(), so check to see if setup completed on another thread before attemption it here + if pydevd.SetupHolder.setup is not None: + pydevd.settrace(suspend=False) + except ImportError: + # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread + # above should be sufficient + pass + + # Return a def to Java with a particular name that will call back into the Java stack + def JavaThread(runnable): + runnable.run() + return JavaThread \ No newline at end of file diff --git a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java index 046b1a08de3..ba4bae4ccbf 100644 --- a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java +++ b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java @@ -19,10 +19,9 @@ public Runnable createInitializer(Runnable runnable) { // python not enabled, don't accidentally start it return runnable; } - DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven") + DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven_internal.java_threads") .createProxy(PyLib.CallableKind.FUNCTION, DeephavenModule.class); - return () -> { // First call in to create a custom function that has the same name as the Java thread (plus a prefix) PyObject runnableResult = py_deephaven.create_thread_entry(Thread.currentThread().getName()); From 42bed8c8dcd4a5bf3e83d7947865aa2305d2dabb Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 29 Nov 2022 08:40:57 -0600 Subject: [PATCH 06/11] Draft at checking other debug libraries, needs debugging --- py/server/deephaven_internal/java_threads.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/py/server/deephaven_internal/java_threads.py b/py/server/deephaven_internal/java_threads.py index 651cbbc416c..dfd25fb1d66 100644 --- a/py/server/deephaven_internal/java_threads.py +++ b/py/server/deephaven_internal/java_threads.py @@ -13,12 +13,14 @@ def create_thread_entry(thread_name): thread.name = 'java-' + thread_name # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. try: - # TODO can we conditionally import the other ones, try each module until we find a working one? - import pydevd + # Test each of our known + for name in ['pydevd', 'pydevd_pycharm']: + debugger = __import__(name) - # We don't want to be the first one to call settrace(), so check to see if setup completed on another thread before attemption it here - if pydevd.SetupHolder.setup is not None: - pydevd.settrace(suspend=False) + # We don't want to be the first one to call settrace(), so check to see if setup completed on another + # thread before attempting it here + if debugger.SetupHolder.setup is not None: + debugger.settrace(suspend=False) except ImportError: # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread # above should be sufficient From 9fd1a57c0aa986eb39fdf15f8376ba7e2d4acd18 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 8 Dec 2022 15:53:58 -0600 Subject: [PATCH 07/11] Attempt at correctly setting up debugging in current thread --- py/server/deephaven_internal/java_threads.py | 25 ++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/py/server/deephaven_internal/java_threads.py b/py/server/deephaven_internal/java_threads.py index dfd25fb1d66..d18c0bb5b0e 100644 --- a/py/server/deephaven_internal/java_threads.py +++ b/py/server/deephaven_internal/java_threads.py @@ -12,21 +12,22 @@ def create_thread_entry(thread_name): # Assign the java thread name to the python thread thread.name = 'java-' + thread_name # Then, if pydevd has already been initialized, we should attempt to make ourselves known to it. - try: - # Test each of our known - for name in ['pydevd', 'pydevd_pycharm']: - debugger = __import__(name) - # We don't want to be the first one to call settrace(), so check to see if setup completed on another - # thread before attempting it here - if debugger.SetupHolder.setup is not None: - debugger.settrace(suspend=False) - except ImportError: + # Return a def to Java with a particular name that will call back into the Java stack + def JavaThread(runnable): + try: + # Test each of our known debugger impls + for name in ['pydevd', 'pydevd_pycharm']: + debugger = __import__(name) + + # We don't want to be the first one to call settrace(), so check to see if setup completed on another + # thread before attempting it here + if debugger.SetupHolder.setup is not None: + debugger.settrace(suspend=False) + except ImportError: # Debugger hasn't started yet (or we don't know which one is in use), so registering our thread # above should be sufficient - pass + pass - # Return a def to Java with a particular name that will call back into the Java stack - def JavaThread(runnable): runnable.run() return JavaThread \ No newline at end of file From e8146c8bcae85ad45ad62ad8ec9f4d9a368d0f31 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 9 Dec 2022 09:03:45 -0600 Subject: [PATCH 08/11] Handle absence of initializers, and add better defaults --- .../io/deephaven/util/thread/ThreadInitializationFactory.java | 1 + props/configs/src/main/resources/dh-defaults.prop | 4 +++- props/test-configs/src/main/resources/dh-tests.prop | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index 60eef034a26..2d5b963f4a2 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -14,6 +14,7 @@ public interface ThreadInitializationFactory { /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) + .filter(str -> !str.isBlank()) .map(type -> { try { // noinspection unchecked diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index dc1479102e5..d1713bfd2e3 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -62,4 +62,6 @@ client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephav # Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization=io.deephaven.server.console.python.DebuggingInitializer \ No newline at end of file +thread.initialization=io.deephaven.server.console.python.DebuggingInitializer + +deephaven.console.type=groovy \ No newline at end of file diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index 101cf80d26f..0066f3fc3d5 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -97,3 +97,6 @@ BarrageStreamGenerator.batchSize=4 AuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler authentication.client.configuration.list= client.version.list= + +deephaven.console.type=none +thread.initialization= \ No newline at end of file From 98d6553fec9cae23ebe465ace99a245e9807a19e Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 9 Dec 2022 13:56:04 -0600 Subject: [PATCH 09/11] review feedback --- .../updategraph/UpdateGraphProcessor.java | 26 +++++++------------ .../runner/DeephavenApiServerModule.java | 6 ++--- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java index 3b8bd8a2f16..5721a8bf10b 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraphProcessor.java @@ -266,20 +266,14 @@ public synchronized void take(final AccumulatedCycleStats out) { notificationProcessor = makeNotificationProcessor(); jvmIntrospectionContext = new JvmIntrospectionContext(); - refreshThread = new Thread(() -> { - try { - configureRefreshThread(); - // noinspection InfiniteLoopStatement - ThreadInitializationFactory.wrapRunnable(() -> { - while (true) { - Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); - refreshTablesAndFlushNotifications(); - } - }).run(); - } catch (Throwable e) { - e.printStackTrace(); + refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { + configureRefreshThread(); + // noinspection InfiniteLoopStatement + while (true) { + Assert.eqFalse(allowUnitTestMode, "allowUnitTestMode"); + refreshTablesAndFlushNotifications(); } - }, "UpdateGraphProcessor." + name() + ".refreshThread"); + }), "UpdateGraphProcessor." + name() + ".refreshThread"); refreshThread.setDaemon(true); final int updateThreads = @@ -1815,10 +1809,10 @@ private UpdateGraphProcessorThreadFactory(@NotNull final ThreadGroup threadGroup @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { configureRefreshThread(); - ThreadInitializationFactory.wrapRunnable(r).run(); - }); + r.run(); + })); } } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index 2d4bfc38425..4a817754152 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -155,10 +155,10 @@ public ThreadFactory(final String name) { @Override public Thread newThread(final @NotNull Runnable r) { - return super.newThread(() -> { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); - ThreadInitializationFactory.wrapRunnable(r).run(); - }); + r.run(); + })); } } } From a1c2d5393f7157460774748791a3c7b9ee778f2a Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 16 Dec 2022 12:05:45 -0600 Subject: [PATCH 10/11] remove test debugging property --- props/configs/src/main/resources/dh-defaults.prop | 2 -- 1 file changed, 2 deletions(-) diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index d1713bfd2e3..ac53865e649 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -63,5 +63,3 @@ client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephav # Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory thread.initialization=io.deephaven.server.console.python.DebuggingInitializer - -deephaven.console.type=groovy \ No newline at end of file From 2a7b4b1f251f4a24a90c583291cec646ba5ec044 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 19 Jan 2023 15:45:09 -0600 Subject: [PATCH 11/11] Lazily create the py module for java threads --- .../deephaven/server/console/python/DebuggingInitializer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java index ba4bae4ccbf..97867acf324 100644 --- a/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java +++ b/server/src/main/java/io/deephaven/server/console/python/DebuggingInitializer.java @@ -19,10 +19,10 @@ public Runnable createInitializer(Runnable runnable) { // python not enabled, don't accidentally start it return runnable; } - DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven_internal.java_threads") - .createProxy(PyLib.CallableKind.FUNCTION, DeephavenModule.class); return () -> { + DeephavenModule py_deephaven = (DeephavenModule) PyModule.importModule("deephaven_internal.java_threads") + .createProxy(PyLib.CallableKind.FUNCTION, DeephavenModule.class); // First call in to create a custom function that has the same name as the Java thread (plus a prefix) PyObject runnableResult = py_deephaven.create_thread_entry(Thread.currentThread().getName()); // runnable.run();