Skip to content

Commit

Permalink
Python library for embedded server (deephaven#2353)
Browse files Browse the repository at this point in the history
Introduces a Python library that can start the Deephaven server from
python code. This consists of a wheel that holds not only some python
glue to start the server, but all of the jars required to be on the
deephaven classpath. Presently this depends on deephaven's own jpy
fork, but soon this limitation will be dropped and the upstream jpy
project will work.

Partial deephaven#2221
  • Loading branch information
niloc132 authored and SuperTails committed Jun 29, 2022
1 parent 95bfb25 commit 281b193
Show file tree
Hide file tree
Showing 37 changed files with 753 additions and 83 deletions.
33 changes: 32 additions & 1 deletion buildSrc/src/main/groovy/io.deephaven.python-wheel.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,43 @@ plugins {
id 'io.deephaven.project.register'
}

import io.deephaven.python.PythonWheelExtension

project.extensions.create('wheel', PythonWheelExtension, project.objects)

configurations {
pythonWheel
getByName(Dependency.DEFAULT_CONFIGURATION).extendsFrom(pythonWheel)
}

def buildWheel = Docker.buildPyWheel(project, 'buildWheel', "deephaven/${project.name}", project.projectDir.toString())
project.evaluationDependsOn(Docker.registryProject('python'))
def buildWheel = Docker.registerDockerTask(project, 'buildWheel') { config ->
config.copyIn { Sync sync ->
// apply the extension spec, copying into src
sync.from(project.projectDir) { CopySpec copy ->
wheel.contents().execute(copy)
copy.into 'src'
}
}
config.imageName = "deephaven/${project.name}:local-build"
config.dockerfile { action ->
// set up the container, env vars - things that aren't likely to change
action.from 'deephaven/python:local-build as sources'
action.arg 'DEEPHAVEN_VERSION'
action.environmentVariable 'DEEPHAVEN_VERSION', project.version.toString()
action.workingDir '/usr/src/app'
action.copyFile '/src', '.'
action.from 'sources as build'
action.runCommand '''set -eux; \\
test -n "${DEEPHAVEN_VERSION}";\\
python setup.py bdist_wheel'''
}
config.parentContainers = [ Docker.registryTask(project, 'python') ]
config.containerOutPath='/usr/src/app/dist'
config.copyOut { Sync sync ->
sync.into "build/wheel"
}
}

artifacts {
pythonWheel buildWheel
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.deephaven.python

import org.gradle.api.Action
import org.gradle.api.file.CopySpec
import org.gradle.api.internal.file.copy.DefaultCopySpec
import org.gradle.api.model.ObjectFactory
import org.gradle.api.provider.Property
import org.gradle.util.ConfigureUtil

class PythonWheelExtension {
private Action<? super CopySpec> contents

PythonWheelExtension(ObjectFactory objectFactory) {
//objectFactory.newInstance(DefaultCopySpec.class)
contents {
exclude 'build', 'dist'
}
}

void contents(Action<? super CopySpec> action) {
contents = action
}
void contents(Closure closure) {
contents(ConfigureUtil.configureUsing(closure))
}

Action<? super CopySpec> contents() {
return contents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,32 @@ public String getClassPath() {
}

protected synchronized void publishInitial() {
applyDiff(emptySnapshot(), takeSnapshot(), null);
try (S empty = emptySnapshot(); S snapshot = takeSnapshot()) {
applyDiff(empty, snapshot, null);
}
}

interface Snapshot extends SafeCloseable {

}

interface Snapshot {
@Override
public synchronized SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) {
// TODO deephaven-core#2453 this should be redone, along with other scope change handling
if (previousIfPresent != null) {
previousIfPresent.close();
}
S snapshot = takeSnapshot();
return () -> finishSnapshot(snapshot);
}

private synchronized void finishSnapshot(S beforeSnapshot) {
try (beforeSnapshot; S afterSnapshot = takeSnapshot()) {
applyDiff(beforeSnapshot, afterSnapshot, null);
}
}


protected abstract S emptySnapshot();

protected abstract S takeSnapshot();
Expand All @@ -127,35 +146,38 @@ protected Changes applyDiff(S from, S to, RuntimeException e) {
@Override
public synchronized final Changes evaluateScript(final String script, final @Nullable String scriptName) {
RuntimeException evaluateErr = null;
final S fromSnapshot = takeSnapshot();

// store pointers to exist query scope static variables
final QueryLibrary prevQueryLibrary = QueryLibrary.getLibrary();
final CompilerTools.Context prevCompilerContext = CompilerTools.getContext();
final QueryScope prevQueryScope = QueryScope.getScope();

// retain any objects which are created in the executed code, we'll release them when the script session closes
try (final SafeCloseable ignored = LivenessScopeStack.open(this, false)) {
// point query scope static state to our session's state
QueryScope.setScope(queryScope);
CompilerTools.setContext(compilerContext);
QueryLibrary.setLibrary(queryLibrary);
final Changes diff;
try (S fromSnapshot = takeSnapshot()) {

// store pointers to exist query scope static variables
final QueryLibrary prevQueryLibrary = QueryLibrary.getLibrary();
final CompilerTools.Context prevCompilerContext = CompilerTools.getContext();
final QueryScope prevQueryScope = QueryScope.getScope();

// retain any objects which are created in the executed code, we'll release them when the script session
// closes
try (final SafeCloseable ignored = LivenessScopeStack.open(this, false)) {
// point query scope static state to our session's state
QueryScope.setScope(queryScope);
CompilerTools.setContext(compilerContext);
QueryLibrary.setLibrary(queryLibrary);

// actually evaluate the script
evaluate(script, scriptName);
} catch (final RuntimeException err) {
evaluateErr = err;
} finally {
// restore pointers to query scope static variables
QueryScope.setScope(prevQueryScope);
CompilerTools.setContext(prevCompilerContext);
QueryLibrary.setLibrary(prevQueryLibrary);
}

// actually evaluate the script
evaluate(script, scriptName);
} catch (final RuntimeException err) {
evaluateErr = err;
} finally {
// restore pointers to query scope static variables
QueryScope.setScope(prevQueryScope);
CompilerTools.setContext(prevCompilerContext);
QueryLibrary.setLibrary(prevQueryLibrary);
try (S toSnapshot = takeSnapshot()) {
diff = applyDiff(fromSnapshot, toSnapshot, evaluateErr);
}
}

final S toSnapshot = takeSnapshot();

final Changes diff = applyDiff(fromSnapshot, toSnapshot, evaluateErr);

// re-throw any captured exception now that our listener knows what query scope state had changed prior
// to the script session execution error
if (evaluateErr != null) {
Expand Down Expand Up @@ -265,7 +287,7 @@ protected void destroy() {
protected abstract QueryScope newQueryScope();

@Override
public Class getVariableType(final String var) {
public Class<?> getVariableType(final String var) {
final Object result = getVariable(var, null);
if (result == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand All @@ -25,7 +26,7 @@ public class DelegatingScriptSession implements ScriptSession {
private final Set<String> knownVariables = new HashSet<>();

public DelegatingScriptSession(final ScriptSession delegate) {
this.delegate = delegate;
this.delegate = Objects.requireNonNull(delegate);
}

private Changes contextualizeChanges(final Changes diff) {
Expand All @@ -51,6 +52,11 @@ private Changes contextualizeChanges(final Changes diff) {
return diff;
}

@Override
public SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) {
return delegate.snapshot(previousIfPresent);
}

@NotNull
@Override
public Object getVariable(String name) throws QueryScope.MissingVariableException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,11 @@ protected static class GroovySnapshot implements Snapshot {
public GroovySnapshot(Map<String, Object> existingScope) {
this.scope = Objects.requireNonNull(existingScope);
}

@Override
public void close() {
// no-op
}
}

public Set<String> getVariableNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,17 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
* @param listener an optional listener that will be notified whenever the query scope changes
* @param runInitScripts if init scripts should be executed
* @param isDefaultScriptSession true if this is in the default context of a worker jvm
* @param pythonEvaluator
* @throws IOException if an IO error occurs running initialization scripts
* @throws InterruptedException if the current thread is interrupted while starting JPY
* @throws TimeoutException if jpy times out while obtaining configuration details
*/
public PythonDeephavenSession(
ObjectTypeLookup objectTypeLookup, @Nullable final Listener listener, boolean runInitScripts,
boolean isDefaultScriptSession)
throws IOException, InterruptedException, TimeoutException {
boolean isDefaultScriptSession, PythonEvaluatorJpy pythonEvaluator)
throws IOException {
super(objectTypeLookup, listener, isDefaultScriptSession);

// Start Jpy, if not already running from the python instance
JpyInit.init(log);

PythonEvaluatorJpy jpy = PythonEvaluatorJpy.withGlobalCopy();
evaluator = jpy;
scope = jpy.getScope();
evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
.createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class);
this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);
Expand Down Expand Up @@ -229,10 +224,7 @@ protected Changes createDiff(PythonSnapshot from, PythonSnapshot to, RuntimeExce
// It would be great if we could push down the maybeUnwrap logic into create_change_list (it could handle the
// unwrapping), but we are unable to tell jpy that we want to unwrap JType objects, but pass back python objects
// as PyObject.
try (
PythonSnapshot fromSnapshot = from;
PythonSnapshot toSnapshot = to;
PyObject changes = module.create_change_list(fromSnapshot.dict.unwrap(), toSnapshot.dict.unwrap())) {
try (PyObject changes = module.create_change_list(from.dict.unwrap(), to.dict.unwrap())) {
final Changes diff = new Changes();
diff.error = e;
for (PyObject change : changes.asList()) {
Expand Down Expand Up @@ -277,19 +269,21 @@ public boolean hasVariableName(String name) {

@Override
public synchronized void setVariable(String name, @Nullable Object newValue) {
final PythonSnapshot fromSnapshot = takeSnapshot();
final PyDictWrapper globals = scope.globals();
if (newValue == null) {
try {
globals.delItem(name);
} catch (KeyError key) {
// ignore
try (PythonSnapshot fromSnapshot = takeSnapshot()) {
final PyDictWrapper globals = scope.globals();
if (newValue == null) {
try {
globals.delItem(name);
} catch (KeyError key) {
// ignore
}
} else {
globals.setItem(name, newValue);
}
try (PythonSnapshot toSnapshot = takeSnapshot()) {
applyDiff(fromSnapshot, toSnapshot, null);
}
} else {
globals.setItem(name, newValue);
}
final PythonSnapshot toSnapshot = takeSnapshot();
applyDiff(fromSnapshot, toSnapshot, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.deephaven.engine.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import io.deephaven.engine.util.jpy.JpyInit;
import org.jpy.KeyError;
import org.jpy.PyDictWrapper;
import org.jpy.PyInputMode;
Expand All @@ -16,7 +20,10 @@
*/
public class PythonEvaluatorJpy implements PythonEvaluator {

public static PythonEvaluatorJpy withGlobalCopy() {
public static PythonEvaluatorJpy withGlobalCopy() throws IOException, InterruptedException, TimeoutException {
// Start Jpy, if not already running from the python instance
JpyInit.init();

// TODO: We still have to reach into the __main__ dictionary to push classes and import the Deephaven
// quasi-module
// because after we dill the item, the undilled item has a reference to the __main__ globals() and not our
Expand All @@ -26,6 +33,13 @@ public static PythonEvaluatorJpy withGlobalCopy() {
return new PythonEvaluatorJpy(PyLib.getMainGlobals().asDict().copy());
}

public static PythonEvaluatorJpy withGlobals() throws IOException, InterruptedException, TimeoutException {
// Start Jpy, if not already running from the python instance
JpyInit.init();

return new PythonEvaluatorJpy(PyLib.getMainGlobals().asDict());
}

private final PyDictWrapper globals;

private PythonEvaluatorJpy(PyDictWrapper globals) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.liveness.ReleasableLivenessManager;
import io.deephaven.engine.util.scripts.ScriptPathLoader;
import io.deephaven.engine.util.scripts.ScriptPathLoaderState;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -73,6 +74,28 @@ interface Listener {
void onScopeChanges(ScriptSession scriptSession, Changes changes);
}

/**
* Tracks changes in the script session bindings until the SnapshotScope is closed.
*
* @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings.
*/
default SnapshotScope snapshot() {
return snapshot(null);
}

/**
* Tracks changes in the script session bindings until the SnapshotScope is closed.
*
* This API should be considered unstable, see deephaven-core#2453.
*
* @param previousIfPresent if non-null, will be closed atomically with the new scope being opened.
* @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings.
*/
SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent);

interface SnapshotScope extends SafeCloseable {
}

/**
* Evaluates the script and manages liveness of objects that are exported to the user. This method should be called
* from the serial executor as it manipulates static state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
public interface VariableProvider {
Set<String> getVariableNames();

Class getVariableType(String var);
Class<?> getVariableType(String var);

<T> T getVariable(String var, T defaultValue);

Expand Down
Loading

0 comments on commit 281b193

Please sign in to comment.