Commit
Refactor to use 'strategy' pattern
jmao-denver committed Jul 8, 2024
1 parent f4f824c commit b191ce2
Showing 3 changed files with 76 additions and 64 deletions.
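The refactor replaces four near-duplicate read loops with one generic reader, _table_reader_chunk, which owns chunking, locking, and cleanup and takes an emitter callback: each emitter (the interchangeable "strategy") decides what a chunk of column data becomes, such as a dict of arrays, one dict per row, or a named tuple. A minimal standalone sketch of the pattern (read_chunks, dict_emitter, and row_dict_emitter are illustrative names, not the Deephaven API; plain Python lists stand in for column sources):

from typing import Callable, Dict, Generator, List, Sequence, TypeVar

T = TypeVar('T')

def read_chunks(rows: List[Dict[str, int]], col_names: Sequence[str], *,
                emitter: Callable[[Sequence[str], List[List[int]]], Generator[T, None, None]],
                chunk_size: int = 2) -> Generator[T, None, None]:
    # The generic reader: slices rows into chunks, builds column-major arrays
    # (mirroring readChunkColumnMajor), and defers output shaping to the emitter.
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        col_arrays = [[row[name] for row in chunk] for name in col_names]
        yield from emitter(col_names, col_arrays)

def dict_emitter(col_names, col_arrays):
    # One dict of column name -> array per chunk, like _table_reader_chunk_dict.
    yield dict(zip(col_names, col_arrays))

def row_dict_emitter(col_names, col_arrays):
    # One dict per row, like _table_reader_dict.
    for i in range(len(col_arrays[0])):
        yield {name: col_arrays[j][i] for j, name in enumerate(col_names)}

rows = [{'A': 1, 'B': 10}, {'A': 2, 'B': 20}, {'A': 3, 'B': 30}]
print(list(read_chunks(rows, ['A', 'B'], emitter=dict_emitter)))
# [{'A': [1, 2], 'B': [10, 20]}, {'A': [3], 'B': [30]}]
print(list(read_chunks(rows, ['A', 'B'], emitter=row_dict_emitter)))
# [{'A': 1, 'B': 10}, {'A': 2, 'B': 20}, {'A': 3, 'B': 30}]

Swapping the emitter changes the output shape without touching the reading machinery, which is exactly how the four public readers below are rebuilt on one core loop.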
128 changes: 70 additions & 58 deletions py/server/deephaven/_table_reader.py
@@ -3,7 +3,7 @@
 #
 """This module supports reading the data in a Deephaven table in a chunked manner."""
 from collections import namedtuple
-from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple
+from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple, Callable, TypeVar
 
 import jpy
 import numpy as np
@@ -16,6 +16,7 @@
 
 _JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonTableDataReader")
 
+T = TypeVar('T')
 
 def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]:
     if not cols:
@@ -54,19 +55,15 @@ def _table_reader_all_dict(table: Table, cols: Optional[Union[str, Sequence[str]
     with update_graph.auto_locking_ctx(table):
         try:
             j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, row_set, col_sources, prev)
-
-            col_dict = {}
-            for i, col_def in enumerate(col_defs):
-                col_dict[col_def.name] = _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i]
-
-            return col_dict
+            return {col_def.name: _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i] for i, col_def in enumerate(col_defs)}
         finally:
             j_reader_context.close()
 
 
-def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType,
-                             chunk_size: int = 2048, prev: bool = False, to_numpy: bool = True) \
-        -> Generator[Dict[str, Union[np.ndarray | jpy.JType]], None, None]:
+def _table_reader_chunk(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
+                        emitter: Callable[[Sequence[Column], jpy.JType], Generator[T, None, None]], row_set: jpy.JType,
+                        chunk_size: int = 2048, prev: bool = False) \
+        -> Generator[T, None, None]:
     """ A generator that reads the chunks of rows over the given row set of a table into a dictionary. The dictionary is
     a map of column names to numpy arrays or Java arrays.
@@ -76,7 +73,6 @@ def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[st
         row_set (jpy.JType): The row set to read.
         chunk_size (int): The number of rows to read at a time. Default is 4096.
         prev (bool): If True, read the previous values. Default is False.
-        to_numpy (bool): If True, convert the column data to numpy arrays. Default is True.
 
     Returns:
         A generator that yields a dictionary of column names to numpy arrays or Java arrays.
@@ -98,51 +94,41 @@
             chunk_row_set = row_sequence_iterator.getNextRowSequenceWithLength(chunk_size)
             j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, chunk_row_set, col_sources,
                                                                    prev)
-
-            col_dict = {}
-            for i, col_def in enumerate(col_defs):
-                col_dict[col_def.name] = _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i]
-
-            yield col_dict
+            yield from emitter(col_defs, j_array)
         finally:
             j_reader_context.close()
             row_sequence_iterator.close()
 
 
-def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \
-        -> Generator[Dict[str, Any], None, None]:
-    """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names
-    to scalar values of the column data type.
+def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType,
+                             chunk_size: int = 2048, prev: bool = False) \
+        -> Generator[Dict[str, np.ndarray], None, None]:
+    """ A generator that reads the chunks of rows over the given row set of a table into a dictionary. The dictionary is
+    a map of column names to numpy arrays or Java arrays.
 
     Args:
-        table (Table): The table to read.
+        table (Table): The table to read.
         cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
-        chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
-            crossings. Default is 2048.
+        row_set (jpy.JType): The row set to read.
+        chunk_size (int): The number of rows to read at a time. Default is 4096.
+        prev (bool): If True, read the previous values. Default is False.
 
     Returns:
-        A generator that yields a dictionary of column names to values.
+        A generator that yields a dictionary of column names to numpy arrays or Java arrays.
 
     Raises:
        ValueError
    """
-    col_defs = _col_defs(table, cols)
+    def _emitter(col_defs, j_array) -> Generator[Dict[str, np.ndarray], None, None]:
+        yield {col_def.name: _column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)}
 
-    for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(),
-                                               chunk_size=chunk_size,
-                                               prev=False, to_numpy=False):
-        chunk_size = len(chunk_dict[col_defs[0].name])
-        for i in range(chunk_size):
-            col_dict = {}
-            for col_def in col_defs:
-                col_dict[col_def.name] = chunk_dict[col_def.name][i]
-            yield col_dict
+    yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=row_set, chunk_size=chunk_size, prev=prev)
 
 
-def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven',
-                        chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]:
-    """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made
-    up of fields with their names being the column names and their values being of the column data types.
+def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
+                              tuple_name: str = 'Deephaven', chunk_size: int = 2048,) -> Generator[Tuple[np.ndarray, ...], None, None]:
+    """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named
+    tuple is made up of fields with their names being the column names and their values being numpy arrays of the
+    column data types.
 
     If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
     the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
@@ -154,28 +140,51 @@ def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]]
     Args:
         table (Table): The table to read.
-        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
         tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
-        chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
-            crossings. Default is 2048.
+        chunk_size (int): The number of rows to read at a time. Default is 4096.
 
     Returns:
-        A generator that yields a named tuple for each row in the table
+        A generator that yields a named tuple for each row in the table.
 
     Raises:
         ValueError
     """
     named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
 
-    for row in _table_reader_dict(table, cols, chunk_size=chunk_size):
-        yield named_tuple_class(**row)
+    def _emitter(col_defs, j_array) -> Generator[Tuple[np.ndarray, ...], None, None]:
+        yield named_tuple_class._make((_column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)))
+
+    yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
 
-def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
-                              tuple_name: str = 'Deephaven', chunk_size: int = 2048,) -> Generator[Tuple[np.ndarray, ...], None, None]:
-    """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named
-    tuple is made up of fields with their names being the column names and their values being numpy arrays of the
-    column data types.
+def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \
+        -> Generator[Dict[str, Any], None, None]:
+    """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names
+    to scalar values of the column data type.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
+            crossings. Default is 2048.
+
+    Returns:
+        A generator that yields a dictionary of column names to values.
+
+    Raises:
+        ValueError
+    """
+    def _emitter(col_defs, j_array) -> Generator[Dict[str, Any], None, None]:
+        row_count = len(j_array[0])
+        for i in range(row_count):
+            yield {col_def.name: j_array[j][i] for j, col_def in enumerate(col_defs)}
+
+    yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
+
+def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven',
+                        chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]:
+    """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made
+    up of fields with their names being the column names and their values being of the column data types.
 
     If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
     the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
@@ -187,19 +196,22 @@ def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[s
     Args:
         table (Table): The table to read.
-        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None.
         tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
-        chunk_size (int): The number of rows to read at a time. Default is 4096.
+        chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
+            crossings. Default is 2048.
 
     Returns:
-        A generator that yields a named tuple for each row in the table.
+        A generator that yields a named tuple for each row in the table
 
     Raises:
         ValueError
     """
     named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
 
-    for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(),
-                                               chunk_size=chunk_size,
-                                               prev=False, to_numpy=True):
-        yield named_tuple_class(**chunk_dict)
+    def _emitter(col_defs, j_array) -> Generator[Tuple[Any, ...], None, None]:
+        row_count = len(j_array[0])
+        for i in range(row_count):
+            yield named_tuple_class._make((j_array[j][i] for j, _ in enumerate(col_defs)))
+
+    yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
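One detail the new _table_reader_chunk relies on: emitter output is forwarded with yield from inside the try/finally, so the reader context and row-sequence iterator are released even when a consumer abandons the generator early. A small generic sketch of that behavior (plain Python, not Deephaven-specific):

def reader():
    print("open reader context")
    try:
        for chunk in ([1, 2], [3, 4]):
            # delegate to the emitter's generator, as _table_reader_chunk does
            yield from (x * 10 for x in chunk)
    finally:
        print("close reader context")

g = reader()
print(next(g))  # prints "open reader context", then 10
g.close()       # prints "close reader context": finally runs on early close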
2 changes: 1 addition & 1 deletion py/server/deephaven/table.py
@@ -585,7 +585,7 @@ def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, chun
         from deephaven._table_reader import _table_reader_chunk_dict  # to prevent circular import
 
         return _table_reader_chunk_dict(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size,
-                                        prev=False, to_numpy=True)
+                                        prev=False)
 
     def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven',
                          chunk_size: int = 2048,)-> Generator[Tuple[np.ndarray, ...], None, None]:
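The public iteration API built on these internals keeps its signature; for context, a typical call site, assuming a running Deephaven server (a sketch, not part of the diff):

from deephaven import empty_table

t = empty_table(10).update(["X = i", "Y = i * 2"])

# Each chunk is a dict mapping column names to numpy arrays of up to 4 rows.
for chunk in t.iter_chunk_dict(cols=["X", "Y"], chunk_size=4):
    print(chunk["X"], chunk["Y"])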
10 changes: 5 additions & 5 deletions py/server/deephaven/table_listener.py
@@ -67,7 +67,7 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G
             return (_ for _ in ())
 
         return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(),
-                                        chunk_size=chunk_size, prev=False, to_numpy=True)
+                                        chunk_size=chunk_size, prev=False)
 
     def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
         """Returns a dict with each key being a column name and each value being a NumPy array of
@@ -83,7 +83,7 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray
             return {}
 
         return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
-                                      prev=True, to_numpy=True)
+                                      prev=True)
 
     def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[
         Dict[str, numpy.ndarray], None, None]:
@@ -101,7 +101,7 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) ->
             return (_ for _ in ())
 
         return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
-                                        chunk_size=chunk_size, prev=True, to_numpy=True)
+                                        chunk_size=chunk_size, prev=True)
 
     def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
         """Returns a dict with each key being a column name and each value being a NumPy array of the current values of
@@ -135,7 +135,7 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -
             return (_ for _ in ())
 
         return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
-                                        chunk_size=chunk_size, prev=False, to_numpy=True)
+                                        chunk_size=chunk_size, prev=False)
 
     def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
         """Returns a dict with each key being a column name and each value being a NumPy array of the previous values of
@@ -169,7 +169,7 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No
             return (_ for _ in ())
 
         return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
-                                        chunk_size=chunk_size, prev=True, to_numpy=True)
+                                        chunk_size=chunk_size, prev=True)
 
     @property
     def shifted(self):
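The chunked accessors above are usually consumed from a table listener. For context, a sketch of one, assuming a running Deephaven server, a ticking source, and the current listen(table, listener) API (illustrative, not part of the diff):

from deephaven import time_table
from deephaven.table_listener import listen

t = time_table("PT1s").update(["X = i"])

def on_update(update, is_replay):
    # Read added rows 1024 at a time; each chunk is a dict of numpy arrays.
    for chunk in update.added_chunks(chunk_size=1024, cols="X"):
        print(chunk["X"])

handle = listen(t, on_update)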
