Python debugging support #3075

Merged
merged 12 commits into from
Feb 3, 2023

This file was deleted.

@@ -10,20 +10,42 @@

 public class NamingThreadFactory implements ThreadFactory {
     private final AtomicInteger threadCounter = new AtomicInteger(0);
-    public final String name;
-    private final Class clazz;
-    private boolean daemon;
+    private final String name;
+    private final Class<?> clazz;
+    private final boolean daemon;
     private final ThreadGroup threadGroup;

-    public NamingThreadFactory(final Class clazz, final String name) {
-        this(clazz, name, false);
+    /**
+     * Creates a thread factory using the provided class and name as part of the thread name. All created threads will
+     * be daemon threads.
+     *
+     * @param clazz a class to use when naming each thread
+     * @param name a name component to add after the class name when naming each thread
+     */
+    public NamingThreadFactory(final Class<?> clazz, final String name) {
+        this(clazz, name, true);
     }

-    public NamingThreadFactory(final Class clazz, final String name, boolean daemon) {
+    /**
+     * Creates a thread factory using the provided class and name as part of the thread name.
+     *
+     * @param clazz a class to use when naming each thread
+     * @param name a name component to add after the class name when naming each thread
+     * @param daemon true to make each thread a daemon thread
+     */
+    public NamingThreadFactory(final Class<?> clazz, final String name, boolean daemon) {
         this(null, clazz, name, daemon);
     }

-    public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) {
+    /**
+     * Creates a thread factory using the provided class and name as part of the thread name.
+     *
+     * @param threadGroup a thread group to add each thread to
+     * @param clazz a class to use when naming each thread
+     * @param name a name component to add after the class name when naming each thread
+     * @param daemon true to make each thread a daemon thread
+     */
+    public NamingThreadFactory(ThreadGroup threadGroup, final Class<?> clazz, final String name, boolean daemon) {
         this.threadGroup = threadGroup;
         this.clazz = clazz;
         this.name = name;
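For readers who want to try the new constructor defaults in isolation, here is a minimal sketch of a naming thread factory whose two-argument constructor produces daemon threads, as the Javadoc in the hunk above describes. `SimpleNamingThreadFactory` is a hypothetical stand-in, not the class from this PR, and the `Class-name-N` thread name format is an assumption, since the part of the real class that builds the name is not visible in this hunk.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for NamingThreadFactory; the thread name format is an assumption. */
final class SimpleNamingThreadFactory implements ThreadFactory {
    private final AtomicInteger threadCounter = new AtomicInteger(0);
    private final Class<?> clazz;
    private final String name;
    private final boolean daemon;

    /** Two-argument form: every created thread is a daemon thread. */
    SimpleNamingThreadFactory(Class<?> clazz, String name) {
        this(clazz, name, true);
    }

    SimpleNamingThreadFactory(Class<?> clazz, String name, boolean daemon) {
        this.clazz = clazz;
        this.name = name;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Assumed format: simple class name, caller-supplied name, then a running counter.
        Thread thread = new Thread(r,
                clazz.getSimpleName() + "-" + name + "-" + threadCounter.incrementAndGet());
        thread.setDaemon(daemon);
        return thread;
    }

    public static void main(String[] args) {
        Thread t = new SimpleNamingThreadFactory(String.class, "worker").newThread(() -> {
        });
        System.out.println(t.getName() + " daemon=" + t.isDaemon()); // prints String-worker-1 daemon=true
    }
}
```

Callers that previously passed `true` explicitly can drop the argument, which is what the SparseSelect and UnitTestRefreshThreadFactory hunks later in this diff do.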
@@ -0,0 +1,45 @@
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Extension point to allow threads that will run user code from within the platform to be controlled by configuration.
*/
public interface ThreadInitializationFactory {
/* private */ String[] CONFIGURED_INITIALIZATION_TYPES =
Configuration.getInstance().getStringArrayFromProperty("thread.initialization");
Comment on lines +14 to +15
Member

Does this work if the property doesn't exist or is empty?

Member Author

It fails if it does not exist, but succeeds if empty (with an empty array).

Due to how defaults work (or rather, don't work), I was under the impression we wanted to generally avoid using them, so dh-defaults.prop now contains this property.
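The behavior described in this reply (a missing property fails, an empty one yields an empty array) can be sketched with plain `java.util.Properties`. This is only an illustration: `InitializerPropertyDemo` and `configuredTypes` are hypothetical names, and Deephaven's `Configuration` class is not used or modeled here beyond that one behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a configuration lookup; this is not Deephaven's Configuration class. */
final class InitializerPropertyDemo {
    /** Fails when the key is absent; returns an empty list when the value is empty or blank. */
    static List<String> configuredTypes(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isBlank()) // same blank filter the diff applies to each entry
                .collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("thread.initialization", "");
        System.out.println(configuredTypes(props, "thread.initialization")); // prints []
    }
}
```

Under this model, shipping the key in the defaults file and overriding it with an empty value in the test configuration both work without a code-level default.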

/* private */ List<ThreadInitializationFactory> INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES)
Member

I'm wondering, do we think there are occasions for more than 1 ThreadInitializationFactory?

Member Author

I have assumed for now that each factory should be created once, but if it wants per-thread state, it should keep that state in the createInitializer method itself, or in the returned Runnable. By instantiating each factory only once, we let each implementation control what kind of scope it wants to have.

For example, the current DebuggingInitializer never closes the deephaven module (in its current form, it should), but could be modified to instead avoid paying the cost of re-creating that proxy for each thread (not just UGP threads, but the DeephavenApiServerModule scheduler threads too).
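To make the scoping point concrete, here is a small sketch of a factory that is instantiated once but still gets distinct state for every wrapped runnable. `InitializerFactory` is a local, hypothetical copy of the single-method shape shown in this diff, and `CountingInitializerFactory` is invented for illustration; neither is code from the PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical local copy of the single-method interface shape, so the sketch compiles on its own. */
interface InitializerFactory {
    Runnable createInitializer(Runnable runnable);
}

/** Hypothetical factory: created once, with separate state per wrapped runnable. */
final class CountingInitializerFactory implements InitializerFactory {
    // Factory-scoped state: shared by every runnable this single instance wraps.
    final AtomicInteger wrappedCount = new AtomicInteger();
    final List<String> seen = new CopyOnWriteArrayList<>();

    @Override
    public Runnable createInitializer(Runnable runnable) {
        // Per-call state: a fresh id each time createInitializer is invoked.
        final int id = wrappedCount.incrementAndGet();
        return () -> {
            // Work done here happens on whichever thread ends up running the wrapper.
            seen.add("init-" + id + " on " + Thread.currentThread().getName());
            runnable.run();
        };
    }
}
```

An expensive shared resource would live in a field like `wrappedCount`, while anything that must not be shared between threads would be created inside `createInitializer` or inside the returned lambda.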

.filter(str -> !str.isBlank())
.map(type -> {
try {
// noinspection unchecked
Class<? extends ThreadInitializationFactory> clazz =
(Class<? extends ThreadInitializationFactory>) Class.forName(type);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration");
}
})
.collect(Collectors.toUnmodifiableList());

/**
* Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run
* by a new thread.
*/
static Runnable wrapRunnable(Runnable runnable) {
Runnable acc = runnable;
for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) {
acc = INITIALIZER.createInitializer(acc);
}
return acc;
}
Comment on lines +36 to +42
Member

Just to be sure I understand, the reason for this pattern is to allow for initializer-specific cleanup on exit? Otherwise, a pattern of iterative runnable invocations would seem less error-prone. I suppose either version allows arbitrary hijacking of the intended run method; the current pattern could skip calling the wrapped runnable, but the iterative approach could just never return or throw an exception on termination.

Member Author

It isn't that we need to do cleanup on the way out per se, but that we want to insert a stack frame in Python. If we end up dropping this frame (which has some weird side effects when you "step out" of the apparent top-level frame), then we could just run one after the other, with no wrapping.

The current API leaves it to each implementation how it wants to do this. I played with an API that would offer a DSL for either wrapping or prefixing, but didn't really love it; I can try bringing that back if you'd prefer.

Member Author

Okay, in light of the arm64 issues, a wrapper is necessary. The question then is how we name that wrapper: whether JavaThread is clear and simple enough, or whether we want to use it to get some more info to the user.
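As a reference for the wrapping pattern discussed in this thread, the sketch below runs the same folding loop as `wrapRunnable` over an explicit list and records the order in which wrappers are entered and exited. `WrapOrderDemo`, `wrap`, and `tracing` are hypothetical names, and `UnaryOperator<Runnable>` stands in for `ThreadInitializationFactory::createInitializer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class WrapOrderDemo {
    /** Same folding loop as wrapRunnable in the diff, over an explicit list of wrappers. */
    static Runnable wrap(List<UnaryOperator<Runnable>> initializers, Runnable runnable) {
        Runnable acc = runnable;
        for (UnaryOperator<Runnable> initializer : initializers) {
            acc = initializer.apply(acc);
        }
        return acc;
    }

    /** Hypothetical wrapper that records when it is entered and exited around the inner runnable. */
    static UnaryOperator<Runnable> tracing(String label, List<String> trace) {
        return inner -> () -> {
            trace.add("enter " + label);
            try {
                inner.run();
            } finally {
                trace.add("exit " + label);
            }
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Runnable wrapped = wrap(List.of(tracing("A", trace), tracing("B", trace)), () -> trace.add("body"));
        wrapped.run();
        System.out.println(trace); // prints [enter B, enter A, body, exit A, exit B]
    }
}
```

Because each wrapper encloses the accumulated runnable, the last configured initializer ends up outermost, and every wrapper stays on the call stack for the lifetime of the body. That property is what makes it possible to insert a frame, and it is also what an iterative "run one after the other" design would give up.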


Runnable createInitializer(Runnable runnable);
}
@@ -6,6 +6,7 @@
 import io.deephaven.chunk.util.pools.MultiChunkPool;
 import io.deephaven.configuration.Configuration;
 import io.deephaven.util.thread.NamingThreadFactory;
+import io.deephaven.util.thread.ThreadInitializationFactory;
 import org.jetbrains.annotations.NotNull;

 import java.util.concurrent.ExecutorService;
@@ -39,11 +40,11 @@ public static boolean isInitializationThread() {
                 true) {
             @Override
             public Thread newThread(@NotNull Runnable r) {
-                return super.newThread(() -> {
+                return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
                     isInitializationThread.set(true);
                     MultiChunkPool.enableDedicatedPoolForThisThread();
                     r.run();
-                });
+                }));
             }
         };
         executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory);
@@ -61,7 +61,7 @@ public class SparseSelect {

     private final static ExecutorService executor = SPARSE_SELECT_THREADS == 1 ? null
             : Executors.newFixedThreadPool(SPARSE_SELECT_THREADS,
-                    new NamingThreadFactory(SparseSelect.class, "copyThread", true));
+                    new NamingThreadFactory(SparseSelect.class, "copyThread"));

     private SparseSelect() {} // static use only
@@ -12,6 +12,7 @@
 import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider;
 import io.deephaven.engine.table.impl.locations.impl.SubscriptionAggregator;
 import io.deephaven.engine.table.impl.locations.TableDataException;
+import io.deephaven.util.thread.NamingThreadFactory;
 import org.jetbrains.annotations.NotNull;

 import java.util.concurrent.Future;
@@ -31,8 +32,6 @@ public class ExecutorTableDataRefreshService implements TableDataRefreshService
     private final long tableLocationProviderRefreshIntervalMillis;
     private final long tableLocationRefreshIntervalMillis;

-    private final AtomicInteger threadCount = new AtomicInteger(0);
-
     private final ScheduledThreadPoolExecutor scheduler;

     private final Value providerSubscriptions;
@@ -50,8 +49,9 @@ public ExecutorTableDataRefreshService(@NotNull final String name,
         this.tableLocationRefreshIntervalMillis =
                 Require.gtZero(tableLocationRefreshIntervalMillis, "tableLocationRefreshIntervalMillis");

+        NamingThreadFactory threadFactory = new NamingThreadFactory(TableDataRefreshService.class, "refreshThread");
         scheduler =
-                new ScheduledThreadPoolExecutor(threadPoolSize, this::makeThread, new ThreadPoolExecutor.AbortPolicy());
+                new ScheduledThreadPoolExecutor(threadPoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy());
         scheduler.setRemoveOnCancelPolicy(true);

         providerSubscriptions = Stats.makeItem(NAME_PREFIX + name, "providerSubscriptions", Counter.FACTORY).getValue();
@@ -62,13 +62,6 @@ public ExecutorTableDataRefreshService(@NotNull final String name,
                 .makeItem(NAME_PREFIX + name, "locationSubscriptionRefreshDurationNanos", State.FACTORY).getValue();
     }

-    private Thread makeThread(final Runnable runnable) {
-        final Thread thread =
-                new Thread(runnable, NAME_PREFIX + name + "-refreshThread-" + threadCount.incrementAndGet());
-        thread.setDaemon(true);
-        return thread;
-    }
-
     @Override
     public void submitOneTimeAsyncTask(@NotNull final Runnable task) {
         scheduler.submit(task);

This file was deleted.

@@ -33,6 +33,9 @@
 import io.deephaven.util.locks.AwareFunctionalLock;
 import io.deephaven.util.process.ProcessEnvironment;
 import io.deephaven.util.thread.NamingThreadFactory;
+import io.deephaven.util.thread.ThreadDump;
+import io.deephaven.util.thread.ThreadInitializationFactory;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;

@@ -262,17 +265,14 @@ public synchronized void take(final AccumulatedCycleStats out) {
         notificationProcessor = makeNotificationProcessor();
         jvmIntrospectionContext = new JvmIntrospectionContext();

-        refreshThread = new Thread("UpdateGraphProcessor." + name() + ".refreshThread") {
-            @Override
-            public void run() {
-                configureRefreshThread();
-                // noinspection InfiniteLoopStatement
-                while (true) {
-                    Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE");
-                    refreshTablesAndFlushNotifications();
-                }
+        refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> {
+            configureRefreshThread();
+            // noinspection InfiniteLoopStatement
+            while (true) {
+                Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE");
+                refreshTablesAndFlushNotifications();
             }
-        };
+        }), "UpdateGraphProcessor." + name() + ".refreshThread");
         refreshThread.setDaemon(true);

         final int updateThreads =
@@ -1800,10 +1800,10 @@ private UpdateGraphProcessorThreadFactory(@NotNull final ThreadGroup threadGroup

         @Override
         public Thread newThread(@NotNull final Runnable r) {
-            return super.newThread(() -> {
+            return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
                 configureRefreshThread();
                 r.run();
-            });
+            }));
         }
     }

@@ -1835,7 +1835,7 @@ private ExecutorService makeUnitTestRefreshExecutor() {
     private class UnitTestRefreshThreadFactory extends NamingThreadFactory {

         private UnitTestRefreshThreadFactory() {
-            super(UpdateGraphProcessor.class, "unitTestRefresh", true);
+            super(UpdateGraphProcessor.class, "unitTestRefresh");
         }

         @Override
6 changes: 6 additions & 0 deletions props/configs/src/main/resources/dh-defaults.prop
@@ -45,6 +45,8 @@ default.processEnvironmentFactory=io.deephaven.util.process.DefaultProcessEnviro

OperationInitializationThreadPool.threads=1

deephaven.console.type=python

# Default session duration is 5 minutes
http.session.durationMs=300000

@@ -63,3 +65,7 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se
# jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list
# as <key>.version.
client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper


# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory
thread.initialization=io.deephaven.server.console.python.DebuggingInitializer
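The property comment above says each listed class must be of type `ThreadInitializationFactory`. As one possible way to enforce that when loading a comma-separated list of class names, the sketch below uses `Class.asSubclass` instead of an unchecked cast and keeps the original exception as the cause. This is a variant for illustration, not the code merged in this PR; `ReflectiveLoaderDemo` and `load` are hypothetical names, and `java.lang.Thread` is used only because it is a JDK class with a no-argument constructor that implements `Runnable`.

```java
import java.util.ArrayList;
import java.util.List;

final class ReflectiveLoaderDemo {
    /** Instantiates each named class via its no-arg constructor, verifying it implements expectedType. */
    static <T> List<T> load(String commaSeparated, Class<T> expectedType) {
        List<T> result = new ArrayList<>();
        for (String raw : commaSeparated.split(",")) {
            String type = raw.trim();
            if (type.isEmpty()) {
                continue; // tolerate an empty property value and stray commas
            }
            try {
                // asSubclass throws ClassCastException if the class is not an expectedType.
                Class<? extends T> clazz = Class.forName(type).asSubclass(expectedType);
                result.add(clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException(
                        "Error instantiating initializer " + type + ", please check configuration", e);
            }
        }
        return List.copyOf(result);
    }

    public static void main(String[] args) {
        List<Runnable> loaded = load("java.lang.Thread, ", Runnable.class);
        System.out.println(loaded.size()); // prints 1
    }
}
```

With this shape, a misconfigured entry that names an existing class of the wrong type fails at load time with the offending class name, rather than later with a `ClassCastException` at the first call site.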
4 changes: 4 additions & 0 deletions props/test-configs/src/main/resources/dh-tests.prop
@@ -98,4 +98,8 @@ http.session.durationMs=300000
AuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler
authentication.client.configuration.list=
client.version.list=

authentication.anonymous.warn=false

deephaven.console.type=none
thread.initialization=