Skip to content

Commit

Permalink
Add a listener with deps needed to be satisfied
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Jun 7, 2024
1 parent 6711cd1 commit f137328
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.TableUpdateImpl;
Expand All @@ -11,9 +12,13 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jpy.PyObject;

import java.util.Arrays;


/**
* A Deephaven table listener which passes update events to a Python listener object. The listener can also replay the
Expand All @@ -27,35 +32,7 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
implements TableSnapshotReplayer {
private static final long serialVersionUID = -8882402061960621245L;
private final PyObject pyCallable;

/**
* Create a Python listener.
*
* No description for this listener will be provided. A hard reference to this listener will be maintained to
* prevent garbage collection. See {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do
* not want to prevent garbage collection of this listener.
*
* @param source The source table to which this listener will subscribe.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(Table source, PyObject pyObjectIn) {
this(null, source, true, pyObjectIn);
}

/**
* Create a Python listener.
*
* A hard reference to this listener will be maintained to prevent garbage collection. See
* {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do not want to prevent garbage
* collection of this listener.
*
* @param description A description for the UpdatePerformanceTracker to append to its entry description.
* @param source The source table to which this listener will subscribe.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(String description, Table source, PyObject pyObjectIn) {
this(description, source, true, pyObjectIn);
}
private final Table[] dependencies;

/**
* Create a Python listener.
Expand All @@ -64,11 +41,21 @@ public PythonReplayListenerAdapter(String description, Table source, PyObject py
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
* @param dependencies The tables that must be satisfied before this listener is executed.
*/
public PythonReplayListenerAdapter(String description, Table source, boolean retain,
PyObject pyObjectIn) {
private PythonReplayListenerAdapter(String description, Table source, boolean retain, PyObject pyObjectIn,
Table... dependencies) {
super(description, source, retain);
pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
this.dependencies = dependencies;
this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
}

public static PythonReplayListenerAdapter create(String description, Table source, boolean retain,
PyObject pyObjectIn, Table... dependencies) {
final UpdateGraph updateGraph = source.getUpdateGraph(dependencies);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies);
}
}

@Override
Expand All @@ -87,4 +74,9 @@ public void onUpdate(final TableUpdate update) {
final boolean isReplay = false;
pyCallable.call("__call__", update, isReplay);
}

@Override
public boolean canExecute(final long step) {
return source.satisfied(step) && Arrays.stream(dependencies).allMatch(t -> t.satisfied(step));
}
}
79 changes: 41 additions & 38 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from functools import wraps
from inspect import signature
from typing import Callable, Union, List, Generator, Dict, Optional, Literal
from typing import Callable, Union, List, Generator, Dict, Optional, Literal, Sequence

