Skip to content

Commit

Permalink
Add a listener that only fires when all dependencies are satisfied (#…
Browse files Browse the repository at this point in the history
…5571)

* Add a listener with deps needed to be satisfied

* Clean up test cases

* Improve doc string and code pattern

* Make the code a bit more generic, update doc

* Rework the doc string

* Improve the doc strings to be more clear

* Update py/server/deephaven/table_listener.py

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>

* Update py/server/deephaven/table_listener.py

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>

---------

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
  • Loading branch information
jmao-denver and chipkent committed Jun 18, 2024
1 parent 5501712 commit f3cd3b4
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.TableUpdateImpl;
Expand All @@ -11,9 +12,15 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import java.util.Arrays;


/**
* A Deephaven table listener which passes update events to a Python listener object. The listener can also replay the
Expand All @@ -27,48 +34,31 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
implements TableSnapshotReplayer {
private static final long serialVersionUID = -8882402061960621245L;
private final PyObject pyCallable;
private final NotificationQueue.Dependency[] dependencies;

/**
* Create a Python listener.
*
* No description for this listener will be provided. A hard reference to this listener will be maintained to
* prevent garbage collection. See {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do
* not want to prevent garbage collection of this listener.
*
* @param source The source table to which this listener will subscribe.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(Table source, PyObject pyObjectIn) {
this(null, source, true, pyObjectIn);
}

/**
* Create a Python listener.
*
* A hard reference to this listener will be maintained to prevent garbage collection. See
* {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do not want to prevent garbage
* collection of this listener.
*
* @param description A description for the UpdatePerformanceTracker to append to its entry description.
* @param description A description for the UpdatePerformanceTracker to append to its entry description, may be
* null.
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
* @param dependencies The tables that must be satisfied before this listener is executed.
*/
public PythonReplayListenerAdapter(String description, Table source, PyObject pyObjectIn) {
this(description, source, true, pyObjectIn);
public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain,
PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) {
final UpdateGraph updateGraph = source.getUpdateGraph(dependencies);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies);
}
}

/**
* Create a Python listener.
*
* @param description A description for the UpdatePerformanceTracker to append to its entry description.
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(String description, Table source, boolean retain,
PyObject pyObjectIn) {
private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn,
NotificationQueue.Dependency... dependencies) {
super(description, source, retain);
pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
this.dependencies = dependencies;
this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
}

@Override
Expand All @@ -87,4 +77,10 @@ public void onUpdate(final TableUpdate update) {
final boolean isReplay = false;
pyCallable.call("__call__", update, isReplay);
}

@Override
public boolean canExecute(final long step) {
return super.canExecute(step)
&& (dependencies.length == 0 || Arrays.stream(dependencies).allMatch(t -> t.satisfied(step)));
}
}
111 changes: 71 additions & 40 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from functools import wraps
from inspect import signature
from typing import Callable, Union, List, Generator, Dict, Optional, Literal
from typing import Callable, Union, List, Generator, Dict, Optional, Literal, Sequence

import jpy
import numpy
Expand Down Expand Up @@ -61,6 +61,8 @@ def _changes_to_numpy(table: Table, cols: Union[str, List[str]], row_set, chunk_


class TableUpdate(JObjectWrapper):
"""A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the
table. """
j_object_type = _JTableUpdate

def __init__(self, table: Table, j_table_update: jpy.JType):
Expand Down Expand Up @@ -306,39 +308,12 @@ def _wrap_listener_obj(t: Table, listener: TableListener):
return listener


def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared"):
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.
The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
Returns:
a TableListenerHandle
Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, listener=listener, description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle


class TableListenerHandle(JObjectWrapper):
class TableListenerHandle:
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter

def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None):
"""Creates a new table listener handle.
def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None,
dependencies: Union[Table, Sequence[Table]] = None):
"""Creates a new table listener handle with dependencies.
Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
Expand All @@ -350,25 +325,43 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti
The 'is_replay' parameter is used only by replay listeners, it is set to 'true' when replaying the initial
snapshot and 'false' during normal updates.
Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies.
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.
Dependencies ensure that the listener can safely access the dependent tables during its execution. This
mainly includes reading the data from the tables. While performing operations on the dependent tables in
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
Raises:
ValueError
DHError
"""
self.t = t
self.description = description
self.dependencies = to_sequence(dependencies)

if callable(listener):
listener_wrapped = _wrap_listener_func(t, listener)
self.listener_wrapped = _wrap_listener_func(t, listener)
elif isinstance(listener, TableListener):
listener_wrapped = _wrap_listener_obj(t, listener)
self.listener_wrapped = _wrap_listener_obj(t, listener)
else:
raise ValueError("listener is neither callable nor TableListener object")
self.listener = _JPythonReplayListenerAdapter(description, t.j_table, False, listener_wrapped)
raise DHError(message="listener is neither callable nor TableListener object")

try:
self.listener = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies)
except Exception as e:
raise DHError(e, "failed to create a table listener.") from e
self.started = False

def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None:
Expand Down Expand Up @@ -408,6 +401,44 @@ def stop(self) -> None:
self.t.j_table.removeUpdateListener(self.listener)
self.started = False

@property
def j_object(self) -> jpy.JType:
return self.listener

def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\
-> TableListenerHandle:
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.
The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.
Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies.
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.
Dependencies ensure that the listener can safely access the dependent tables during its execution. This
mainly includes reading the data from the tables. While performing operations on the dependent tables in
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
Returns:
a TableListenerHandle
Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener,
description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle
Loading

0 comments on commit f3cd3b4

Please sign in to comment.