From b1feeeb00afe14ec69b21ed955c0dc4b81f08266 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Wed, 10 Jul 2024 10:36:31 -0600 Subject: [PATCH] feat: generator-based chunky Table data iteration (#5595) Fixes #5186 --------- Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com> --- ...Reader.java => PythonTableDataReader.java} | 2 +- py/server/deephaven/_table_reader.py | 284 +++++++++++++ py/server/deephaven/table.py | 114 +++++- py/server/deephaven/table_listener.py | 90 +---- py/server/tests/test_table_iterator.py | 379 ++++++++++++++++++ 5 files changed, 795 insertions(+), 74 deletions(-) rename Integrations/src/main/java/io/deephaven/integrations/python/{PythonListenerTableUpdateDataReader.java => PythonTableDataReader.java} (98%) create mode 100644 py/server/deephaven/_table_reader.py create mode 100644 py/server/tests/test_table_iterator.py diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonListenerTableUpdateDataReader.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonTableDataReader.java similarity index 98% rename from Integrations/src/main/java/io/deephaven/integrations/python/PythonListenerTableUpdateDataReader.java rename to Integrations/src/main/java/io/deephaven/integrations/python/PythonTableDataReader.java index 1a0180f04df..9334952b9db 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonListenerTableUpdateDataReader.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonTableDataReader.java @@ -21,7 +21,7 @@ * An efficient reader for a Python table listener to extract columnar data based on the {@link RowSequence} in the * {@link TableUpdate} */ -public class PythonListenerTableUpdateDataReader { +public class PythonTableDataReader { /** * Factory method for instance of {@link Context} diff --git a/py/server/deephaven/_table_reader.py b/py/server/deephaven/_table_reader.py new file mode 100644 index 00000000000..bdb5de8af79 --- /dev/null +++ b/py/server/deephaven/_table_reader.py @@ -0,0 +1,284 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +"""This module supports reading the data in a Deephaven table in a chunked manner.""" +from collections import namedtuple +from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple, Callable, TypeVar, Iterable + +import jpy +import numpy as np +from deephaven import update_graph + +from deephaven.column import Column +from deephaven.jcompat import to_sequence +from deephaven.numpy import _column_to_numpy_array +from deephaven.table import Table + +_JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonTableDataReader") + +T = TypeVar('T') + +def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]: + if not cols: + col_defs = table.columns + else: + cols = to_sequence(cols) + col_defs = [col for col in table.columns if col.name in cols] + if len(col_defs) != len(cols): + raise ValueError(f"Invalid column names: {set(cols) - {col.name for col in col_defs} }") + + return col_defs + + +def _table_reader_all(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, + emitter: Callable[[Sequence[Column], jpy.JType], T], row_set: jpy.JType, + prev: bool = False) -> T: + """ Reads all the rows in the given row set of a table. The emitter converts the Java data into a desired Python + object. 
+
+    If the table is refreshing and no update graph locks are currently being held, the read is performed under an
+    automatically acquired shared lock of the update graph. This provides a consistent view of the data, with the
+    side effect that the table will not be able to refresh while its data is being read. The shared lock and the
+    internally created fill context are released when the read completes.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        emitter (Callable[[Sequence[Column], jpy.JType], T]): The function that takes the column definitions
+            and the column data in the form of Java arrays and returns a Python object, usually a collection such as
+            a dictionary or a tuple.
+        row_set (jpy.JType): The row set to read.
+        prev (bool): If True, read the previous values. Default is False.
+
+    Returns:
+        A Python collection object, usually a dictionary or tuple.
+
+    Raises:
+        ValueError
+    """
+    col_defs = _col_defs(table, cols)
+
+    col_sources = [table.j_table.getColumnSource(col_def.name) for col_def in col_defs]
+    j_reader_context = _JTableUpdateDataReader.makeContext(row_set.size(), *col_sources)
+    with update_graph.auto_locking_ctx(table):
+        try:
+            j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, row_set, col_sources, prev)
+            return emitter(col_defs, j_array)
+        finally:
+            j_reader_context.close()
+
+
+def _table_reader_all_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType,
+                           prev: bool = False, to_numpy: bool = True) -> Dict[str, Union[np.ndarray, jpy.JType]]:
+    """ Reads all the rows in the given row set of a table into a dictionary. The dictionary is a map of column names
+    to numpy arrays or Java arrays.
+
+    If the table is refreshing and no update graph locks are currently being held, the read is performed under an
+    automatically acquired shared lock of the update graph. This provides a consistent view of the data, with the
+    side effect that the table will not be able to refresh while its data is being read. The shared lock and the
+    internally created fill context are released when the read completes.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        row_set (jpy.JType): The row set to read.
+        prev (bool): If True, read the previous values. Default is False.
+        to_numpy (bool): If True, convert the column data to numpy arrays. Default is True.
+
+    Returns:
+        A dictionary of column names to numpy arrays or Java arrays.
+ + Raises: + ValueError + """ + _emitter = lambda col_defs, j_array: {col_def.name: _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i] + for i, col_def in enumerate(col_defs)} + return _table_reader_all(table, cols, emitter=_emitter, row_set=row_set, prev=prev) + + +def _table_reader_chunk(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, + emitter: Callable[[Sequence[Column], jpy.JType], Iterable[T]], row_set: jpy.JType, + chunk_size: int = 2048, prev: bool = False) \ + -> Generator[T, None, None]: + """ Returns a generator that reads one chunk of rows at a time from the table. The emitter converts the Java chunk + into the generator output value. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + emitter (Callable[[Sequence[Column], jpy.JType], Iterable[T]]): The function that takes the column definitions + and the column data in the form of Java arrays and returns an Iterable. + row_set (jpy.JType): The row set to read. + chunk_size (int): The number of rows to read at a time. Default is 2048. + prev (bool): If True, read the previous values. Default is False. + + Returns: + A generator that yields the desired Python type of the emitter. + + Raises: + ValueError + """ + if chunk_size < 0: + raise ValueError("chunk_size must not be negative.") + + col_defs = _col_defs(table, cols) + + row_sequence_iterator = row_set.getRowSequenceIterator() + col_sources = [table.j_table.getColumnSource(col_def.name) for col_def in col_defs] + j_reader_context = _JTableUpdateDataReader.makeContext(chunk_size, *col_sources) + with update_graph.auto_locking_ctx(table): + try: + while row_sequence_iterator.hasMore(): + chunk_row_set = row_sequence_iterator.getNextRowSequenceWithLength(chunk_size) + j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, chunk_row_set, col_sources, + prev) + yield from emitter(col_defs, j_array) + finally: + j_reader_context.close() + row_sequence_iterator.close() + +def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType, + chunk_size: int = 2048, prev: bool = False) \ + -> Generator[Dict[str, np.ndarray], None, None]: + """ Returns a generator that reads one chunk of rows at a time from the table into a dictionary. The dictionary is + a map of column names to numpy arrays or Java arrays. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. 
+    The side effect of this is that the table will not be able to refresh while the table is being iterated on.
+    Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
+    context will be released after the generator is destroyed. That can happen implicitly when the generator
+    is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
+    after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        row_set (jpy.JType): The row set to read.
+        chunk_size (int): The number of rows to read at a time. Default is 2048.
+        prev (bool): If True, read the previous values. Default is False.
+
+    Returns:
+        A generator that yields a dictionary of column names to numpy arrays or Java arrays.
+
+    Raises:
+        ValueError
+    """
+    def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Dict[str, np.ndarray], None, None]:
+        yield {col_def.name: _column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)}
+
+    return _table_reader_chunk(table, cols, emitter=_emitter, row_set=row_set, chunk_size=chunk_size, prev=prev)
+
+
+def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
+                              tuple_name: str = 'Deephaven', chunk_size: int = 2048) -> Generator[Tuple[np.ndarray, ...], None, None]:
+    """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named
+    tuple is made up of fields with their names being the column names and their values being numpy arrays of the
+    column data types.
+
+    If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
+    the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
+    The side effect of this is that the table will not be able to refresh while the table is being iterated on.
+    Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
+    context will be released after the generator is destroyed. That can happen implicitly when the generator
+    is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
+    after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+        tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
+        chunk_size (int): The number of rows to read at a time. Default is 2048.
+
+    Returns:
+        A generator that yields a named tuple of numpy arrays for each chunk of rows in the table.
+ + Raises: + ValueError + """ + named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False) + + def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Tuple[np.ndarray], None, None]: + yield named_tuple_class._make([_column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)]) + + return _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False) + +def _table_reader_row_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \ + -> Generator[Dict[str, Any], None, None]: + """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names + to scalar values of the column data type. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary + crossings. Default is 2048. + + Returns: + A generator that yields a dictionary of column names to values. + + Raises: + ValueError + """ + def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Dict[str, Any]]: + make_dict = lambda values: {col_def.name: value for col_def, value in zip(col_defs, values)} + mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)] + return map(make_dict, zip(*mvs)) + + return _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False) + +def _table_reader_row_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven', + chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]: + """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made + up of fields with their names being the column names and their values being of the column data types. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. 
When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
+    after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it.
+
+    Args:
+        table (Table): The table to read.
+        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None.
+        tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
+        chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
+            crossings. Default is 2048.
+
+    Returns:
+        A generator that yields a named tuple for each row in the table.
+
+    Raises:
+        ValueError
+    """
+    named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
+
+    def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Tuple[Any, ...]]:
+        mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)]
+        return map(named_tuple_class._make, zip(*mvs))
+
+    return _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py
index df6cc4b3e93..b29d911801f 100644
--- a/py/server/deephaven/table.py
+++ b/py/server/deephaven/table.py
@@ -11,10 +11,11 @@
 import inspect
 from enum import Enum
 from enum import auto
-from typing import Any, Optional, Callable, Dict
+from typing import Any, Optional, Callable, Dict, Generator, Tuple
 from typing import Sequence, List, Union, Protocol
 
 import jpy
+import numpy as np
 
 from deephaven import DHError
 from deephaven import dtypes
@@ -503,6 +504,117 @@ def meta_table(self) -> Table:
     def j_object(self) -> jpy.JType:
         return self.j_table
 
+    def iter_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 2048) \
+            -> Generator[Dict[str, Any], None, None]:
+        """ Returns a generator that reads one row at a time from the table into a dictionary. The dictionary is a map
+        of column names to scalar values of the column data type.
+
+        If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
+        the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
+        The side effect of this is that the table will not be able to refresh while the table is being iterated on.
+        Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
+        context will be released after the generator is destroyed. That can happen implicitly when the generator
+        is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
+        after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it.
+
+        Args:
+            cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+            chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary
+                crossings. Default is 2048.
+
+        Returns:
+            A generator that yields a dictionary of column names to scalar values.
+ + Raises: + ValueError + """ + from deephaven._table_reader import _table_reader_row_dict # to prevent circular import + return _table_reader_row_dict(self, cols, chunk_size=chunk_size) + + def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, tuple_name: str = 'Deephaven', + chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]: + """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made + up of fields with their names being the column names and their values being of the column data types. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None. + tuple_name (str): The name of the named tuple. Default is 'Deephaven'. + chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary + crossings. Default is 2048. + + Returns: + A generator that yields a named tuple for each row in the table + + Raises: + ValueError + """ + from deephaven._table_reader import _table_reader_row_tuple # to prevent circular import + return _table_reader_row_tuple(self, cols, tuple_name = tuple_name, chunk_size = chunk_size) + + def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \ + -> Generator[Dict[str, np.ndarray], None, None]: + """ Returns a generator that reads one chunk of rows at a time from the table into a dictionary. The dictionary + is a map of column names to numpy arrays of the column data type. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + chunk_size (int): The number of rows to read at a time. Default is 2048. + + Returns: + A generator that yields a dictionary of column names to numpy arrays. 
+
+        Raises:
+            ValueError
+        """
+        from deephaven._table_reader import _table_reader_chunk_dict  # to prevent circular import
+
+        return _table_reader_chunk_dict(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size,
+                                        prev=False)
+
+    def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven',
+                         chunk_size: int = 2048) -> Generator[Tuple[np.ndarray, ...], None, None]:
+        """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named
+        tuple is made up of fields with their names being the column names and their values being numpy arrays of the
+        column data types.
+
+        If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
+        the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
+        The side effect of this is that the table will not be able to refresh while the table is being iterated on.
+        Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
+        context will be released after the generator is destroyed. That can happen implicitly when the generator
+        is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
+        after use by either (1) setting it to None, (2) using the del statement, or (3) calling the close() method on it.
+
+        Args:
+            cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
+            tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
+            chunk_size (int): The number of rows to read at a time. Default is 2048.
+
+        Returns:
+            A generator that yields a named tuple of numpy arrays for each chunk of rows in the table.
+
+        Raises:
+            ValueError
+        """
+        from deephaven._table_reader import _table_reader_chunk_tuple  # to prevent circular import
+        return _table_reader_chunk_tuple(self, cols=cols, tuple_name=tuple_name, chunk_size=chunk_size)
+
     def has_columns(self, cols: Union[str, Sequence[str]]):
         """Whether this table contains a column for each of the provided names, return False if any of the
         columns is not in the table.
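A minimal usage sketch of the Table iteration API added above (not part of the patch; it assumes a running Deephaven server session, and the table and column names are illustrative):

```python
from deephaven import empty_table

t = empty_table(10).update(["X = i", "Y = (double) i * 10"])

# One row at a time, as a dict of column name -> scalar value.
for row in t.iter_dict():
    print(row["X"], row["Y"])

# One row at a time, as a named tuple whose fields are the column names.
for row in t.iter_tuple(cols=["X", "Y"]):
    print(row.X, row.Y)

# One chunk at a time, as a dict of column name -> numpy array of at most chunk_size rows.
for chunk in t.iter_chunk_dict(chunk_size=4):
    print(len(chunk["X"]), chunk["Y"].sum())
```

The row-oriented variants still fetch `chunk_size` rows per Java/Python boundary crossing internally; the chunk-oriented variants simply expose those arrays directly.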
diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index 624132b29b0..c7db8af827e 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import wraps from inspect import signature -from typing import Callable, Union, List, Generator, Dict, Optional, Literal, Sequence +from typing import Callable, Union, List, Generator, Dict, Literal, Sequence import jpy import numpy @@ -14,51 +14,13 @@ from deephaven import DHError from deephaven import update_graph from deephaven._wrapper import JObjectWrapper -from deephaven.column import Column from deephaven.jcompat import to_sequence -from deephaven.numpy import _column_to_numpy_array from deephaven.table import Table +from deephaven._table_reader import _table_reader_chunk_dict, _table_reader_all_dict from deephaven.update_graph import UpdateGraph _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") _JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate") -_JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonListenerTableUpdateDataReader") - - -def _col_defs(table: Table, cols: Union[str, List[str]]) -> List[Column]: - if not cols: - col_defs = table.columns - else: - cols = to_sequence(cols) - col_defs = [col for col in table.columns if col.name in cols] - - return col_defs - - -def _changes_to_numpy(table: Table, cols: Union[str, List[str]], row_set, chunk_size: Optional[int], - prev: bool = False) -> Generator[Dict[str, numpy.ndarray], None, None]: - col_defs = _col_defs(table, cols) - - row_sequence_iterator = row_set.getRowSequenceIterator() - col_sources = [table.j_table.getColumnSource(col_def.name) for col_def in col_defs] - chunk_size = row_set.size() if not chunk_size else chunk_size - j_reader_context = _JTableUpdateDataReader.makeContext(chunk_size, *col_sources) - try: - while row_sequence_iterator.hasMore(): - chunk_row_set = row_sequence_iterator.getNextRowSequenceWithLength(chunk_size) - - j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, chunk_row_set, col_sources, prev) - - col_dict = {} - for i, col_def in enumerate(col_defs): - np_array = _column_to_numpy_array(col_def, j_array[i]) - col_dict[col_def.name] = np_array - - yield col_dict - finally: - j_reader_context.close() - row_sequence_iterator.close() - class TableUpdate(JObjectWrapper): """A TableUpdate object represents a table update event. 
It contains the added, removed, and modified rows in the @@ -86,12 +48,8 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: if not self.j_table_update.added: return {} - try: - return next( - _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), - chunk_size=None)) - except StopIteration: - return {} + return _table_reader_all_dict(table=self.table, cols=cols, row_set= self.j_table_update.added.asRowSet(), + prev=False, to_numpy=True) def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ Dict[str, numpy.ndarray], None, None]: @@ -108,8 +66,8 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G if not self.j_table_update.added: return (_ for _ in ()) - return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), - chunk_size=chunk_size) + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), + chunk_size=chunk_size, prev=False) def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of @@ -124,12 +82,8 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray if not self.j_table_update.removed: return {} - try: - return next( - _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), - chunk_size=None, prev=True)) - except StopIteration: - return {} + return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + prev=True) def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ Dict[str, numpy.ndarray], None, None]: @@ -146,8 +100,8 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> if not self.j_table_update.removed: return (_ for _ in ()) - return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), - chunk_size=chunk_size, prev=True) + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + chunk_size=chunk_size, prev=True) def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the current values of @@ -162,12 +116,8 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra if not self.j_table_update.modified: return {} - try: - return next( - _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=None)) - except StopIteration: - return {} + return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + prev=False, to_numpy=True) def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ Dict[str, numpy.ndarray], None, None]: @@ -184,8 +134,8 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - if not self.j_table_update.modified: return (_ for _ in ()) - return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size) + return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + chunk_size=chunk_size, prev=False) def modified_prev(self, cols: Union[str, List[str]] = None) -> 
Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the previous values of @@ -200,12 +150,8 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n if not self.j_table_update.modified: return {} - try: - return next( - _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=None, prev=True)) - except StopIteration: - return {} + return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + prev=True, to_numpy=True) def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ Dict[str, numpy.ndarray], None, None]: @@ -222,8 +168,8 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No if not self.j_table_update.modified: return (_ for _ in ()) - return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size, prev=True) + return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + chunk_size=chunk_size, prev=True) @property def shifted(self): diff --git a/py/server/tests/test_table_iterator.py b/py/server/tests/test_table_iterator.py new file mode 100644 index 00000000000..465ba913453 --- /dev/null +++ b/py/server/tests/test_table_iterator.py @@ -0,0 +1,379 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +import unittest +from dataclasses import dataclass + +import numpy as np + +import deephaven.dtypes as dtypes +from deephaven import read_csv, time_table, new_table +from deephaven import update_graph as ug +from deephaven.column import bool_col, byte_col, char_col, short_col, int_col, long_col, float_col, double_col, \ + string_col, datetime_col, pyobj_col, jobj_col +from deephaven.jcompat import j_array_list + +from tests.testbase import BaseTestCase + + +class TableIteratorTestCase(BaseTestCase): + def test_iteration_in_chunks(self): + with self.subTest("Read chunks of rows in a static table"): + test_table = read_csv("tests/data/test_table.csv") + total_read_size = 0 + for d in test_table.iter_chunk_dict(chunk_size=10): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertEqual(d[col.name].dtype, col.data_type.np_type) + self.assertLessEqual(len(d[col.name]), 10) + self.assertEqual(isinstance(d[col.name], np.ndarray), True) + total_read_size += len(d[col.name]) + self.assertEqual(total_read_size, test_table.size) + + with self.subTest("Read chunks of rows in a ticking table"): + test_table = time_table("PT00:00:00.001").update(["X=i%11"]).sort("X") + test_table.await_update() + total_read_size = 0 + for d in test_table.iter_chunk_dict(chunk_size=100): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertEqual(d[col.name].dtype, col.data_type.np_type) + self.assertLessEqual(len(d[col.name]), 100) + total_read_size += len(d[col.name]) + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("Read chunks of rows on selected columns in a ticking table under shared lock"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? 
true : false"]).sort("X") + test_table.await_update() + with ug.shared_lock(test_table): + total_read_size = 0 + cols = ["X", "Z"] + for d in test_table.iter_chunk_dict(cols=cols, chunk_size=100): + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + self.assertLessEqual(len(d[col]), 100) + total_read_size += len(d[col]) + self.assertEqual(total_read_size, test_table.size) + + def test_iteration_in_rows(self): + with self.subTest("Read in rows of a static table"): + test_table = read_csv("tests/data/test_table.csv") + total_read_size = 0 + for d in test_table.iter_dict(): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(type(d[col.name])))) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + + with self.subTest("Read in rows of a ticking table"): + test_table = time_table("PT00:00:00.001").update(["X=i%11"]).sort("X") + test_table.await_update() + total_read_size = 0 + for d in test_table.iter_dict(): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + v_type = type(d[col.name]) + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(v_type)) or + self.assertEqual(v_type, col.data_type.j_type)) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("Read in rows on selected columns of a ticking table under shared lock"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? true : false"]).sort("X") + test_table.await_update() + with ug.shared_lock(test_table): + total_read_size = 0 + cols = ["X", "Z"] + for d in test_table.iter_dict(cols=cols): + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + self.assertTrue(ug.has_shared_lock(test_table)) + self.assertFalse(ug.has_shared_lock(test_table)) + + def test_direct_call_chunks(self): + with self.subTest("direct call rows in a static table"): + test_table = read_csv("tests/data/test_table.csv") + t_iter = test_table.iter_chunk_dict(chunk_size=10) + for d in t_iter: + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertEqual(d[col.name].dtype, col.data_type.np_type) + + with self.assertRaises(StopIteration): + next(t_iter) + + with self.subTest("direct call all rows in a ticking table"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? true : false"]).sort("X") + test_table.await_update() + total_read_size = 0 + cols = ["X", "Z"] + t_iter = test_table.iter_chunk_dict(cols=cols, chunk_size=100) + while True: + try: + d = next(t_iter) + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + total_read_size += len(d[col]) + except StopIteration: + break + # the table can't refresh with the lock, so the total read size must be the same as the table size + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("direct call not all rows in a ticking table"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? 
true : false"]).sort("X") + test_table.await_update() + cols = ["X", "Z"] + t_iter = test_table.iter_chunk_dict(cols=cols, chunk_size=100) + while True: + d = next(t_iter) + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + self.assertLessEqual(len(d[col]), 100) + total_read_size += len(d[col]) + break + self.assertTrue(ug.has_shared_lock(test_table)) + t_iter = None + self.assertFalse(ug.has_shared_lock(test_table)) + + def test_direct_call_rows(self): + with self.subTest("direct call rows in a static table"): + test_table = read_csv("tests/data/test_table.csv") + t_iter = test_table.iter_dict() + for d in t_iter: + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(type(d[col.name])))) + + with self.assertRaises(StopIteration): + next(t_iter) + + with self.subTest("direct call all rows in a ticking table"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? true : false"]).sort("X") + test_table.await_update() + total_read_size = 0 + cols = ["X", "Z"] + t_iter = test_table.iter_dict(cols=cols) + while True: + try: + d = next(t_iter) + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + total_read_size += 1 + except StopIteration: + break + # the table can't refresh with the lock, so the total read size must be the same as the table size + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("direct call not all rows in a ticking table"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? true : false"]).sort("X") + test_table.await_update() + cols = ["X", "Z"] + t_iter = test_table.iter_chunk_dict(cols=cols) + while True: + d = next(t_iter) + self.assertEqual(len(d), len(cols)) + for col in cols: + self.assertIn(col, d) + total_read_size += 1 + break + self.assertTrue(ug.has_shared_lock(test_table)) + t_iter = None + self.assertFalse(ug.has_shared_lock(test_table)) + + def test_data_types(self): + @dataclass + class CustomClass: + f1: int + f2: str + + j_array_list1 = j_array_list([1, -1]) + j_array_list2 = j_array_list([2, -2]) + + input_cols = [ + bool_col(name="Boolean", data=[True, False]), + byte_col(name="Byte", data=(1, -1)), + char_col(name="Char", data='-1'), + short_col(name="Short", data=[1, -1]), + int_col(name="Int", data=[1, -1]), + long_col(name="Long", data=[1, 2 ** 63 - 1]), + long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)), + float_col(name="Float", data=[1.01, -1.01]), + double_col(name="Double", data=[1.01, -1.01]), + string_col(name="String", data=["foo", "bar"]), + datetime_col(name="Datetime", data=[1, -1]), + pyobj_col(name="PyObj", data=[CustomClass(1, "1"), CustomClass(-1, "-1")]), + jobj_col(name="JObj", data=[j_array_list1, j_array_list2]), + ] + test_table = new_table(cols=input_cols) + + with self.subTest("Chunks"): + for d in test_table.iter_chunk_dict(chunk_size=10): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + self.assertEqual(dtypes.from_np_dtype(d[col.name].dtype).np_type, col.data_type.np_type) + self.assertEqual(isinstance(d[col.name], np.ndarray), True) + + with self.subTest("Rows"): + for d in test_table.iter_dict(): + self.assertEqual(len(d), len(test_table.columns)) + for col in test_table.columns: + self.assertIn(col.name, d) + 
v_type = type(d[col.name]) + if np.dtype(v_type) == np.dtype(object): + if col.data_type not in {dtypes.PyObject, dtypes.JObject}: + self.assertEqual(v_type, col.data_type.j_type) + else: + import jpy + self.assertTrue(v_type == CustomClass or v_type == jpy.get_type("java.util.ArrayList")) + else: + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(v_type))) + + def test_iteration_in_chunks_tuple(self): + with self.subTest("Read chunks of rows in a static table - tuple"): + test_table = read_csv("tests/data/test_table.csv") + total_read_size = 0 + for d in test_table.iter_chunk_tuple(chunk_size=10): + self.assertEqual(len(d), len(test_table.columns)) + for i, col in enumerate(test_table.columns): + self.assertEqual(col.name, d._fields[i]) + self.assertEqual(d[i].dtype, col.data_type.np_type) + self.assertLessEqual(len(d[i]), 10) + self.assertEqual(isinstance(d[i], np.ndarray), True) + total_read_size += len(d[i]) + self.assertEqual(total_read_size, test_table.size) + + with self.subTest("Read chunks of rows in a ticking table - tuple"): + test_table = time_table("PT00:00:00.001").update(["X=i%11"]).sort("X") + test_table.await_update() + total_read_size = 0 + for d in test_table.iter_chunk_tuple(chunk_size=100): + self.assertEqual(len(d), len(test_table.columns)) + for i, col in enumerate(test_table.columns): + self.assertEqual(col.name, d._fields[i]) + self.assertEqual(d[i].dtype, col.data_type.np_type) + self.assertLessEqual(len(d[i]), 100) + total_read_size += len(d[i]) + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("Read chunks of rows on selected columns in a ticking table under shared lock - tuple"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? 
true : false"]).sort("X") + test_table.await_update() + with ug.shared_lock(test_table): + total_read_size = 0 + cols = ["X", "Z"] + for d in test_table.iter_chunk_tuple(cols=cols, chunk_size=100): + self.assertEqual(len(d), len(cols)) + for i, col in enumerate(cols): + self.assertEqual(col, d._fields[i]) + self.assertLessEqual(len(d[i]), 100) + total_read_size += len(d[i]) + self.assertEqual(total_read_size, test_table.size) + + def test_iteration_in_rows_tuple(self): + with self.subTest("Read in rows of a static table - tuple"): + test_table = read_csv("tests/data/test_table.csv") + total_read_size = 0 + for d in test_table.iter_tuple(): + self.assertEqual(len(d), len(test_table.columns)) + for i, col in enumerate(test_table.columns): + self.assertEqual(col.name, d._fields[i]) + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(type(d[i])))) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + + with self.subTest("Read in rows of a ticking table - tuple"): + test_table = time_table("PT00:00:00.001").update(["X=i%11"]).sort("X") + test_table.await_update() + total_read_size = 0 + for d in test_table.iter_tuple(): + self.assertEqual(len(d), len(test_table.columns)) + for i, col in enumerate(test_table.columns): + self.assertEqual(col.name, d._fields[i]) + v_type = type(d[i]) + self.assertTrue(np.can_cast(col.data_type.np_type, np.dtype(v_type)) or + self.assertEqual(v_type, col.data_type.j_type)) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + self.assertFalse(ug.has_shared_lock(test_table)) + + with self.subTest("Read in rows on selected columns of a ticking table under shared lock - tuple"): + test_table = time_table("PT00:00:00.001").update( + ["X=i", "Y=(double)i*10", "Z= i%2 == 0? true : false"]).sort("X") + test_table.await_update() + with ug.shared_lock(test_table): + total_read_size = 0 + cols = ["X", "Z"] + for d in test_table.iter_tuple(cols=cols): + self.assertEqual(len(d), len(cols)) + for i, col in enumerate(cols): + self.assertEqual(col, d._fields[i]) + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + self.assertTrue(ug.has_shared_lock(test_table)) + self.assertFalse(ug.has_shared_lock(test_table)) + + def test_iteration_tuple_unpack(self): + test_table = read_csv("tests/data/test_table.csv") + total_read_size = 0 + for a, b, c, *_ in test_table.iter_tuple(): + total_read_size += 1 + self.assertEqual(total_read_size, test_table.size) + + with self.subTest("Too few receiving variables"): + with self.assertRaises(ValueError): + test_table = read_csv("tests/data/test_table.csv") + for a, b, c, d in test_table.iter_tuple(): + ... + + with self.subTest("Too many receiving variables"): + with self.assertRaises(ValueError): + test_table = read_csv("tests/data/test_table.csv") + for a, b, c, d, e, f in test_table.iter_tuple(): + ... + + def test_iteration_errors(self): + test_table = time_table("PT00:00:00.001").update(["from = i%11"]) + with self.assertRaises(ValueError) as cm: + for t in test_table.iter_tuple(): + pass + self.assertIn("'from'", str(cm.exception)) + + with self.assertRaises(ValueError) as cm: + for t in test_table.iter_tuple(cols=["from_"]): + pass + self.assertIn("'from_'", str(cm.exception)) + + with self.assertRaises(ValueError) as cm: + for t in test_table.iter_chunk_tuple(chunk_size=-1): + pass + + +if __name__ == '__main__': + unittest.main()