import jpy
import numpy
Expand Down Expand Up @@ -61,6 +61,8 @@ def _changes_to_numpy(table: Table, cols: Union[str, List[str]], row_set, chunk_


class TableUpdate(JObjectWrapper):
"""A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the
table. """
j_object_type = _JTableUpdate

def __init__(self, table: Table, j_table_update: jpy.JType):
Expand Down Expand Up @@ -306,39 +308,12 @@ def _wrap_listener_obj(t: Table, listener: TableListener):
return listener


def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared"):
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.
The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
Returns:
a TableListenerHandle
Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, listener=listener, description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle


class TableListenerHandle(JObjectWrapper):
class TableListenerHandle:
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter

def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None):
"""Creates a new table listener handle.
def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None,
dependencies: Union[Table, Sequence[Table]] = None):
"""Creates a new table listener handle with dependencies.
Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
Expand All @@ -355,20 +330,23 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
dependencies (Union[Table, Sequence[Table]]): tables that must be safe to read during the listener's execution
Raises:
ValueError
"""
self.t = t
self.description = description
self.dependencies = to_sequence(dependencies)

if callable(listener):
listener_wrapped = _wrap_listener_func(t, listener)
self.listener_wrapped = _wrap_listener_func(t, listener)
elif isinstance(listener, TableListener):
listener_wrapped = _wrap_listener_obj(t, listener)
self.listener_wrapped = _wrap_listener_obj(t, listener) # type: ignore
else:
raise ValueError("listener is neither callable nor TableListener object")
self.listener = _JPythonReplayListenerAdapter(description, t.j_table, False, listener_wrapped)

self.listener = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies)
self.started = False

def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None:
Expand Down Expand Up @@ -408,6 +386,31 @@ def stop(self) -> None:
self.t.j_table.removeUpdateListener(self.listener)
self.started = False

@property
def j_object(self) -> jpy.JType:
return self.listener

def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None):
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.
The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be safe to read during the listener's execution
Returns:
a TableListenerHandle
Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener,
description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle
96 changes: 94 additions & 2 deletions py/server/tests/test_table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from typing import List, Union

import numpy
import jpy

from deephaven import time_table
from deephaven import time_table, new_table, input_table
from deephaven.column import bool_col, string_col
from deephaven.experimental import time_window
from deephaven.jcompat import to_sequence
from deephaven.table import Table
Expand All @@ -17,6 +19,7 @@
from deephaven.update_graph import exclusive_lock
from tests.testbase import BaseTestCase

_JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors")

class TableUpdateRecorder:
def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[str]] = None):
Expand Down Expand Up @@ -51,7 +54,7 @@ def record(self, update, is_replay):
self.modified_columns_list.append(update.modified_columns)


def ensure_ugp_cycles(table_update_recorder: TableUpdateRecorder, cycles: int = 2):
def ensure_ugp_cycles(table_update_recorder: TableUpdateRecorder, cycles: int = 2) -> None:
while len(table_update_recorder.replays) < cycles:
time.sleep(1)

Expand Down Expand Up @@ -191,5 +194,94 @@ def listener_func(update, is_replay):
has_added=True, has_removed=False, has_modified=True)


def test_listener_with_deps_obj(self):
table_update_recorder = TableUpdateRecorder(self.test_table)
dep_table = time_table("PT00:00:05").update("X = i % 11")
ec = get_exec_ctx()


with self.subTest("with deps"):
j_arrays = []

class ListenerClass(TableListener):
def on_update(self, update, is_replay):
table_update_recorder.record(update, is_replay)
with ec:
t2 = dep_table.view(["Y = i % 8"])
j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray())

listener = ListenerClass()
table_listener_handle = listen(self.test_table, listener, dependencies=dep_table)
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()

self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False,
has_added=True, has_removed=True, has_modified=False)
self.assertTrue(all([len(ja) > 0 for ja in j_arrays]))

with self.subTest("with deps, error"):
j_arrays = []

class ListenerClass(TableListener):
def on_update(self, update, is_replay):
table_update_recorder.record(update, is_replay)
with ec:
try:
t2 = dep_table.view(["Y = i % 8"]).group_by("X")
j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray())
except Exception as e:
pass

listener = ListenerClass()
table_listener_handle = listen(self.test_table, listener, dependencies=dep_table)
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()

self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False,
has_added=True, has_removed=True, has_modified=False)
self.assertTrue(len(j_arrays) == 0)

def test_listener_with_deps_func(self):
cols = [
bool_col(name="Boolean", data=[True, False]),
string_col(name="String", data=["foo", "bar"]),
]
t = new_table(cols=cols)
self.assertEqual(t.size, 2)
col_defs = {c.name: c.data_type for c in t.columns}
dep_table = input_table(col_defs=col_defs)

def listener_func(update, is_replay):
table_update_recorder.record(update, is_replay)
try:
dep_table.add(t)
except Exception as e:
self.assertIn("Attempted to make a blocking input table edit from a listener or notification. This is unsupported", str(e))
pass

with self.subTest("with deps"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table)
table_listener_handle.start(do_replay=False)
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=False, has_added=True, has_removed=True,
has_modified=False)
self.assertEqual(dep_table.size, 0)

with self.subTest("with deps, replay_lock='exclusive'"):
table_listener_handle.start(do_replay=True, replay_lock="exclusive")
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False)

with self.subTest("with deps, replay_lock='shared'"):
raise unittest.SkipTest("This test will dead lock, waiting for the resolution of https://github.com/deephaven/deephaven-core/issues/5585")
table_listener_handle.start(do_replay=True, replay_lock="shared") # noqa
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False)


if __name__ == "__main__":
unittest.main()

0 comments on commit f137328

Please sign in to comment.