Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize Python InputTable class #5236

Merged
merged 15 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from deephaven.updateby import UpdateByOperation

# Table
_J_Table = jpy.get_type("io.deephaven.engine.table.Table")
_JTable = jpy.get_type("io.deephaven.engine.table.Table")
_JAttributeMap = jpy.get_type("io.deephaven.engine.table.AttributeMap")
_JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools")
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")
Expand Down Expand Up @@ -426,7 +426,7 @@ class Table(JObjectWrapper):
data ingestion operations, queries, aggregations, joins, etc.

"""
j_object_type = _J_Table
j_object_type = _JTable

def __init__(self, j_table: jpy.JType):
self.j_table = jpy.cast(j_table, self.j_object_type)
Expand Down
102 changes: 53 additions & 49 deletions py/server/deephaven/table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
from deephaven.column import InputColumn, Column
from deephaven.dtypes import DType, Duration, Instant
from deephaven.execution_context import ExecutionContext
from deephaven.jcompat import j_lambda
from deephaven.jcompat import to_sequence
from deephaven.jcompat import j_lambda, j_list_to_list, to_sequence
from deephaven.table import Table
from deephaven.update_graph import auto_locking_ctx

_JTableFactory = jpy.get_type("io.deephaven.engine.table.TableFactory")
_JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools")
_JDynamicTableWriter = jpy.get_type("io.deephaven.engine.table.impl.util.DynamicTableWriter")
_JBaseArrayBackedInputTable = jpy.get_type("io.deephaven.engine.table.impl.util.BaseArrayBackedInputTable")
_JAppendOnlyArrayBackedInputTable = jpy.get_type(
"io.deephaven.engine.table.impl.util.AppendOnlyArrayBackedInputTable")
_JKeyedArrayBackedInputTable = jpy.get_type("io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable")
Expand Down Expand Up @@ -231,52 +231,20 @@ def write_row(self, *values: Any) -> None:


class InputTable(Table):
"""InputTable is a subclass of Table that allows the users to dynamically add/delete/modify data in it. There are two
types of InputTable - append-only and keyed.
"""InputTable is a subclass of Table that allows the users to dynamically add/delete/modify data in it.

The append-only input table is not keyed, all rows are added to the end of the table, and deletions and edits are
not permitted.

The keyed input table has keys for each row and supports addition/deletion/modification of rows by the keys.
Users should always create InputTables through factory methods rather than directly from the constructor.
"""
j_object_type = _JBaseArrayBackedInputTable

def __init__(self, col_defs: Dict[str, DType] = None, init_table: Table = None,
key_cols: Union[str, Sequence[str]] = None):
"""Creates an InputTable instance from either column definitions or initial table. When key columns are
provided, the InputTable will be keyed, otherwise it will be append-only.

Args:
col_defs (Dict[str, DType]): the column definitions
init_table (Table): the initial table
key_cols (Union[str, Sequence[str]]): the name(s) of the key column(s)

Raises:
DHError
"""
try:
if col_defs is None and init_table is None:
raise ValueError("either column definitions or init table should be provided.")
elif col_defs and init_table:
raise ValueError("both column definitions and init table are provided.")

if col_defs:
j_arg_1 = _JTableDefinition.of(
[Column(name=n, data_type=t).j_column_definition for n, t in col_defs.items()])
else:
j_arg_1 = init_table.j_table

key_cols = to_sequence(key_cols)
if key_cols:
super().__init__(_JKeyedArrayBackedInputTable.make(j_arg_1, key_cols))
else:
super().__init__(_JAppendOnlyArrayBackedInputTable.make(j_arg_1))
self.j_input_table = self.j_table.getAttribute(_J_INPUT_TABLE_ATTRIBUTE)
self.key_columns = key_cols
except Exception as e:
raise DHError(e, "failed to create a InputTable.") from e
def __init__(self, j_table: jpy.JType):
super().__init__(j_table)
self.j_input_table = self.j_table.getAttribute(_J_INPUT_TABLE_ATTRIBUTE)
if not self.j_input_table:
raise DHError("the provided table input is not suitable for input tables.")

def add(self, table: Table) -> None:
"""Writes rows from the provided table to this input table. If this is a keyed input table, added rows with keys
"""Synchronously writes rows from the provided table to this input table. If this is a keyed input table, added rows with keys
that match existing rows will replace those rows.

Args:
Expand All @@ -291,8 +259,8 @@ def add(self, table: Table) -> None:
raise DHError(e, "add to InputTable failed.") from e

def delete(self, table: Table) -> None:
"""Deletes the keys contained in the provided table from this keyed input table. If this method is called on an
append-only input table, a PermissionError will be raised.
"""Synchronously deletes the keys contained in the provided table from this keyed input table. If this method is called on an
append-only input table, an error will be raised.

