Remove explicit field syncing from Python client #2716

Merged
5 commits merged on Aug 10, 2022
py/client/pydeephaven/session.py (127 changes: 29 additions & 98 deletions)

@@ -22,10 +22,6 @@
from pydeephaven.query import Query
from pydeephaven.table import Table

NO_SYNC = 0
SYNC_ONCE = 1
SYNC_REPEATED = 2

class Session:
""" A Session object represents a connection to the Deephaven data server. It contains a number of convenience
methods for asking the server to create tables, import Arrow data into tables, merge tables, run Python scripts, and
@@ -39,21 +35,15 @@ class Session:
is_alive (bool): check if the session is still alive (may refresh the session)
"""

def __init__(self, host: str = None, port: int = None, never_timeout: bool = True, session_type: str = 'python', sync_fields: int = NO_SYNC):
def __init__(self, host: str = None, port: int = None, never_timeout: bool = True, session_type: str = 'python'):
""" Initialize a Session object that connects to the Deephaven server

Args:
host (str): the host name or IP address of the remote machine, default is 'localhost'
port (int): the port number that Deephaven server is listening on, default is 10000
never_timeout (bool, optional): never allow the session to timeout, default is True
session_type (str, optional): the Deephaven session type. Defaults to 'python'
sync_fields (int, optional): equivalent to calling `Session.sync_fields()` (see below), default is NO_SYNC

Sync Options:
session.NO_SYNC: does not check for existing tables on the server
session.SYNC_ONCE: equivalent to `Session.sync_fields(repeating=False)`
session.SYNC_REPEATED: equivalent to `Session.sync_fields(repeating=True)`

Raises:
DHError
"""
@@ -69,9 +59,6 @@ def __init__(...)
if not port:
self.port = int(os.environ.get("DH_PORT", 10000))

if sync_fields not in (NO_SYNC, SYNC_ONCE, SYNC_REPEATED):
raise DHError("invalid sync_fields setting")

self.is_connected = False
self.session_token = None
self.grpc_channel = None
@@ -84,10 +71,6 @@ def __init__(...)
self._never_timeout = never_timeout
self._keep_alive_timer = None
self._session_type = session_type
self._sync_fields = sync_fields
self._list_fields = None
self._field_update_thread = None
self._fields = {}

self._connect()
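
For contrast, a minimal construction sketch under the new API (assuming a local Deephaven server on the default port). The `sync_fields` keyword and the `SYNC_*` constants shown in the comment are exactly what this PR removes.

```python
from pydeephaven import Session

# Before this PR (no longer valid):
#   session = Session(sync_fields=SYNC_REPEATED)
# After this PR: construct the session without any sync option; field
# information is fetched on demand via `Session.tables` or `open_table()`.
session = Session(host="localhost", port=10000)
session.close()
```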

@@ -102,7 +85,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
@property
def tables(self):
with self._r_lock:
return [nm for sc, nm in self._fields if sc == 'scope' and self._fields[(sc, nm)][0] == 'Table']
fields = self._fetch_fields()
return [field.field_name for field in fields if field.application_id == 'scope' and field.typed_ticket.type == 'Table']
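
A short usage sketch of the reworked `tables` property (assuming a running local server); it follows the `empty_table`/`bind_table` pattern already used in the tests in this PR.

```python
from pydeephaven import Session

session = Session()
t = session.empty_table(10)
session.bind_table('t', t)
# `tables` now issues a one-shot ListFields request on each access instead of
# reading a locally mirrored `self._fields` map kept fresh by a background thread.
print(session.tables)  # expected to include 't'
session.close()
```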

@property
def grpc_metadata(self):
@@ -154,53 +138,19 @@ def get_ticket(self):

return self._last_ticket

def sync_fields(self, repeating: bool):
""" Check for fields that have been added/deleted by other sessions and add them to the local list
def _fetch_fields(self):
""" Returns a list of available fields on the server.

This will start a new background thread when `repeating=True`.

Args:
repeating (bool): Continue to check in the background for new/updated tables

Raises:
DHError
"""
with self._r_lock:
if self._list_fields is not None:
return

self._list_fields = self.app_service.list_fields()
self._parse_fields_change(next(self._list_fields))
if repeating:
self._field_update_thread = threading.Thread(target=self._update_fields)
self._field_update_thread.daemon = True
self._field_update_thread.start()
else:
if not self._list_fields.cancel():
raise DHError("could not cancel ListFields subscription")
self._list_fields = None

def _update_fields(self):
""" Constant loop that checks for any server-side field changes and adds them to the local list """
try:
while True:
fields_change = next(self._list_fields)
with self._r_lock:
self._parse_fields_change(fields_change)
except Exception as e:
if isinstance(e, grpc.Future):
pass
else:
raise e
list_fields = self.app_service.list_fields()
resp = next(list_fields)
if not list_fields.cancel():
raise DHError("could not cancel ListFields subscription")
return resp.created if resp.created else []
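
An illustrative expansion of the one-shot ListFields pattern `_fetch_fields` relies on, written against the private attributes visible in this diff (`app_service`, the streaming `list_fields()` call); it is a sketch, not a public API.

```python
from pydeephaven import DHError, Session

session = Session()
# The first message of the server-streaming ListFields call carries the
# current set of fields in `created`; cancelling right away avoids the
# background update thread the old sync_fields machinery required.
list_fields = session.app_service.list_fields()
snapshot = next(list_fields)
if not list_fields.cancel():
    raise DHError("could not cancel ListFields subscription")
for field in (snapshot.created or []):
    print(field.application_id, field.field_name, field.typed_ticket.type)
session.close()
```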

def _cancel_update_fields(self):
with self._r_lock:
if self._field_update_thread is not None:
self._list_fields.cancel()
self._field_update_thread.join()
self._list_fields = None
self._field_update_thread = None

def _connect(self):
with self._r_lock:
self.grpc_channel, self.session_token, self._timeout = self.session_service.connect()
@@ -209,11 +159,6 @@ def _connect(self):
if self._never_timeout:
self._keep_alive()

if self._sync_fields == SYNC_ONCE:
self.sync_fields(repeating=False)
elif self._sync_fields == SYNC_REPEATED:
self.sync_fields(repeating=True)

def _keep_alive(self):
if self._keep_alive_timer:
self._refresh_token()
@@ -252,7 +197,6 @@ def close(self) -> None:
"""
with self._r_lock:
if self.is_connected:
self._cancel_update_fields()
self.session_service.close()
self.grpc_channel.close()
self.is_connected = False
@@ -262,24 +206,6 @@ def close(self) -> None:
def release(self, ticket):
self.session_service.release(ticket)

def _parse_fields_change(self, fields_change):
if fields_change.created:
for t in fields_change.created:
t_type = None if t.typed_ticket.type == '' else t.typed_ticket.type
self._fields[(t.application_id, t.field_name)] = (t_type, Table(session=self, ticket=t.typed_ticket.ticket))

if fields_change.updated:
for t in fields_change.updated:
t_type = None if t.typed_ticket.type == '' else t.typed_ticket.type
self._fields[(t.application_id, t.field_name)] = (t_type, Table(session=self, ticket=t.typed_ticket.ticket))

if fields_change.removed:
for t in fields_change.removed:
self._fields.pop((t.application_id, t.field_name), None)

def _parse_script_response(self, response):
self._parse_fields_change(response.changes)

# convenience/factory methods
def run_script(self, script: str) -> None:
""" Run the supplied Python script on the server.
@@ -292,17 +218,9 @@ def run_script(self, script: str) -> None:
"""

with self._r_lock:
if self._sync_fields == SYNC_REPEATED:
self._cancel_update_fields()

response = self.console_service.run_script(script)

if self._sync_fields == SYNC_REPEATED:
self._fields = {}
self._parse_script_response(response)
self.sync_fields(repeating=True)
else:
self._parse_script_response(response)
if response.error_message != '':
raise DHError("could not run script: " + response.error_message)
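
A usage sketch of the error path shown above (assuming a local server): a non-empty `error_message` from the server surfaces as `DHError`.

```python
from pydeephaven import DHError, Session

session = Session()
try:
    # A script that fails on the server is reported via response.error_message,
    # which run_script re-raises as DHError.
    session.run_script("this is not valid python")
except DHError as e:
    print("script failed:", e)
finally:
    session.close()
```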

def open_table(self, name: str) -> Table:
""" Open a table in the global scope with the given name on the server.
@@ -317,10 +235,23 @@ def open_table(self, name: str) -> Table:
DHError
"""
with self._r_lock:
if name not in self.tables:
raise DHError(f"no table by the name {name}")
table_op = FetchTableOp()
return self.table_service.grpc_table_op(self._fields[('scope', name)][1], table_op)
ticket = ticket_pb2.Ticket(ticket=f's/{name}'.encode(encoding='ascii'))

faketable = Table(session=self, ticket=ticket)
Member:
So - I wonder if we want to actually add abstractions, or safety barriers, such that returning faketable here makes sense. (For example, we need to make sure that we don't try to close() with an s/... ticket.)

Or maybe introduce ref_table(...) and fetch(), such that open_table(...) == ref_table(...).fetch()? (I think the naming could be better.)

Of particular interest is obtaining ref_table(...).snapshot() without needing a FetchTableOp against the server.

Member (@devinrsmith, Aug 5, 2022):
Or maybe, still have only open_table, but only execute the FetchTableOp when the user calls into a method that needs FetchTableOp-provided data?

Contributor:
I think keeping open_table() makes more sense if we consider the typical use case: a client wants a reference to a table on the server purely to perform some operations on it, so lazy fetching isn't too meaningful here.

Contributor (Author):
I also think it would be more intuitive for the "this table doesn't exist" error to show up when you try to open the table, rather than the first time you try to use it. If we wanted lazy fetching, I think a separate open_lazy_table method would make more sense (and that can go into a separate PR if we thought it was useful).


try:
table_op = FetchTableOp()
return self.table_service.grpc_table_op(faketable, table_op)
except Exception as e:
if isinstance(e.__cause__, grpc.RpcError):
if e.__cause__.code() == grpc.StatusCode.INVALID_ARGUMENT:
raise DHError(f"no table by the name {name}") from None
raise e
finally:
# Explicitly close the table without releasing it (because it isn't ours)
faketable.ticket = None
faketable.schema = None
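
To make the trade-off in the review thread concrete, here is a hypothetical sketch of the `ref_table(...)`/`fetch()` split that was floated and not adopted. None of these method names exist in pydeephaven; the sketch assumes it lives in session.py and reuses that module's existing imports (`ticket_pb2`, `FetchTableOp`, `Table`).

```python
class LazySession(Session):
    """Hypothetical variant splitting reference creation from fetching."""

    def ref_table(self, name: str) -> Table:
        # Build a scope ticket locally: no server round trip, no existence check.
        ticket = ticket_pb2.Ticket(ticket=f's/{name}'.encode(encoding='ascii'))
        return Table(session=self, ticket=ticket)

    def fetch(self, ref: Table) -> Table:
        # Resolve the reference (schema, existence) only when actually needed.
        return self.table_service.grpc_table_op(ref, FetchTableOp())

    def open_table(self, name: str) -> Table:
        # Eager variant, matching the behavior this PR adopts: fail fast if the
        # table does not exist.
        return self.fetch(self.ref_table(name))
```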


def bind_table(self, name: str, table: Table) -> None:
""" Bind a table to the given name on the server so that it can be referenced by that name.
py/client/tests/test_console.py (5 changes: 5 additions & 0 deletions)

@@ -6,6 +6,7 @@

from pyarrow import csv

from pydeephaven import DHError
from tests.testbase import BaseTestCase


@@ -48,3 +49,7 @@ def vectorized_func(x, y):
pa_table = self.session.open_table('demo_table').snapshot()
df = pa_table.to_pandas()
self.assertEquals(1000, len(df.index))

def test_open_invalid_table(self):
with self.assertRaises(DHError):
self.session.open_table('thistablereallyreallydoesnotexist')
py/client/tests/test_multi_session.py (6 changes: 2 additions & 4 deletions)

@@ -5,7 +5,6 @@


from pydeephaven import Session
from pydeephaven.session import SYNC_ONCE, SYNC_REPEATED
from tests.testbase import BaseTestCase
import timeout_decorator

@@ -19,12 +18,11 @@ def test_persistent_tables(self):
t = session1.empty_table(10)
session1.bind_table('t', t)


with Session(sync_fields=SYNC_ONCE) as session2:
with Session() as session2:
self.assertIn('t', session2.tables)

def test_shared_tables(self):
session1 = Session(sync_fields=SYNC_REPEATED)
session1 = Session()
session1.run_script('t = None')

session2 = Session()