-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wrap the new PartitionedTable interface #2446
Changes from all commits
b8c3c06
75614e8
764c4a8
047bcbd
a1909aa
6b7777c
d28e2c1
a4368b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
# | ||
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending | ||
# | ||
""" This module implements the Table class and functions that work with Tables. """ | ||
""" This module implements the Table and PartitionedTable classes which are the main instruments for working with | ||
Deephaven refreshing and static data.""" | ||
|
||
from __future__ import annotations | ||
|
||
from enum import Enum, auto | ||
from typing import Union, Sequence, List | ||
from typing import Union, Sequence, List, Any, Optional | ||
|
||
import jpy | ||
|
||
|
@@ -25,6 +27,7 @@ | |
_JPair = jpy.get_type("io.deephaven.api.agg.Pair") | ||
_JMatchPair = jpy.get_type("io.deephaven.engine.table.MatchPair") | ||
_JLayoutHintBuilder = jpy.get_type("io.deephaven.engine.util.LayoutHintBuilder") | ||
_JPartitionedTable = jpy.get_type("io.deephaven.engine.table.PartitionedTable") | ||
|
||
|
||
class SortDirection(Enum): | ||
|
@@ -44,6 +47,26 @@ class AsOfMatchRule(Enum): | |
GREATER_THAN = _JAsOfMatchRule.GREATER_THAN | ||
|
||
|
||
def _sort_column(col, dir_): | ||
return (_JSortColumn.desc(_JColumnName.of(col)) if dir_ == SortDirection.DESCENDING else _JSortColumn.asc( | ||
_JColumnName.of(col))) | ||
|
||
|
||
def _td_to_columns(table_definition): | ||
cols = [] | ||
j_cols = table_definition.getColumnList().toArray() | ||
for j_col in j_cols: | ||
cols.append( | ||
Column( | ||
name=j_col.getName(), | ||
data_type=dtypes.from_jtype(j_col.getDataType()), | ||
component_type=dtypes.from_jtype(j_col.getComponentType()), | ||
column_type=ColumnType(j_col.getColumnType()), | ||
) | ||
) | ||
return cols | ||
|
||
|
||
class Table(JObjectWrapper): | ||
"""A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations. | ||
|
||
|
@@ -91,18 +114,7 @@ def columns(self) -> List[Column]: | |
if self._schema: | ||
return self._schema | ||
|
||
self._schema = [] | ||
j_col_list = self._definition.getColumnList() | ||
for i in range(j_col_list.size()): | ||
j_col = j_col_list.get(i) | ||
self._schema.append( | ||
Column( | ||
name=j_col.getName(), | ||
data_type=dtypes.from_jtype(j_col.getDataType()), | ||
component_type=dtypes.from_jtype(j_col.getComponentType()), | ||
column_type=ColumnType(j_col.getColumnType()), | ||
) | ||
) | ||
self._schema = _td_to_columns(self._definition) | ||
return self._schema | ||
|
||
@property | ||
|
@@ -615,8 +627,8 @@ def sort(self, order_by: Union[str, Sequence[str]], | |
|
||
Args: | ||
order_by (Union[str, Sequence[str]]): the column(s) to be sorted on | ||
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for each sort | ||
column, default is None. In the absence of explicit sort directions, data will be sorted in the ascending order. | ||
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for | ||
each sort column, default is None, meaning ascending order for all the sort columns. | ||
|
||
Returns: | ||
a new table | ||
|
@@ -625,24 +637,17 @@ def sort(self, order_by: Union[str, Sequence[str]], | |
DHError | ||
""" | ||
|
||
def sort_column(col, dir_): | ||
return ( | ||
_JSortColumn.desc(_JColumnName.of(col)) | ||
if dir_ == SortDirection.DESCENDING | ||
else _JSortColumn.asc(_JColumnName.of(col)) | ||
) | ||
|
||
try: | ||
order_by = to_sequence(order_by) | ||
if not order: | ||
order = (SortDirection.ASCENDING,) * len(order_by) | ||
order = to_sequence(order) | ||
if order: | ||
sort_columns = [ | ||
sort_column(col, dir_) for col, dir_ in zip(order_by, order) | ||
] | ||
j_sc_list = j_array_list(sort_columns) | ||
return Table(j_table=self.j_table.sort(j_sc_list)) | ||
else: | ||
return Table(j_table=self.j_table.sort(*order_by)) | ||
if len(order_by) != len(order): | ||
raise DHError(message="The number of sort columns must be the same as the number of sort directions.") | ||
|
||
sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] | ||
j_sc_list = j_array_list(sort_columns) | ||
return Table(j_table=self.j_table.sort(j_sc_list)) | ||
except Exception as e: | ||
raise DHError(e, "table sort operation failed.") from e | ||
|
||
|
@@ -723,7 +728,8 @@ def exact_join(self, table: Table, on: Union[str, Sequence[str]], joins: Union[s | |
except Exception as e: | ||
raise DHError(e, "table exact_join operation failed.") from e | ||
|
||
def join(self, table: Table, on: Union[str, Sequence[str]] = None, joins: Union[str, Sequence[str]] = None) -> Table: | ||
def join(self, table: Table, on: Union[str, Sequence[str]] = None, | ||
joins: Union[str, Sequence[str]] = None) -> Table: | ||
"""The join method creates a new table containing rows that have matching values in both tables. Rows that | ||
do not have matching criteria will not be included in the result. If there are multiple matches between a row | ||
from the left table and rows from the right table, all matching combinations will be included. If no columns | ||
|
@@ -1171,24 +1177,6 @@ def agg_all_by(self, agg: Aggregation, by: Union[str, Sequence[str]] = None) -> | |
|
||
# endregion | ||
|
||
def partition_by(self, by: Union[str, Sequence[str]]) -> jpy.JType: | ||
""" Creates a TableMap (opaque) by dividing this table into sub-tables. | ||
|
||
Args: | ||
by (Union[str, Sequence[str]]): the column(s) by which to group data | ||
|
||
Returns: | ||
A TableMap containing a sub-table for each group | ||
|
||
Raises: | ||
DHError | ||
""" | ||
try: | ||
by = to_sequence(by) | ||
return self.j_table.partitionBy(*by) | ||
except Exception as e: | ||
raise DHError(e, "failed to create a TableMap.") from e | ||
|
||
def format_columns(self, formulas: Union[str, List[str]]) -> Table: | ||
""" Applies color formatting to the columns of the table. | ||
|
||
|
@@ -1285,3 +1273,179 @@ def layout_hints(self, front: Union[str, List[str]] = None, back: Union[str, Lis | |
return Table(j_table=self.j_table.setLayoutHints(_j_layout_hint_builder.build())) | ||
except Exception as e: | ||
raise DHError(e, "failed to set layout hints on table") from e | ||
|
||
def partition_by(self, by: Union[str, Sequence[str]], drop_keys: bool = False) -> PartitionedTable: | ||
""" Creates a PartitionedTable from this table, partitioned according to the specified key columns. | ||
|
||
Args: | ||
by (Union[str, Sequence[str]]): the column(s) by which to group data | ||
drop_keys (bool): whether to drop key columns in the constituent tables, default is False | ||
|
||
Returns: | ||
A PartitionedTable containing a sub-table for each group | ||
|
||
Raises: | ||
DHError | ||
""" | ||
try: | ||
by = to_sequence(by) | ||
return PartitionedTable(j_partitioned_table=self.j_table.partitionBy(drop_keys, *by)) | ||
except Exception as e: | ||
raise DHError(e, "failed to create a partitioned table.") from e | ||
|
||
|
||
class PartitionedTable(JObjectWrapper): | ||
"""A partitioned table is a table with one column containing like-defined constituent tables, optionally with | ||
key columns defined to allow binary operation based transformation or joins with other like-keyed partitioned | ||
tables. """ | ||
|
||
j_object_type = _JPartitionedTable | ||
|
||
@property | ||
def j_object(self) -> jpy.JType: | ||
self.j_partitioned_table | ||
|
||
def __init__(self, j_partitioned_table): | ||
self.j_partitioned_table = j_partitioned_table | ||
self._schema = None | ||
self._table = None | ||
self._key_columns = None | ||
self._unique_keys = None | ||
self._constituent_column = None | ||
self._constituent_changes_permitted = None | ||
|
||
@property | ||
def table(self) -> Table: | ||
"""The underlying Table.""" | ||
if self._table is None: | ||
self._table = Table(j_table=self.j_partitioned_table.table()) | ||
return self._table | ||
|
||
@property | ||
def key_columns(self) -> List[str]: | ||
"""The partition key column names.""" | ||
if self._key_columns is None: | ||
self._key_columns = list(self.j_partitioned_table.keyColumnNames().toArray()) | ||
return self._key_columns | ||
|
||
@property | ||
def unique_keys(self) -> bool: | ||
"""Whether the keys in the underlying table are unique. If keys are unique, one can expect that | ||
select_distinct(key_column_names) and view(key_column_names) operations produce equivalent tables.""" | ||
if self._unique_keys is None: | ||
self._unique_keys = self.j_partitioned_table.uniqueKeys() | ||
return self._unique_keys | ||
|
||
@property | ||
def constituent_column(self) -> str: | ||
"""The constituent column name.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean? If this is the column containing the partitioned tables, it is totally unclear. Similar comment on function name. |
||
if self._constituent_column is None: | ||
self._constituent_column = self.j_partitioned_table.constituentColumnName() | ||
return self._constituent_column | ||
|
||
@property | ||
def constituent_table_columns(self) -> List[Column]: | ||
"""The column definitions shared by the constituent tables.""" | ||
Comment on lines
+1347
to
+1348
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above. Not clear (aka obvious) for a user. |
||
if not self._schema: | ||
self._schema = _td_to_columns(self.j_partitioned_table.constituentDefinition()) | ||
|
||
return self._schema | ||
|
||
@property | ||
def constituent_changes_permitted(self) -> bool: | ||
"""Whether the constituents of the underlying partitioned table can change, specifically whether the values of | ||
the constituent column can change. | ||
|
||
Note, this is unrelated to whether the constituent tables are refreshing, or whether the underlying partitioned | ||
table is refreshing. Also note that the underlying partitioned table must be refreshing if it contains | ||
any refreshing constituents. | ||
""" | ||
Comment on lines
+1355
to
+1362
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not clear to me. It sounds like this is like determining if a table is dynamic or not, but then it seems to say that it is unrelated to that concept. A user will not have a clue. |
||
if self._constituent_changes_permitted is None: | ||
self._constituent_changes_permitted = self.j_partitioned_table.constituentChangesPermitted() | ||
return self._constituent_changes_permitted | ||
|
||
def merge(self) -> Table: | ||
"""Makes a new Table that contains all the rows from all the constituent tables. | ||
|
||
Returns: | ||
a Table | ||
|
||
Raises: | ||
DHError | ||
""" | ||
try: | ||
return Table(j_table=self.j_partitioned_table.merge()) | ||
except Exception as e: | ||
raise DHError(e, "failed to merge all the constituent tables.") | ||
|
||
def filter(self, filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> PartitionedTable: | ||
"""Makes a new PartitionedTable from the result of applying filters to the underlying partitioned table. | ||
|
||
Args: | ||
filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the filter condition expression(s) or | ||
Filter object(s) | ||
|
||
Returns: | ||
a PartitionedTable | ||
|
||
Raises: | ||
DHError | ||
""" | ||
filters = to_sequence(filters) | ||
if isinstance(filters[0], str): | ||
filters = Filter.from_(filters) | ||
filters = to_sequence(filters) | ||
try: | ||
return PartitionedTable(j_partitioned_table=self.j_partitioned_table.filter(j_array_list(filters))) | ||
except Exception as e: | ||
raise DHError(e, "failed to apply filters to the partitioned table.") from e | ||
|
||
def sort(self, order_by: Union[str, Sequence[str]], | ||
order: Union[SortDirection, Sequence[SortDirection]] = None) -> PartitionedTable: | ||
"""Makes a new PartitionedTable from sorting the underlying partitioned table. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Much less clear than |
||
|
||
Args: | ||
order_by (Union[str, Sequence[str]]): the column(s) to be sorted on, can't include the constituent column | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do these columns have to be columns in the subtables? Can they be key columns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They can only be columns in the underlying table. This doesn't touch the sub-tables (constituents). |
||
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for | ||
each sort column, default is None, meaning ascending order for all the sort columns. | ||
|
||
Returns: | ||
a new PartitionedTable | ||
|
||
Raises: | ||
DHError | ||
""" | ||
|
||
try: | ||
order_by = to_sequence(order_by) | ||
if not order: | ||
order = (SortDirection.ASCENDING,) * len(order_by) | ||
order = to_sequence(order) | ||
if len(order_by) != len(order): | ||
raise DHError(message="The number of sort columns must be the same as the number of sort directions.") | ||
|
||
sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] | ||
j_sc_list = j_array_list(sort_columns) | ||
return PartitionedTable(j_partitioned_table=self.j_partitioned_table.sort(j_sc_list)) | ||
except Exception as e: | ||
raise DHError(e, "failed to sort the partitioned table.") from e | ||
|
||
def get_constituent(self, key_values: Sequence[Any]) -> Optional[Table]: | ||
"""Gets a single constituent table by its corresponding key column values. | ||
|
||
Args: | ||
key_values (Sequence[Any]): the values of the key columns | ||
|
||
Returns: | ||
a Table or None | ||
jmao-denver marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
j_table = self.j_partitioned_table.constituentFor(*key_values) | ||
if j_table: | ||
return Table(j_table=j_table) | ||
else: | ||
return None | ||
|
||
@property | ||
def constituent_tables(self) -> List[Table]: | ||
"""Returns all the current constituent tables.""" | ||
Comment on lines
+1449
to
+1450
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, I don't think the nomenclature is clear. e.g. a name like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I very intentionally didn't call this sub-tables. |
||
return list(map(Table, self.j_partitioned_table.constituents())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users will probably not find this clear. Is this the source table the PartitionedTable was created from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, in the partition_by case, it is the table created with agg_by on the source table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the partitioned table. The PartitionedTable is just a wrapper around it.