Args:
table (Table): the table with the keys to delete
Expand All @@ -301,18 +269,33 @@ def delete(self, table: Table) -> None:
DHError
"""
try:
if not self.key_columns:
raise PermissionError("deletion on an append-only input table is not allowed.")
self.j_input_table.delete(table.j_table)
except Exception as e:
raise DHError(e, "delete data in the InputTable failed.") from e

@property
def key_names(self) -> List[str]:
"""The names of the key columns of the InputTable."""
return j_list_to_list(self.j_input_table.getKeyNames())

@property
def value_names(self) -> List[str]:
"""The names of the value columns. By default, any column not marked as a key column is a value column."""
return j_list_to_list(self.j_input_table.getValueNames())


def input_table(col_defs: Dict[str, DType] = None, init_table: Table = None,
key_cols: Union[str, Sequence[str]] = None) -> InputTable:
"""Creates an InputTable from either column definitions or initial table. When key columns are
"""Creates an in-memory InputTable from either column definitions or an initial table. When key columns are
provided, the InputTable will be keyed, otherwise it will be append-only.

There are two types of in-memory InputTable - append-only and keyed.

The append-only input table is not keyed, all rows are added to the end of the table, and deletions and edits are
not permitted.

The keyed input table has keys for each row and supports addition/deletion/modification of rows by the keys.

Args:
col_defs (Dict[str, DType]): the column definitions
init_table (Table): the initial table
Expand All @@ -324,7 +307,28 @@ def input_table(col_defs: Dict[str, DType] = None, init_table: Table = None,
Raises:
DHError
"""
return InputTable(col_defs=col_defs, init_table=init_table, key_cols=key_cols)

try:
if col_defs is None and init_table is None:
raise ValueError("either column definitions or init table should be provided.")
elif col_defs and init_table:
raise ValueError("both column definitions and init table are provided.")

if col_defs:
j_arg_1 = _JTableDefinition.of(
[Column(name=n, data_type=t).j_column_definition for n, t in col_defs.items()])
else:
j_arg_1 = init_table.j_table

key_cols = to_sequence(key_cols)
if key_cols:
j_table = _JKeyedArrayBackedInputTable.make(j_arg_1, key_cols)
else:
j_table = _JAppendOnlyArrayBackedInputTable.make(j_arg_1)
except Exception as e:
raise DHError(e, "failed to create an in-memory InputTable.") from e

return InputTable(j_table)


def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table:
Expand Down
12 changes: 11 additions & 1 deletion py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,24 +327,32 @@ def test_input_table(self):
col_defs = {c.name: c.data_type for c in t.columns}
with self.subTest("from table definition"):
append_only_input_table = input_table(col_defs=col_defs)
self.assertEqual(append_only_input_table.key_names, [])
self.assertEqual(append_only_input_table.value_names, [col.name for col in cols])
append_only_input_table.add(t)
self.assertEqual(append_only_input_table.size, 2)
append_only_input_table.add(t)
self.assertEqual(append_only_input_table.size, 4)

keyed_input_table = input_table(col_defs=col_defs, key_cols="String")
self.assertEqual(keyed_input_table.key_names, ["String"])
self.assertEqual(keyed_input_table.value_names, [col.name for col in cols if col.name != "String"])
keyed_input_table.add(t)
self.assertEqual(keyed_input_table.size, 2)
keyed_input_table.add(t)
self.assertEqual(keyed_input_table.size, 2)

with self.subTest("from init table"):
append_only_input_table = input_table(init_table=t)
self.assertEqual(append_only_input_table.key_names, [])
self.assertEqual(append_only_input_table.value_names, [col.name for col in cols])
self.assertEqual(append_only_input_table.size, 2)
append_only_input_table.add(t)
self.assertEqual(append_only_input_table.size, 4)

keyed_input_table = input_table(init_table=t, key_cols="String")
self.assertEqual(keyed_input_table.key_names, ["String"])
self.assertEqual(keyed_input_table.value_names, [col.name for col in cols if col.name != "String"])
self.assertEqual(keyed_input_table.size, 2)
keyed_input_table.add(t)
self.assertEqual(keyed_input_table.size, 2)
Expand All @@ -355,9 +363,11 @@ def test_input_table(self):
append_only_input_table = input_table(init_table=t)
with self.assertRaises(DHError) as cm:
append_only_input_table.delete(t)
self.assertIn("not allowed.", str(cm.exception))
self.assertIn("doesn\'t support delete operation", str(cm.exception))

keyed_input_table = input_table(init_table=t, key_cols=["String", "Double"])
self.assertEqual(keyed_input_table.key_names, ["String", "Double"])
self.assertEqual(keyed_input_table.value_names, [col.name for col in cols if col.name != "String" and col.name != "Double"])
self.assertEqual(keyed_input_table.size, 2)
keyed_input_table.delete(t.select(["String", "Double"]))
self.assertEqual(keyed_input_table.size, 0)
Expand Down
Loading