diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java index 5b26f71461a..b6eae8bcda5 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java @@ -3,6 +3,7 @@ // package io.deephaven.integrations.python; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.TableUpdateImpl; @@ -11,9 +12,15 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; import org.jpy.PyObject; +import javax.annotation.Nullable; +import java.util.Arrays; + /** * A Deephaven table listener which passes update events to a Python listener object. The listener can also replay the @@ -27,48 +34,31 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener implements TableSnapshotReplayer { private static final long serialVersionUID = -8882402061960621245L; private final PyObject pyCallable; + private final NotificationQueue.Dependency[] dependencies; /** * Create a Python listener. * - * No description for this listener will be provided. A hard reference to this listener will be maintained to - * prevent garbage collection. See {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do - * not want to prevent garbage collection of this listener. - * - * @param source The source table to which this listener will subscribe. - * @param pyObjectIn Python listener object. - */ - public PythonReplayListenerAdapter(Table source, PyObject pyObjectIn) { - this(null, source, true, pyObjectIn); - } - - /** - * Create a Python listener. - * - * A hard reference to this listener will be maintained to prevent garbage collection. See - * {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do not want to prevent garbage - * collection of this listener. - * - * @param description A description for the UpdatePerformanceTracker to append to its entry description. + * @param description A description for the UpdatePerformanceTracker to append to its entry description, may be + * null. * @param source The source table to which this listener will subscribe. + * @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected. * @param pyObjectIn Python listener object. + * @param dependencies The tables that must be satisfied before this listener is executed. */ - public PythonReplayListenerAdapter(String description, Table source, PyObject pyObjectIn) { - this(description, source, true, pyObjectIn); + public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain, + PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) { + final UpdateGraph updateGraph = source.getUpdateGraph(dependencies); + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies); + } } - /** - * Create a Python listener. - * - * @param description A description for the UpdatePerformanceTracker to append to its entry description. - * @param source The source table to which this listener will subscribe. - * @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected. - * @param pyObjectIn Python listener object. - */ - public PythonReplayListenerAdapter(String description, Table source, boolean retain, - PyObject pyObjectIn) { + private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn, + NotificationQueue.Dependency... dependencies) { super(description, source, retain); - pyCallable = PythonUtils.pyListenerFunc(pyObjectIn); + this.dependencies = dependencies; + this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn); } @Override @@ -87,4 +77,10 @@ public void onUpdate(final TableUpdate update) { final boolean isReplay = false; pyCallable.call("__call__", update, isReplay); } + + @Override + public boolean canExecute(final long step) { + return super.canExecute(step) + && (dependencies.length == 0 || Arrays.stream(dependencies).allMatch(t -> t.satisfied(step))); + } } diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index af7340eb9be..d07162a07ad 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import wraps from inspect import signature -from typing import Callable, Union, List, Generator, Dict, Optional, Literal +from typing import Callable, Union, List, Generator, Dict, Optional, Literal, Sequence import jpy import numpy @@ -61,6 +61,8 @@ def _changes_to_numpy(table: Table, cols: Union[str, List[str]], row_set, chunk_ class TableUpdate(JObjectWrapper): + """A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the + table. """ j_object_type = _JTableUpdate def __init__(self, table: Table, j_table_update: jpy.JType): @@ -306,39 +308,12 @@ def _wrap_listener_obj(t: Table, listener: TableListener): return listener -def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False, - replay_lock: Literal["shared", "exclusive"] = "shared"): - """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen - for table updates. - - The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening. - If it goes out of scope and is garbage collected, the listener will stop receiving any table updates. - - Args: - t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes - description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry - description, default is None - do_replay (bool): whether to replay the initial snapshot of the table, default is False - replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive' - - Returns: - a TableListenerHandle - - Raises: - DHError - """ - table_listener_handle = TableListenerHandle(t=t, listener=listener, description=description) - table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock) - return table_listener_handle - - -class TableListenerHandle(JObjectWrapper): +class TableListenerHandle: """A handle to manage a table listener's lifecycle.""" - j_object_type = _JPythonReplayListenerAdapter - def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None): - """Creates a new table listener handle. + def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None, + dependencies: Union[Table, Sequence[Table]] = None): + """Creates a new table listener handle with dependencies. Table change events are processed by 'listener', which can be either (1) a callable (e.g. function) or @@ -350,25 +325,43 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti The 'is_replay' parameter is used only by replay listeners, it is set to 'true' when replaying the initial snapshot and 'false' during normal updates. + Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. + Args: t (Table): table to listen to listener (Union[Callable, TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. Raises: - ValueError + DHError """ self.t = t + self.description = description + self.dependencies = to_sequence(dependencies) if callable(listener): - listener_wrapped = _wrap_listener_func(t, listener) + self.listener_wrapped = _wrap_listener_func(t, listener) elif isinstance(listener, TableListener): - listener_wrapped = _wrap_listener_obj(t, listener) + self.listener_wrapped = _wrap_listener_obj(t, listener) else: - raise ValueError("listener is neither callable nor TableListener object") - self.listener = _JPythonReplayListenerAdapter(description, t.j_table, False, listener_wrapped) + raise DHError(message="listener is neither callable nor TableListener object") + try: + self.listener = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies) + except Exception as e: + raise DHError(e, "failed to create a table listener.") from e self.started = False def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None: @@ -408,6 +401,44 @@ def stop(self) -> None: self.t.j_table.removeUpdateListener(self.listener) self.started = False - @property - def j_object(self) -> jpy.JType: - return self.listener + +def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False, + replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\ + -> TableListenerHandle: + """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen + for table updates. + + The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening. + If it goes out of scope and is garbage collected, the listener will stop receiving any table updates. + + Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. + + Args: + t (Table): table to listen to + listener (Union[Callable, TableListener]): listener for table changes + description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry + description, default is None + do_replay (bool): whether to replay the initial snapshot of the table, default is False + replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive' + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. + + Returns: + a TableListenerHandle + + Raises: + DHError + """ + table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener, + description=description) + table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock) + return table_listener_handle diff --git a/py/server/tests/test_table_listener.py b/py/server/tests/test_table_listener.py index 4b9110ca226..bded80c1786 100644 --- a/py/server/tests/test_table_listener.py +++ b/py/server/tests/test_table_listener.py @@ -7,8 +7,10 @@ from typing import List, Union import numpy +import jpy -from deephaven import time_table +from deephaven import time_table, new_table, input_table, DHError +from deephaven.column import bool_col, string_col from deephaven.experimental import time_window from deephaven.jcompat import to_sequence from deephaven.table import Table @@ -17,6 +19,7 @@ from deephaven.update_graph import exclusive_lock from tests.testbase import BaseTestCase +_JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors") class TableUpdateRecorder: def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[str]] = None): @@ -51,7 +54,7 @@ def record(self, update, is_replay): self.modified_columns_list.append(update.modified_columns) -def ensure_ugp_cycles(table_update_recorder: TableUpdateRecorder, cycles: int = 2): +def ensure_ugp_cycles(table_update_recorder: TableUpdateRecorder, cycles: int = 2) -> None: while len(table_update_recorder.replays) < cycles: time.sleep(1) @@ -191,5 +194,139 @@ def listener_func(update, is_replay): has_added=True, has_removed=False, has_modified=True) + def test_listener_obj_with_deps(self): + dep_table = time_table("PT00:00:05").update("X = i % 11") + ec = get_exec_ctx() + + with self.subTest("view op on deps"): + table_update_recorder = TableUpdateRecorder(self.test_table) + j_arrays = [] + + class ListenerClass(TableListener): + def on_update(self, update, is_replay): + table_update_recorder.record(update, is_replay) + with ec: + t2 = dep_table.view(["Y = i % 8"]) + j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray()) + + listener = ListenerClass() + table_listener_handle = listen(self.test_table, listener, dependencies=dep_table) + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + + self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False, + has_added=True, has_removed=True, has_modified=False) + self.assertTrue(all([len(ja) > 0 for ja in j_arrays])) + + with (self.subTest("update/group_by op on deps")): + table_update_recorder = TableUpdateRecorder(self.test_table) + j_arrays = [] + + class ListenerClass(TableListener): + def on_update(self, update, is_replay): + table_update_recorder.record(update, is_replay) + with ec: + t2 = dep_table.update(["Y = i % 8"]).group_by("X") + j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray()) + + listener = ListenerClass() + self.test_table.await_update() + table_listener_handle = listen(self.test_table, listener, dependencies=dep_table) + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + + self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False, + has_added=True, has_removed=True, has_modified=False) + self.assertTrue(all([len(ja) > 0 for ja in j_arrays])) + + with (self.subTest("join op on deps")): + table_update_recorder = TableUpdateRecorder(self.test_table) + j_arrays = [] + dep_table_2 = time_table("PT00:00:05").update("X = i % 11") + + class ListenerClass(TableListener): + def on_update(self, update, is_replay): + table_update_recorder.record(update, is_replay) + with ec: + t2 = dep_table.update(["Y = i % 8"]).group_by("X").join(dep_table_2, on="X", + joins="Ts2=Timestamp") + j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray()) + + listener = ListenerClass() + self.test_table.await_update() + table_listener_handle = listen(self.test_table, listener, dependencies=[dep_table, dep_table_2]) + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + + self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False, + has_added=True, has_removed=True, has_modified=False) + self.assertTrue(all([len(ja) > 0 for ja in j_arrays])) + + + def test_listener_func_with_deps(self): + cols = [ + bool_col(name="Boolean", data=[True, False]), + string_col(name="String", data=["foo", "bar"]), + ] + t = new_table(cols=cols) + self.assertEqual(t.size, 2) + col_defs = {c.name: c.data_type for c in t.columns} + dep_table = input_table(col_defs=col_defs) + + def listener_func(update, is_replay): + table_update_recorder.record(update, is_replay) + try: + dep_table.add(t) + except Exception as e: + self.assertIn("Attempted to make a blocking input table edit", str(e)) + pass + + with self.subTest("do_replay=False"): + table_update_recorder = TableUpdateRecorder(self.test_table) + table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table) + table_listener_handle.start(do_replay=False) + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + self.check_update_recorder(table_update_recorder, has_replay=False, has_added=True, has_removed=True, + has_modified=False) + self.assertEqual(dep_table.size, 0) + + with self.subTest("do_replay=True, replay_lock='exclusive'"): + table_update_recorder = TableUpdateRecorder(self.test_table) + table_listener_handle.start(do_replay=True, replay_lock="exclusive") + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False) + + with self.subTest("do_replay=True, replay_lock='shared'"): + table_update_recorder = TableUpdateRecorder(self.test_table) + table_listener_handle.start(do_replay=True, replay_lock="shared") # noqa + ensure_ugp_cycles(table_update_recorder, cycles=3) + table_listener_handle.stop() + self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False) + + def test_listener_obj_with_deps_error(self): + _JPUG = jpy.get_type('io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph') + update_graph = _JPUG.newBuilder("TestUG").existingOrBuild() + + from deephaven import execution_context + _JEC = jpy.get_type('io.deephaven.engine.context.ExecutionContext') + ug_ctx = execution_context.ExecutionContext(j_exec_ctx=_JEC.newBuilder() + .emptyQueryScope() + .newQueryLibrary() + .captureQueryCompiler() + .setUpdateGraph(update_graph) + .build()) + + with ug_ctx: + dep_table = time_table("PT1s") + + def listener_func(update, is_replay): + pass + + with self.assertRaises(DHError): + table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table) + + if __name__ == "__main__": unittest.main()