From 5bca85910892e9b9f9e33e2cadd9ea6d16bb7e62 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Tue, 16 Apr 2024 20:32:12 -0600 Subject: [PATCH 01/25] Support publishing anonymous tables --- py/client/pydeephaven/_session_service.py | 16 +++++- py/client/pydeephaven/session.py | 70 ++++++++++++++++++++++- py/client/tests/test_session.py | 25 ++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 7e6b7aec975..2b23a07eb24 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -1,11 +1,12 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +from uuid import uuid4 import grpc from pydeephaven.dherror import DHError -from pydeephaven.proto import session_pb2_grpc, session_pb2 +from pydeephaven.proto import session_pb2_grpc, session_pb2, ticket_pb2 class SessionService: @@ -42,3 +43,16 @@ def release(self, ticket): self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata) except Exception as e: raise DHError("failed to release a ticket.") from e + + + def publish(self, ticket, shared_ticket) -> None: + """Publishes a ticket to the shared ticket that can be fetched by other sessions. + + Args: + ticket: The ticket to publish. + shared_ticket: The shared ticket to publish to. + """ + try: + self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=ticket, result_id=shared_ticket), metadata=self.session.grpc_metadata) + except Exception as e: + raise DHError("failed to publish a ticket.") from e \ No newline at end of file diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 6ae939baae9..38a1e33febc 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -7,7 +7,8 @@ import base64 import os import threading -from typing import Dict, List, Union, Tuple, Any +from typing import Dict, List, Union, Tuple +from uuid import uuid4 import grpc import pyarrow as pa @@ -78,6 +79,38 @@ def get_token(self): return self._token +class SharedTicket: + """ A SharedTicket object represents a ticket that can be shared with other sessions. """ + + def __init__(self, ticket_bytes: bytes): + """Initializes a SharedTicket object + + Args: + ticket_bytes (bytes): the raw bytes for the ticket + """ + self._ticket_bytes = ticket_bytes + + @property + def api_ticket(self): + """ The ticket object for use with Deephaven API calls.""" + return ticket_pb2.Ticket(ticket=b'h' + self._ticket_bytes) + + @property + def bytes(self): + """ The raw bytes for the ticket.""" + return self._ticket_bytes + + @classmethod + def random_ticket(cls) -> 'SharedTicket': + """Generates a random shared ticket. + + Returns: + a SharedTicket object + """ + ticket_bytes = uuid4().int.to_bytes(16, byteorder='little', signed=False) + return cls(ticket_bytes=b'h' + ticket_bytes) + + class Session: """A Session object represents a connection to the Deephaven data server. It contains a number of convenience methods for asking the server to create tables, import Arrow data into tables, merge tables, run Python scripts, and @@ -417,6 +450,41 @@ def open_table(self, name: str) -> Table: faketable.ticket = None faketable.schema = None + def publish_table(self, table: Table, shared_ticket: SharedTicket) -> None: + """Publishes a table with the given shared ticket for sharing with other sessions. + + Args: + table (Table): a Table object + shared_ticket (SharedTicket): a SharedTicket object + + Raises: + DHError + """ + return self._session_service.publish(table.ticket, shared_ticket.api_ticket) + + def fetch_table(self, ticket: SharedTicket) -> Table: + """Fetches a table by ticket. + + Args: + ticket (Ticket): a ticket + + Returns: + a Table object + + Raises: + DHError + """ + faketable = Table(session=self, ticket=ticket.api_ticket) + try: + table_op = FetchTableOp() + return self.table_service.grpc_table_op(faketable, table_op) + except Exception as e: + raise DHError("could not fetch table by ticket") from e + finally: + # Explicitly close the table without releasing it (because it isn't ours) + faketable.ticket = None + faketable.schema = None + def bind_table(self, name: str, table: Table) -> None: """Binds a table to the given name on the server so that it can be referenced by that name. diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index a4731ad15e4..44d7c12fe37 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -10,6 +10,7 @@ from pydeephaven import DHError from pydeephaven import Session +from pydeephaven.session import SharedTicket from tests.testbase import BaseTestCase @@ -339,6 +340,30 @@ def test_blink_input_table(self): blink_input_table.delete(dh_table.select(["f1"])) + def test_share_table(self): + pub_session = Session() + t = pub_session.empty_table(1000).update("X = i") + self.assertEqual(t.size, 1000) + shared_ticket = SharedTicket.random_ticket() + pub_session.publish_table(t, shared_ticket) + + sub_session1 = Session() + t1 = sub_session1.fetch_table(shared_ticket) + self.assertEqual(t1.size, 1000) + pa_table = t1.to_arrow() + self.assertEqual(pa_table.num_rows, 1000) + + with self.subTest("the 1st subscriber session is gone, shared ticket is still valid"): + sub_session1.close() + sub_session2 = Session() + t2 = sub_session2.fetch_table(shared_ticket) + self.assertEqual(t2.size, 1000) + + with self.subTest("the publisher session is gone, shared ticket becomes invalid"): + pub_session.close() + with self.assertRaises(DHError): + sub_session2.fetch_table(shared_ticket) + if __name__ == '__main__': unittest.main() From cc027791731e04c4199d35db35d7b210b6d78227 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 17 Apr 2024 08:59:40 -0600 Subject: [PATCH 02/25] Add more documentation --- py/client/pydeephaven/_session_service.py | 1 - py/client/pydeephaven/session.py | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 2b23a07eb24..99916ed484d 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -1,7 +1,6 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # -from uuid import uuid4 import grpc diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 38a1e33febc..c7bd95c3601 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -453,6 +453,11 @@ def open_table(self, name: str) -> Table: def publish_table(self, table: Table, shared_ticket: SharedTicket) -> None: """Publishes a table with the given shared ticket for sharing with other sessions. + Note that, the shared ticket can be fetched by other sessions to access the table as long as the table is + not released. When the table is released either through an explicit call of the close method on it, or + implicitly through garbage collection, or through the closing of the publishing session, the shared ticket will + no longer be valid. + Args: table (Table): a Table object shared_ticket (SharedTicket): a SharedTicket object From 78910db309c812802e5f0e6e5fc99300bc5da874 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 17 Apr 2024 21:54:25 -0600 Subject: [PATCH 03/25] WIP enable fetching remote table in Py server API --- py/server/deephaven/remote.py | 90 ++++++++++++++++++++++++++++++++++ py/server/tests/test_remote.py | 18 +++++++ 2 files changed, 108 insertions(+) create mode 100644 py/server/deephaven/remote.py create mode 100644 py/server/tests/test_remote.py diff --git a/py/server/deephaven/remote.py b/py/server/deephaven/remote.py new file mode 100644 index 00000000000..7dcd2afb810 --- /dev/null +++ b/py/server/deephaven/remote.py @@ -0,0 +1,90 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +import threading + +import jpy + +from deephaven import DHError +from deephaven.table import Table + +_JURI = jpy.get_type("java.net.URI") +_JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") +_JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") +_JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") +_JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper") +_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") +_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") +_JExecutors = jpy.get_type("java.util.concurrent.Executors") +# _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") +# _JFlightSession = jpy.get_type("io.deephaven.client.impl.FlightSession") +# _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") + + +class Session: + """ + A Deephaven gRPC session. + """ + def __init__(self, host: str = None, + port: int = None, + auth_type: str = "Anonymous", + auth_token: str = ""): + self.host = host + self.port = port + self._auth_type = auth_type + self._auth_token = auth_token + self.grpc_channel = None + self._r_lock = threading.RLock() + self._is_alive = False + self._connect() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def _connect(self) -> None: + target = ":".join([self.host, str(self.port)]) + try: + _j_host_config = (_JClientConfig.builder() + .target(_JDeephavenTarget.of(_JURI("dh+plain://" + target))) + .build()) + _j_channel = _JChannelHelper.channel(_j_host_config) + _j_dh_channel = _JDeephavenChannelImpl(_j_channel) + + _j_session_config = (_JSessionImplConfig.builder() + .executor(_JExecutors.newScheduledThreadPool(4)) + .authenticationTypeAndValue(f"{self._auth_type} {self._auth_token}") + .channel(_j_dh_channel) + .build()) + self._j_session = _JSessionImpl.create(_j_session_config) + self._is_alive = True + + + except Exception as e: + raise DHError("failed to create a session to a remote Deephaven server.") from e + + def close(self): + """Closes the gRPC connection.""" + if not self._is_alive: + return + + with self._r_lock: + if not self._is_alive: + return + try: + self._j_session.close() + except Exception as e: + raise DHError("failed to close the session.") from e + finally: + self._is_alive = False + + def fetch(self, shared_ticket) -> Table: + """Fetches data from the Deephaven server.""" + if not self._is_alive: + raise DHError("the session is not alive.") + try: + return Table(self._j_session.of(shared_ticket).table()) + except Exception as e: + raise DHError("failed to fetch data from the server.") from e diff --git a/py/server/tests/test_remote.py b/py/server/tests/test_remote.py new file mode 100644 index 00000000000..50d23360e28 --- /dev/null +++ b/py/server/tests/test_remote.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# + +import unittest + +from deephaven.remote import Session + +from tests.testbase import BaseTestCase + + +class RemoteTestCase(BaseTestCase): + def test_session(self): + session = Session(host="core-server-2-1", port=10000, auth_type="Anonymous") + + +if __name__ == "__main__": + unittest.main() From 6c8623e27b928fe898bfb9399e2d942523e21748 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 18 Apr 2024 10:19:47 -0600 Subject: [PATCH 04/25] Add some docstrings --- .../deephaven/client/impl/BarrageSession.java | 7 + py/client/tests/test_session.py | 4 +- py/server/deephaven/remote.py | 125 ++++++++++++++---- py/server/tests/test_remote.py | 12 +- 4 files changed, 120 insertions(+), 28 deletions(-) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index 988554f3a1e..1702519d1be 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -31,6 +31,13 @@ public static BarrageSession of( return new BarrageSession(session, client); } + public static BarrageSession create( + SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { + final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel( + incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session))); + return new BarrageSession(session, client, channel); + } + protected BarrageSession( final SessionImpl session, final FlightClient client) { super(session, client); diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index 44d7c12fe37..4a6123632f8 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -341,8 +341,8 @@ def test_blink_input_table(self): def test_share_table(self): - pub_session = Session() - t = pub_session.empty_table(1000).update("X = i") + pub_session = Session("localhost", 10008) + t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) self.assertEqual(t.size, 1000) shared_ticket = SharedTicket.random_ticket() pub_session.publish_table(t, shared_ticket) diff --git a/py/server/deephaven/remote.py b/py/server/deephaven/remote.py index 7dcd2afb810..d74f9534339 100644 --- a/py/server/deephaven/remote.py +++ b/py/server/deephaven/remote.py @@ -1,11 +1,14 @@ # -# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +from __future__ import annotations import threading +from typing import Dict import jpy from deephaven import DHError +from deephaven._wrapper import JObjectWrapper from deephaven.table import Table _JURI = jpy.get_type("java.net.URI") @@ -16,19 +19,93 @@ _JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") _JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") _JExecutors = jpy.get_type("java.util.concurrent.Executors") -# _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") -# _JFlightSession = jpy.get_type("io.deephaven.client.impl.FlightSession") -# _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") +_JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") +_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") +_JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId") +_JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") +_session_cache: Dict[str, RemoteSession] = {} # use WeakValueDictionary to avoid memory leak? +_remote_session_lock = threading.Lock() -class Session: - """ - A Deephaven gRPC session. + +def remote_session(host: str, + port: int = 10000, + auth_type: str = "Anonymous", + auth_token: str = "", + # never_timeout: bool = True, + # session_type: str = 'python', + # use_tls: bool = False, + # tls_root_certs: bytes = None, + # client_cert_chain: bytes = None, + ) -> RemoteSession: + """Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new + session. + + Args: + host (str): the host name or IP address of the Deephaven server. + port (int): the port number that the remote Deephaven server is listening on, default is 10000. + auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built + authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler", + default is 'Anonymous'. + auth_token (str): the authentication token string. When auth_type is 'Basic', it must be + "user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built + authenticator, it must conform to the specific requirement of the authenticator + Returns: + a Deephaven gRPC session + + Raises: + DHError """ - def __init__(self, host: str = None, - port: int = None, + with _remote_session_lock: + uri = f"dh+plain://{host}:{port}" + session = _session_cache.get(uri) + if session: + if session.is_alive: # doesn't guarantee the session is still alive, just that it hasn't been explicitly + # closed + return session + else: + del _session_cache[uri] + session = RemoteSession(host, port, auth_type, auth_token) + _session_cache[uri] = session + return session + + +class RemoteSession (JObjectWrapper): + """ A Deephaven gRPC session to a remote server.""" + # def __init__(self, j_barrage_session): + # self.j_barrage_session = j_barrage_session + # + # def subscribe(self, ticket: bytes): + # j_table_handle = self._j_session.of(_JSharedId(ticket).ticketId().table()) + # j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle, + # _JBarrageTableResolver.SUB_OPTIONS) + # j_table = j_barrage_subscription.entireTable().get() + # return Table(j_table) + # + # def snapshot(self): + # return self.j_barrage_session.snapshot() + def __init__(self, host: str, + port: int = 10000, auth_type: str = "Anonymous", - auth_token: str = ""): + auth_token: str = "", + # never_timeout: bool = True, + # session_type: str = 'python', + # use_tls: bool = False, + # tls_root_certs: bytes = None, + # client_cert_chain: bytes = None, + ): + """Creates a Deephaven gRPC session to a remote server. + + Args: + host (str): the host name or IP address of the Deephaven server. + port (int): the port number that the remote Deephaven server is listening on, default is 10000. + auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built + authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler", + default is 'Anonymous'. + auth_token (str): the authentication token string. When auth_type is 'Basic', it must be + "user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built + authenticator, it must conform to the specific requirement of the authenticator + """ self.host = host self.port = port self._auth_type = auth_type @@ -36,22 +113,21 @@ def __init__(self, host: str = None, self.grpc_channel = None self._r_lock = threading.RLock() self._is_alive = False + self._uri = f"dh+plain://{host}:{port}" self._connect() - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + @property + def is_alive(self) -> bool: + """if the session is alive.""" + return self._is_alive def _connect(self) -> None: - target = ":".join([self.host, str(self.port)]) try: _j_host_config = (_JClientConfig.builder() - .target(_JDeephavenTarget.of(_JURI("dh+plain://" + target))) + .target(_JDeephavenTarget.of(_JURI(self._uri))) .build()) - _j_channel = _JChannelHelper.channel(_j_host_config) - _j_dh_channel = _JDeephavenChannelImpl(_j_channel) + self._j_channel = _JChannelHelper.channel(_j_host_config) + _j_dh_channel = _JDeephavenChannelImpl(self._j_channel) _j_session_config = (_JSessionImplConfig.builder() .executor(_JExecutors.newScheduledThreadPool(4)) @@ -59,9 +135,8 @@ def _connect(self) -> None: .channel(_j_dh_channel) .build()) self._j_session = _JSessionImpl.create(_j_session_config) + self._j_barrage_session = _JBarrageSession.create(self._j_session, _JRootAllocator(), self._j_channel) self._is_alive = True - - except Exception as e: raise DHError("failed to create a session to a remote Deephaven server.") from e @@ -80,11 +155,15 @@ def close(self): finally: self._is_alive = False - def fetch(self, shared_ticket) -> Table: + def fetch(self, shared_ticket: bytes) -> Table: """Fetches data from the Deephaven server.""" if not self._is_alive: raise DHError("the session is not alive.") try: - return Table(self._j_session.of(shared_ticket).table()) + j_table_handle = self._j_session.of(_JSharedId(shared_ticket).ticketId().table()) + j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle, + _JBarrageTableResolver.SUB_OPTIONS) + j_table = j_barrage_subscription.entireTable().get() + return Table(j_table) except Exception as e: raise DHError("failed to fetch data from the server.") from e diff --git a/py/server/tests/test_remote.py b/py/server/tests/test_remote.py index 50d23360e28..26e76bc2708 100644 --- a/py/server/tests/test_remote.py +++ b/py/server/tests/test_remote.py @@ -3,15 +3,21 @@ # import unittest - -from deephaven.remote import Session +from deephaven.remote import remote_session from tests.testbase import BaseTestCase class RemoteTestCase(BaseTestCase): def test_session(self): - session = Session(host="core-server-2-1", port=10000, auth_type="Anonymous") + session = remote_session(host="core-server-2-1", port=10000, auth_type="Anonymous") + t = session.fetch(b'hgd\xc6\xf3\xea\xd3\x15\xbabB#\x1e-\x94\xfcI') + self.assertEqual(t.size, 1000) + self.assertEqual(len(t.columns), 2) + sp = t.snapshot() + self.assertEqual(sp.size, 1000) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) if __name__ == "__main__": From 43e16d5e86c0d704ed9227d454e2d5e70a1d457b Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 18 Apr 2024 15:56:06 -0600 Subject: [PATCH 05/25] Use the new BarrageSessionFactoryClient --- py/server/deephaven/remote.py | 193 +++++++++++++-------------------- py/server/tests/test_remote.py | 4 +- 2 files changed, 77 insertions(+), 120 deletions(-) diff --git a/py/server/deephaven/remote.py b/py/server/deephaven/remote.py index d74f9534339..9ef0d42b369 100644 --- a/py/server/deephaven/remote.py +++ b/py/server/deephaven/remote.py @@ -7,7 +7,7 @@ import jpy -from deephaven import DHError +from deephaven import DHError, uri from deephaven._wrapper import JObjectWrapper from deephaven.table import Table @@ -24,20 +24,19 @@ _JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId") _JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") -_session_cache: Dict[str, RemoteSession] = {} # use WeakValueDictionary to avoid memory leak? +_session_cache: Dict[str, BarrageSession] = {} # use WeakValueDictionary to avoid memory leak? _remote_session_lock = threading.Lock() -def remote_session(host: str, - port: int = 10000, - auth_type: str = "Anonymous", - auth_token: str = "", - # never_timeout: bool = True, - # session_type: str = 'python', - # use_tls: bool = False, - # tls_root_certs: bytes = None, - # client_cert_chain: bytes = None, - ) -> RemoteSession: +def barrage_session(host: str, + port: int = 10000, + auth_type: str = "Anonymous", + auth_token: str = "", + use_tls: bool = False, + tls_root_certs: bytes = None, + client_cert_chain: bytes = None, + client_private_key: bytes = None, + ) -> BarrageSession: """Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new session. @@ -50,120 +49,78 @@ def remote_session(host: str, auth_token (str): the authentication token string. When auth_type is 'Basic', it must be "user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built authenticator, it must conform to the specific requirement of the authenticator + use_tls (bool): if True, use a TLS connection. Defaults to False + tls_root_certs (bytes): PEM encoded root certificates to use for TLS connection, or None to use system defaults. + If not None implies use a TLS connection and the use_tls argument should have been passed + as True. Defaults to None + client_cert_chain (bytes): PEM encoded client certificate if using mutual TLS. Defaults to None, + which implies not using mutual TLS. + client_private_key (bytes): PEM encoded client private key for client_cert_chain if using mutual TLS. + Defaults to None, which implies not using mutual TLS. + Returns: a Deephaven gRPC session Raises: DHError """ - with _remote_session_lock: - uri = f"dh+plain://{host}:{port}" - session = _session_cache.get(uri) - if session: - if session.is_alive: # doesn't guarantee the session is still alive, just that it hasn't been explicitly - # closed - return session - else: - del _session_cache[uri] - session = RemoteSession(host, port, auth_type, auth_token) - _session_cache[uri] = session - return session - - -class RemoteSession (JObjectWrapper): + try: + j_barrage_session_factory_client = uri.resolve( + "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") + + target_uri = f"{host}:{port}" + if use_tls: + target_uri = f"dh://{target_uri}" + else: + target_uri = f"dh+plain://{target_uri}" + + _j_client_config = (_JClientConfig.builder() + .target(_JDeephavenTarget.of(_JURI(target_uri))) + .build()) + auth = f"{auth_type} {auth_token}" + _j_barrage_session_factory = j_barrage_session_factory_client.factory(_j_client_config, auth) + return BarrageSession(_j_barrage_session_factory.newBarrageSession()) + except Exception as e: + raise DHError("failed to get a barrage session to the target remote Deephaven server.") from e + + +class BarrageSession (JObjectWrapper): """ A Deephaven gRPC session to a remote server.""" - # def __init__(self, j_barrage_session): - # self.j_barrage_session = j_barrage_session - # - # def subscribe(self, ticket: bytes): - # j_table_handle = self._j_session.of(_JSharedId(ticket).ticketId().table()) - # j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle, - # _JBarrageTableResolver.SUB_OPTIONS) - # j_table = j_barrage_subscription.entireTable().get() - # return Table(j_table) - # - # def snapshot(self): - # return self.j_barrage_session.snapshot() - def __init__(self, host: str, - port: int = 10000, - auth_type: str = "Anonymous", - auth_token: str = "", - # never_timeout: bool = True, - # session_type: str = 'python', - # use_tls: bool = False, - # tls_root_certs: bytes = None, - # client_cert_chain: bytes = None, - ): - """Creates a Deephaven gRPC session to a remote server. + j_object_type = _JBarrageSession + + @property + def j_object(self) -> jpy.JType: + return self.j_barrage_session + + def __init__(self, j_barrage_session): + self.j_barrage_session = j_barrage_session + self.j_session = j_barrage_session.session() + + def subscribe(self, ticket: bytes) -> Table: + """ TODO Args: - host (str): the host name or IP address of the Deephaven server. - port (int): the port number that the remote Deephaven server is listening on, default is 10000. - auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built - authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler", - default is 'Anonymous'. - auth_token (str): the authentication token string. When auth_type is 'Basic', it must be - "user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built - authenticator, it must conform to the specific requirement of the authenticator + ticket: + + Returns: + """ - self.host = host - self.port = port - self._auth_type = auth_type - self._auth_token = auth_token - self.grpc_channel = None - self._r_lock = threading.RLock() - self._is_alive = False - self._uri = f"dh+plain://{host}:{port}" - self._connect() + j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) + j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, + _JBarrageTableResolver.SUB_OPTIONS) + return Table(j_barrage_subscription.entireTable().get()) + + def snapshot(self, ticket: bytes) -> Table: + """ TODO + + Args: + ticket: + + Returns: + + """ + j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) + j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) + return Table(j_barrage_snapshot.entireTable().get()) + - @property - def is_alive(self) -> bool: - """if the session is alive.""" - return self._is_alive - - def _connect(self) -> None: - try: - _j_host_config = (_JClientConfig.builder() - .target(_JDeephavenTarget.of(_JURI(self._uri))) - .build()) - self._j_channel = _JChannelHelper.channel(_j_host_config) - _j_dh_channel = _JDeephavenChannelImpl(self._j_channel) - - _j_session_config = (_JSessionImplConfig.builder() - .executor(_JExecutors.newScheduledThreadPool(4)) - .authenticationTypeAndValue(f"{self._auth_type} {self._auth_token}") - .channel(_j_dh_channel) - .build()) - self._j_session = _JSessionImpl.create(_j_session_config) - self._j_barrage_session = _JBarrageSession.create(self._j_session, _JRootAllocator(), self._j_channel) - self._is_alive = True - except Exception as e: - raise DHError("failed to create a session to a remote Deephaven server.") from e - - def close(self): - """Closes the gRPC connection.""" - if not self._is_alive: - return - - with self._r_lock: - if not self._is_alive: - return - try: - self._j_session.close() - except Exception as e: - raise DHError("failed to close the session.") from e - finally: - self._is_alive = False - - def fetch(self, shared_ticket: bytes) -> Table: - """Fetches data from the Deephaven server.""" - if not self._is_alive: - raise DHError("the session is not alive.") - try: - j_table_handle = self._j_session.of(_JSharedId(shared_ticket).ticketId().table()) - j_barrage_subscription = self._j_barrage_session.subscribe(j_table_handle, - _JBarrageTableResolver.SUB_OPTIONS) - j_table = j_barrage_subscription.entireTable().get() - return Table(j_table) - except Exception as e: - raise DHError("failed to fetch data from the server.") from e diff --git a/py/server/tests/test_remote.py b/py/server/tests/test_remote.py index 26e76bc2708..0df521af7c0 100644 --- a/py/server/tests/test_remote.py +++ b/py/server/tests/test_remote.py @@ -3,14 +3,14 @@ # import unittest -from deephaven.remote import remote_session +from deephaven.remote import barrage_session from tests.testbase import BaseTestCase class RemoteTestCase(BaseTestCase): def test_session(self): - session = remote_session(host="core-server-2-1", port=10000, auth_type="Anonymous") + session = barrage_session(host="core-server-2-1", port=10000, auth_type="Anonymous") t = session.fetch(b'hgd\xc6\xf3\xea\xd3\x15\xbabB#\x1e-\x94\xfcI') self.assertEqual(t.size, 1000) self.assertEqual(len(t.columns), 2) From efeb9ea134461ac1bacb1137725049945fe336c6 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 18 Apr 2024 19:31:12 -0600 Subject: [PATCH 06/25] Add TLS configuration --- py/server/deephaven/remote.py | 67 +++++++++++++++++++--------------- py/server/tests/test_remote.py | 12 ++++++ 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/py/server/deephaven/remote.py b/py/server/deephaven/remote.py index 9ef0d42b369..ab61ffb4f25 100644 --- a/py/server/deephaven/remote.py +++ b/py/server/deephaven/remote.py @@ -13,14 +13,10 @@ _JURI = jpy.get_type("java.net.URI") _JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") -_JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") +_JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig") +_JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom") _JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") -_JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper") -_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") -_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") -_JExecutors = jpy.get_type("java.util.concurrent.Executors") _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") -_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") _JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId") _JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") @@ -34,12 +30,12 @@ def barrage_session(host: str, auth_token: str = "", use_tls: bool = False, tls_root_certs: bytes = None, - client_cert_chain: bytes = None, - client_private_key: bytes = None, ) -> BarrageSession: """Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new session. + Note: client authentication is not supported yet. + Args: host (str): the host name or IP address of the Deephaven server. port (int): the port number that the remote Deephaven server is listening on, default is 10000. @@ -53,13 +49,9 @@ def barrage_session(host: str, tls_root_certs (bytes): PEM encoded root certificates to use for TLS connection, or None to use system defaults. If not None implies use a TLS connection and the use_tls argument should have been passed as True. Defaults to None - client_cert_chain (bytes): PEM encoded client certificate if using mutual TLS. Defaults to None, - which implies not using mutual TLS. - client_private_key (bytes): PEM encoded client private key for client_cert_chain if using mutual TLS. - Defaults to None, which implies not using mutual TLS. Returns: - a Deephaven gRPC session + a Deephaven Barrage session Raises: DHError @@ -67,6 +59,8 @@ def barrage_session(host: str, try: j_barrage_session_factory_client = uri.resolve( "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") + if tls_root_certs and not use_tls: + raise DHError(message="tls_root_certs is provided but use_tls is False") target_uri = f"{host}:{port}" if use_tls: @@ -74,18 +68,23 @@ def barrage_session(host: str, else: target_uri = f"dh+plain://{target_uri}" - _j_client_config = (_JClientConfig.builder() - .target(_JDeephavenTarget.of(_JURI(target_uri))) - .build()) + _j_client_config_builder = _JClientConfig.builder() + _j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri))) + if tls_root_certs: + _j_ssl_config =_JSSLConfig.builder().trust(_JTrustCustom.ofX509(tls_root_certs, 0, len(tls_root_certs))).build() + _j_client_config_builder.ssl(_j_ssl_config) + _j_client_config = _j_client_config_builder.build() auth = f"{auth_type} {auth_token}" + _j_barrage_session_factory = j_barrage_session_factory_client.factory(_j_client_config, auth) return BarrageSession(_j_barrage_session_factory.newBarrageSession()) except Exception as e: - raise DHError("failed to get a barrage session to the target remote Deephaven server.") from e + raise DHError(e,"failed to get a barrage session to the target remote Deephaven server.") from e class BarrageSession (JObjectWrapper): - """ A Deephaven gRPC session to a remote server.""" + """ A Deephaven Barrage session to a remote server.""" + j_object_type = _JBarrageSession @property @@ -100,27 +99,37 @@ def subscribe(self, ticket: bytes) -> Table: """ TODO Args: - ticket: + ticket (bytes): the bytes of the shared ticket Returns: + a Table + Raises: + DHError """ - j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) - j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, - _JBarrageTableResolver.SUB_OPTIONS) - return Table(j_barrage_subscription.entireTable().get()) + try: + j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) + j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, + _JBarrageTableResolver.SUB_OPTIONS) + return Table(j_barrage_subscription.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to subscribe to the ticket.") from e def snapshot(self, ticket: bytes) -> Table: """ TODO Args: - ticket: + ticket (bytes): the bytes of the shared ticket Returns: - + a Table + Raises: + DHError """ - j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) - j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) - return Table(j_barrage_snapshot.entireTable().get()) - + try: + j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) + j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) + return Table(j_barrage_snapshot.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to get a snapshot from the ticket.") from e diff --git a/py/server/tests/test_remote.py b/py/server/tests/test_remote.py index 0df521af7c0..5bd6eb75c4a 100644 --- a/py/server/tests/test_remote.py +++ b/py/server/tests/test_remote.py @@ -3,6 +3,10 @@ # import unittest + +import jpy + +from deephaven.jcompat import j_hashset from deephaven.remote import barrage_session from tests.testbase import BaseTestCase @@ -10,6 +14,14 @@ class RemoteTestCase(BaseTestCase): def test_session(self): + _JResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") + _JUriResolvers = jpy.get_type("io.deephaven.uri.resolver.UriResolvers") + _JUriResolversInstance = jpy.get_type("io.deephaven.uri.resolver.UriResolversInstance") + j_resolver = _JResolver.get() + j_resolver_set = j_hashset({j_resolver}) + j_resolvers = _JUriResolvers(j_resolver_set) + _JUriResolversInstance.init(j_resolvers) + session = barrage_session(host="core-server-2-1", port=10000, auth_type="Anonymous") t = session.fetch(b'hgd\xc6\xf3\xea\xd3\x15\xbabB#\x1e-\x94\xfcI') self.assertEqual(t.size, 1000) From 4fa1e17b560438c50298daa52c1cf1e0ce6d96ae Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Fri, 19 Apr 2024 08:26:18 -0600 Subject: [PATCH 07/25] Refactoring and more tests --- py/server/deephaven/{remote.py => barrage.py} | 71 ++++++++++++++----- py/server/tests/test_barrage.py | 35 +++++++++ py/server/tests/test_remote.py | 36 ---------- 3 files changed, 88 insertions(+), 54 deletions(-) rename py/server/deephaven/{remote.py => barrage.py} (60%) create mode 100644 py/server/tests/test_barrage.py delete mode 100644 py/server/tests/test_remote.py diff --git a/py/server/deephaven/remote.py b/py/server/deephaven/barrage.py similarity index 60% rename from py/server/deephaven/remote.py rename to py/server/deephaven/barrage.py index ab61ffb4f25..13ca95b34f6 100644 --- a/py/server/deephaven/remote.py +++ b/py/server/deephaven/barrage.py @@ -1,6 +1,9 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # +""" This module defines the BarrageSession wrapper class and provides a factory function to create an instance of it + for accessing resources on remote Deephaven servers.""" + from __future__ import annotations import threading from typing import Dict @@ -14,11 +17,17 @@ _JURI = jpy.get_type("java.net.URI") _JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") _JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig") +_JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") _JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom") _JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") _JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId") _JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") +_JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper") +_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") +_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") +_JExecutors = jpy.get_type("java.util.concurrent.Executors") +_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") _session_cache: Dict[str, BarrageSession] = {} # use WeakValueDictionary to avoid memory leak? _remote_session_lock = threading.Lock() @@ -57,8 +66,6 @@ def barrage_session(host: str, DHError """ try: - j_barrage_session_factory_client = uri.resolve( - "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") if tls_root_certs and not use_tls: raise DHError(message="tls_root_certs is provided but use_tls is False") @@ -68,21 +75,49 @@ def barrage_session(host: str, else: target_uri = f"dh+plain://{target_uri}" - _j_client_config_builder = _JClientConfig.builder() - _j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri))) - if tls_root_certs: - _j_ssl_config =_JSSLConfig.builder().trust(_JTrustCustom.ofX509(tls_root_certs, 0, len(tls_root_certs))).build() - _j_client_config_builder.ssl(_j_ssl_config) - _j_client_config = _j_client_config_builder.build() + j_client_config = _build_client_config(target_uri, tls_root_certs) auth = f"{auth_type} {auth_token}" - _j_barrage_session_factory = j_barrage_session_factory_client.factory(_j_client_config, auth) - return BarrageSession(_j_barrage_session_factory.newBarrageSession()) + try: + return _get_barrage_session_uri(j_client_config, auth) + except: + return _get_barrage_session_direct(j_client_config, auth) except Exception as e: - raise DHError(e,"failed to get a barrage session to the target remote Deephaven server.") from e + raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e + + +def _get_barrage_session_uri(client_config, auth) -> BarrageSession: + j_barrage_session_factory_client = uri.resolve( + "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") + j_barrage_session_factory = j_barrage_session_factory_client.factory(client_config, auth) + return BarrageSession(j_barrage_session_factory.newBarrageSession()) + + +def _get_barrage_session_direct(client_config, auth) -> BarrageSession: + j_channel = _JChannelHelper.channel(client_config) + j_dh_channel = _JDeephavenChannelImpl(j_channel) + j_session_config = (_JSessionImplConfig.builder() + .executor(_JExecutors.newScheduledThreadPool(4)) + .authenticationTypeAndValue(auth) + .channel(j_dh_channel) + .build()) + j_session = _JSessionImpl.create(j_session_config) + return BarrageSession(_JBarrageSession.create(j_session, _JRootAllocator(), j_channel)) -class BarrageSession (JObjectWrapper): + +def _build_client_config(target_uri, tls_root_certs) -> jpy.JType: + j_client_config_builder = _JClientConfig.builder() + j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri))) + if tls_root_certs: + j_ssl_config = _JSSLConfig.builder().trust( + _JTrustCustom.ofX509(tls_root_certs, 0, len(tls_root_certs))).build() + j_client_config_builder.ssl(j_ssl_config) + j_client_config = j_client_config_builder.build() + return j_client_config + + +class BarrageSession(JObjectWrapper): """ A Deephaven Barrage session to a remote server.""" j_object_type = _JBarrageSession @@ -96,7 +131,7 @@ def __init__(self, j_barrage_session): self.j_session = j_barrage_session.session() def subscribe(self, ticket: bytes) -> Table: - """ TODO + """ Subscribes to a published remote table with given shared ticket. Args: ticket (bytes): the bytes of the shared ticket @@ -110,19 +145,20 @@ def subscribe(self, ticket: bytes) -> Table: try: j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, - _JBarrageTableResolver.SUB_OPTIONS) + _JBarrageTableResolver.SUB_OPTIONS) return Table(j_barrage_subscription.entireTable().get()) except Exception as e: - raise DHError(e, "failed to subscribe to the ticket.") from e + raise DHError(e, "failed to subscribe to the remote table with the provided ticket.") from e def snapshot(self, ticket: bytes) -> Table: - """ TODO + """ Returns a snapshot of a published remote table with the given shared ticket. Args: ticket (bytes): the bytes of the shared ticket Returns: a Table + Raises: DHError """ @@ -131,5 +167,4 @@ def snapshot(self, ticket: bytes) -> Table: j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) return Table(j_barrage_snapshot.entireTable().get()) except Exception as e: - raise DHError(e, "failed to get a snapshot from the ticket.") from e - + raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py new file mode 100644 index 00000000000..f5d0b6c5708 --- /dev/null +++ b/py/server/tests/test_barrage.py @@ -0,0 +1,35 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# + +import unittest + +from deephaven import DHError +from deephaven.barrage import barrage_session + +from tests.testbase import BaseTestCase + + +class BarrageTestCase(BaseTestCase): + def test_barrage_session(self): + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + self.assertIsNotNone(session) + + with self.assertRaises(DHError): + barrage_session(host="localhost", port=10000, auth_type="Basic", auth_token="user:password") + + def test_subscribe(self): + session = barrage_session(host="core-server-2-1", port=10000, auth_type="Anonymous") + t = session.subscribe(b'h\xd1X\x10\xe6A\x14\xe8\xbb~E\xe3\xe7\xfal\xcb\x8d') + self.assertEqual(t.size, 1000) + self.assertEqual(len(t.columns), 2) + sp = t.snapshot() + self.assertEqual(sp.size, 1000) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + def test_snapshot(self): + ... + +if __name__ == "__main__": + unittest.main() diff --git a/py/server/tests/test_remote.py b/py/server/tests/test_remote.py deleted file mode 100644 index 5bd6eb75c4a..00000000000 --- a/py/server/tests/test_remote.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -# - -import unittest - -import jpy - -from deephaven.jcompat import j_hashset -from deephaven.remote import barrage_session - -from tests.testbase import BaseTestCase - - -class RemoteTestCase(BaseTestCase): - def test_session(self): - _JResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") - _JUriResolvers = jpy.get_type("io.deephaven.uri.resolver.UriResolvers") - _JUriResolversInstance = jpy.get_type("io.deephaven.uri.resolver.UriResolversInstance") - j_resolver = _JResolver.get() - j_resolver_set = j_hashset({j_resolver}) - j_resolvers = _JUriResolvers(j_resolver_set) - _JUriResolversInstance.init(j_resolvers) - - session = barrage_session(host="core-server-2-1", port=10000, auth_type="Anonymous") - t = session.fetch(b'hgd\xc6\xf3\xea\xd3\x15\xbabB#\x1e-\x94\xfcI') - self.assertEqual(t.size, 1000) - self.assertEqual(len(t.columns), 2) - sp = t.snapshot() - self.assertEqual(sp.size, 1000) - t1 = t.update("Z = X + Y") - self.assertEqual(t1.size, 1000) - - -if __name__ == "__main__": - unittest.main() From 0af127ac801ae396db8aba248456fc61bc85be64 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Fri, 19 Apr 2024 20:04:23 -0600 Subject: [PATCH 08/25] Add an auto test case --- Integrations/build.gradle | 6 ++++ py/client/pydeephaven/session.py | 11 +++---- py/client/tests/test_session.py | 4 +-- py/server/deephaven/barrage.py | 4 +-- py/server/tests/test_barrage.py | 50 +++++++++++++++++++++++++++++--- 5 files changed, 60 insertions(+), 15 deletions(-) diff --git a/Integrations/build.gradle b/Integrations/build.gradle index 109fd30252d..fa0f4d962df 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -61,6 +61,9 @@ def runInDocker = { String name, String sourcePath, List command, Closur from(test.runtimeClasspath) { into 'classpath' } + from("${sourcePath}/../client") { + into 'python/client' + } // Unpack the config contents for now, since we don't seem to read the configs from inside a jar. // This does not add a task dependency, but we already put :configs in the testRuntime classpath, @@ -97,6 +100,9 @@ def runInDocker = { String name, String sourcePath, List command, Closur // copy in the contents that we do expect to change as the project updates copyFile 'python', '/python' copyFile 'classpath', '/classpath' + runCommand '''set -eux; \\ + pip3 install /python/client;''' + } entrypoint = command diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index c7bd95c3601..6a45723fad2 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -3,6 +3,7 @@ # """This module implements the Session class which provides methods to connect to and interact with the Deephaven server.""" +from __future__ import annotations import base64 import os @@ -89,11 +90,7 @@ def __init__(self, ticket_bytes: bytes): ticket_bytes (bytes): the raw bytes for the ticket """ self._ticket_bytes = ticket_bytes - - @property - def api_ticket(self): - """ The ticket object for use with Deephaven API calls.""" - return ticket_pb2.Ticket(ticket=b'h' + self._ticket_bytes) + self.api_ticket = ticket_pb2.Ticket(ticket=b'h' + self._ticket_bytes) @property def bytes(self): @@ -101,14 +98,14 @@ def bytes(self): return self._ticket_bytes @classmethod - def random_ticket(cls) -> 'SharedTicket': + def random_ticket(cls) -> SharedTicket: """Generates a random shared ticket. Returns: a SharedTicket object """ ticket_bytes = uuid4().int.to_bytes(16, byteorder='little', signed=False) - return cls(ticket_bytes=b'h' + ticket_bytes) + return cls(ticket_bytes=ticket_bytes) class Session: diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index 4a6123632f8..77182ddb215 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -340,8 +340,8 @@ def test_blink_input_table(self): blink_input_table.delete(dh_table.select(["f1"])) - def test_share_table(self): - pub_session = Session("localhost", 10008) + def test_publish_table(self): + pub_session = Session() t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) self.assertEqual(t.size, 1000) shared_ticket = SharedTicket.random_ticket() diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 13ca95b34f6..d25a386b720 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -81,7 +81,7 @@ def barrage_session(host: str, try: return _get_barrage_session_uri(j_client_config, auth) except: - return _get_barrage_session_direct(j_client_config, auth) + return _get_barrage_session_direct(j_client_config, auth) # fallback to the direct way, used for testing except Exception as e: raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e @@ -131,7 +131,7 @@ def __init__(self, j_barrage_session): self.j_session = j_barrage_session.session() def subscribe(self, ticket: bytes) -> Table: - """ Subscribes to a published remote table with given shared ticket. + """ Subscribes to a published remote table with the given shared ticket. Args: ticket (bytes): the bytes of the shared ticket diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index f5d0b6c5708..9d30c9f7488 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -1,9 +1,12 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # - +import os import unittest +import subprocess +import time + from deephaven import DHError from deephaven.barrage import barrage_session @@ -11,6 +14,41 @@ class BarrageTestCase(BaseTestCase): + shared_ticket = None + server_proc = None + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler"} + env.update(dict(os.environ)) + cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=True, env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cls.py_client_publish_table() + + @classmethod + def tearDownClass(cls) -> None: + cls.server_proc.kill() + super().tearDownClass() + + @classmethod + def py_client_publish_table(cls): + from pydeephaven.session import Session, SharedTicket + + for _ in range(10): + try: + pub_session = Session("localhost", 10000) + break + except Exception as e: + time.sleep(1) + else: + raise + cls.t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) + cls.shared_ticket = SharedTicket.random_ticket() + pub_session.publish_table(cls.t, cls.shared_ticket) + cls.pub_session = pub_session + def test_barrage_session(self): session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") self.assertIsNotNone(session) @@ -19,8 +57,8 @@ def test_barrage_session(self): barrage_session(host="localhost", port=10000, auth_type="Basic", auth_token="user:password") def test_subscribe(self): - session = barrage_session(host="core-server-2-1", port=10000, auth_type="Anonymous") - t = session.subscribe(b'h\xd1X\x10\xe6A\x14\xe8\xbb~E\xe3\xe7\xfal\xcb\x8d') + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + t = session.subscribe(ticket=self.shared_ticket.bytes) self.assertEqual(t.size, 1000) self.assertEqual(len(t.columns), 2) sp = t.snapshot() @@ -29,7 +67,11 @@ def test_subscribe(self): self.assertEqual(t1.size, 1000) def test_snapshot(self): - ... + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + t = session.snapshot(self.shared_ticket.bytes) + self.assertEqual(t.size, 1000) + self.assertEqual(len(t.columns), 2) + if __name__ == "__main__": unittest.main() From f4aed95f45d3ef4e34530bc2d20bd097b5e1bd8f Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Sat, 20 Apr 2024 11:01:43 -0600 Subject: [PATCH 09/25] Add more clarifying comments --- py/client/pydeephaven/_session_service.py | 10 +++++----- py/server/deephaven/barrage.py | 8 +++++++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 99916ed484d..b4ca3a134b8 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -44,14 +44,14 @@ def release(self, ticket): raise DHError("failed to release a ticket.") from e - def publish(self, ticket, shared_ticket) -> None: - """Publishes a ticket to the shared ticket that can be fetched by other sessions. + def publish(self, source_ticket, result_ticket) -> None: + """Makes a copy from the source ticket and publishes it to the result ticket. Args: - ticket: The ticket to publish. - shared_ticket: The shared ticket to publish to. + source_ticket: The source ticket to publish from. + result_ticket: The result ticket to publish to. """ try: - self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=ticket, result_id=shared_ticket), metadata=self.session.grpc_metadata) + self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=source_ticket, result_id=result_ticket), metadata=self.session.grpc_metadata) except Exception as e: raise DHError("failed to publish a ticket.") from e \ No newline at end of file diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index d25a386b720..4e816114ca2 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -81,7 +81,8 @@ def barrage_session(host: str, try: return _get_barrage_session_uri(j_client_config, auth) except: - return _get_barrage_session_direct(j_client_config, auth) # fallback to the direct way, used for testing + # fall back to the direct way when we don't have a fully initialized server, used for testing + return _get_barrage_session_direct(j_client_config, auth) except Exception as e: raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e @@ -94,6 +95,11 @@ def _get_barrage_session_uri(client_config, auth) -> BarrageSession: def _get_barrage_session_direct(client_config, auth) -> BarrageSession: + """Note, this is used for testing only. This way of constructing a Barrage session is less efficient because it does + not share any of the state or configuration that the server provides; namely, when you are doing it with the server + context it provides a singleton executor, allocator, outbound SSL configuration, and the ability for the server to + hook in additional channel building options. + """ j_channel = _JChannelHelper.channel(client_config) j_dh_channel = _JDeephavenChannelImpl(j_channel) From f92783d4b06b55c165655850ad11c4007f7c0bda Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Sat, 20 Apr 2024 15:01:55 -0600 Subject: [PATCH 10/25] Additional docstring --- py/client/pydeephaven/session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 6a45723fad2..8eb9191a51a 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -99,7 +99,8 @@ def bytes(self): @classmethod def random_ticket(cls) -> SharedTicket: - """Generates a random shared ticket. + """Generates a random shared ticket. To minimize the probability of collision, the ticket is made using a + generated UUID. Returns: a SharedTicket object From a7a2977a3bcc22238cdfeed130de10f90eb6af45 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Mon, 22 Apr 2024 10:52:39 -0600 Subject: [PATCH 11/25] Get ready for rebase to main --- Integrations/build.gradle | 6 +- py/client/pydeephaven/_session_service.py | 2 +- py/client/pydeephaven/session.py | 29 ++--- py/client/tests/test_session.py | 2 +- py/server/deephaven/barrage.py | 138 ++++++++++++---------- py/server/tests/test_barrage.py | 75 ++++++++++-- 6 files changed, 161 insertions(+), 91 deletions(-) diff --git a/Integrations/build.gradle b/Integrations/build.gradle index fa0f4d962df..8f324c2178f 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -62,6 +62,9 @@ def runInDocker = { String name, String sourcePath, List command, Closur into 'classpath' } from("${sourcePath}/../client") { + include 'setup.py' + include 'pydeephaven/**' + include 'README.md' into 'python/client' } @@ -100,8 +103,7 @@ def runInDocker = { String name, String sourcePath, List command, Closur // copy in the contents that we do expect to change as the project updates copyFile 'python', '/python' copyFile 'classpath', '/classpath' - runCommand '''set -eux; \\ - pip3 install /python/client;''' + runCommand '''pip3 install /python/client''' } entrypoint = command diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index b4ca3a134b8..bb317788dcf 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -44,7 +44,7 @@ def release(self, ticket): raise DHError("failed to release a ticket.") from e - def publish(self, source_ticket, result_ticket) -> None: + def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ticket) -> None: """Makes a copy from the source ticket and publishes it to the result ticket. Args: diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 8eb9191a51a..3ea14172685 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -90,23 +90,23 @@ def __init__(self, ticket_bytes: bytes): ticket_bytes (bytes): the raw bytes for the ticket """ self._ticket_bytes = ticket_bytes - self.api_ticket = ticket_pb2.Ticket(ticket=b'h' + self._ticket_bytes) + self.api_ticket = ticket_pb2.Ticket(ticket=self._ticket_bytes) @property - def bytes(self): + def bytes(self) -> bytes: """ The raw bytes for the ticket.""" return self._ticket_bytes @classmethod def random_ticket(cls) -> SharedTicket: - """Generates a random shared ticket. To minimize the probability of collision, the ticket is made using a + """Generates a random shared ticket. To minimize the probability of collision, the ticket is made from a generated UUID. Returns: a SharedTicket object """ - ticket_bytes = uuid4().int.to_bytes(16, byteorder='little', signed=False) - return cls(ticket_bytes=ticket_bytes) + bytes_ = uuid4().int.to_bytes(16, byteorder='little', signed=False) + return cls(ticket_bytes=b'h' + bytes_) class Session: @@ -448,8 +448,9 @@ def open_table(self, name: str) -> Table: faketable.ticket = None faketable.schema = None - def publish_table(self, table: Table, shared_ticket: SharedTicket) -> None: - """Publishes a table with the given shared ticket for sharing with other sessions. + def publish_table(self, ticket: SharedTicket, table: Table) -> None: + """Publishes a table to the given shared ticket. The ticket can then be used by another session to fetch the + table. Note that, the shared ticket can be fetched by other sessions to access the table as long as the table is not released. When the table is released either through an explicit call of the close method on it, or @@ -457,19 +458,19 @@ def publish_table(self, table: Table, shared_ticket: SharedTicket) -> None: no longer be valid. Args: + ticket (SharedTicket): a SharedTicket object table (Table): a Table object - shared_ticket (SharedTicket): a SharedTicket object Raises: DHError """ - return self._session_service.publish(table.ticket, shared_ticket.api_ticket) + self._session_service.publish(table.ticket, ticket.api_ticket) def fetch_table(self, ticket: SharedTicket) -> Table: """Fetches a table by ticket. Args: - ticket (Ticket): a ticket + ticket (SharedTicket): a ticket Returns: a Table object @@ -477,16 +478,16 @@ def fetch_table(self, ticket: SharedTicket) -> Table: Raises: DHError """ - faketable = Table(session=self, ticket=ticket.api_ticket) + table = Table(session=self, ticket=ticket.api_ticket) try: table_op = FetchTableOp() - return self.table_service.grpc_table_op(faketable, table_op) + return self.table_service.grpc_table_op(table, table_op) except Exception as e: raise DHError("could not fetch table by ticket") from e finally: # Explicitly close the table without releasing it (because it isn't ours) - faketable.ticket = None - faketable.schema = None + table.ticket = None + table.schema = None def bind_table(self, name: str, table: Table) -> None: """Binds a table to the given name on the server so that it can be referenced by that name. diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index 77182ddb215..568c8a91793 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -345,7 +345,7 @@ def test_publish_table(self): t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) self.assertEqual(t.size, 1000) shared_ticket = SharedTicket.random_ticket() - pub_session.publish_table(t, shared_ticket) + pub_session.publish_table(shared_ticket, t) sub_session1 = Session() t1 = sub_session1.fetch_table(shared_ticket) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 4e816114ca2..1bd961a6b4d 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -15,13 +15,14 @@ from deephaven.table import Table _JURI = jpy.get_type("java.net.URI") +_JTimeUnit = jpy.get_type("java.util.concurrent.TimeUnit") _JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") _JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig") _JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") _JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom") _JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") -_JSharedId = jpy.get_type("io.deephaven.client.impl.SharedId") +_JTableSpec = jpy.get_type("io.deephaven.qst.table.TableSpec") _JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") _JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper") _JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") @@ -29,9 +30,72 @@ _JExecutors = jpy.get_type("java.util.concurrent.Executors") _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") -_session_cache: Dict[str, BarrageSession] = {} # use WeakValueDictionary to avoid memory leak? -_remote_session_lock = threading.Lock() +class BarrageSession(): + """ A Deephaven Barrage session to a remote server.""" + + def __init__(self, j_barrage_session: jpy.JType, j_managed_channel: jpy.JType = None): + self.j_barrage_session = j_barrage_session + self.j_session = j_barrage_session.session() + self.j_managed_channel = j_managed_channel + + def __enter__(self) -> BarrageSession: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def close(self) -> None: + """ Closes the session.""" + try: + self.j_barrage_session.close() + if self.j_managed_channel: + self.j_managed_channel.shutdown() + self.j_managed_channel.awaitTermination(5, _JTimeUnit.SECONDS) + except Exception as e: + raise DHError(e, "failed to close the barrage session.") from e + + def subscribe(self, ticket: bytes) -> Table: + """ Subscribes to a published remote table with the given ticket. + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. + + Args: + ticket (bytes): the bytes of the ticket + + Returns: + a Table + + Raises: + DHError + """ + try: + j_table_handle = self.j_session.of(_JTableSpec.ticket(ticket)) + j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, + _JBarrageTableResolver.SUB_OPTIONS) + return Table(j_barrage_subscription.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to subscribe to the remote table with the provided ticket.") from e + + def snapshot(self, ticket: bytes) -> Table: + """ Returns a snapshot of a published remote table with the given ticket. + + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. + + Args: + ticket (bytes): the bytes of the ticket + + Returns: + a Table + + Raises: + DHError + """ + try: + j_table_handle = self.j_session.of(_JTableSpec.ticket(ticket)) + j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) + return Table(j_barrage_snapshot.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e def barrage_session(host: str, port: int = 10000, @@ -81,24 +145,27 @@ def barrage_session(host: str, try: return _get_barrage_session_uri(j_client_config, auth) except: - # fall back to the direct way when we don't have a fully initialized server, used for testing + # fall back to the direct way when we don't have a fully initialized server, used for testing only + # TODO: remove when we are done with restructuring the integrations tests wiring https://github.com/deephaven/deephaven-core/issues/5401 return _get_barrage_session_direct(j_client_config, auth) except Exception as e: raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e -def _get_barrage_session_uri(client_config, auth) -> BarrageSession: +def _get_barrage_session_uri(client_config: jpy.JType, auth: str) -> BarrageSession: j_barrage_session_factory_client = uri.resolve( "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") j_barrage_session_factory = j_barrage_session_factory_client.factory(client_config, auth) - return BarrageSession(j_barrage_session_factory.newBarrageSession()) + return BarrageSession(j_barrage_session_factory.newBarrageSession(), j_barrage_session_factory.managedChannel()) -def _get_barrage_session_direct(client_config, auth) -> BarrageSession: +def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageSession: """Note, this is used for testing only. This way of constructing a Barrage session is less efficient because it does not share any of the state or configuration that the server provides; namely, when you are doing it with the server context it provides a singleton executor, allocator, outbound SSL configuration, and the ability for the server to hook in additional channel building options. + + TODO: remove when we are done with restructuring the integrations tests wiring https://github.com/deephaven/deephaven-core/issues/5401. """ j_channel = _JChannelHelper.channel(client_config) j_dh_channel = _JDeephavenChannelImpl(j_channel) @@ -109,10 +176,10 @@ def _get_barrage_session_direct(client_config, auth) -> BarrageSession: .channel(j_dh_channel) .build()) j_session = _JSessionImpl.create(j_session_config) - return BarrageSession(_JBarrageSession.create(j_session, _JRootAllocator(), j_channel)) + return BarrageSession(_JBarrageSession.create(j_session, _JRootAllocator(), j_channel), j_channel) -def _build_client_config(target_uri, tls_root_certs) -> jpy.JType: +def _build_client_config(target_uri: str, tls_root_certs: bytes) -> jpy.JType: j_client_config_builder = _JClientConfig.builder() j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri))) if tls_root_certs: @@ -121,56 +188,3 @@ def _build_client_config(target_uri, tls_root_certs) -> jpy.JType: j_client_config_builder.ssl(j_ssl_config) j_client_config = j_client_config_builder.build() return j_client_config - - -class BarrageSession(JObjectWrapper): - """ A Deephaven Barrage session to a remote server.""" - - j_object_type = _JBarrageSession - - @property - def j_object(self) -> jpy.JType: - return self.j_barrage_session - - def __init__(self, j_barrage_session): - self.j_barrage_session = j_barrage_session - self.j_session = j_barrage_session.session() - - def subscribe(self, ticket: bytes) -> Table: - """ Subscribes to a published remote table with the given shared ticket. - - Args: - ticket (bytes): the bytes of the shared ticket - - Returns: - a Table - - Raises: - DHError - """ - try: - j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) - j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, - _JBarrageTableResolver.SUB_OPTIONS) - return Table(j_barrage_subscription.entireTable().get()) - except Exception as e: - raise DHError(e, "failed to subscribe to the remote table with the provided ticket.") from e - - def snapshot(self, ticket: bytes) -> Table: - """ Returns a snapshot of a published remote table with the given shared ticket. - - Args: - ticket (bytes): the bytes of the shared ticket - - Returns: - a Table - - Raises: - DHError - """ - try: - j_table_handle = self.j_session.of(_JSharedId(ticket).ticketId().table()) - j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) - return Table(j_barrage_snapshot.entireTable().get()) - except Exception as e: - raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index 9d30c9f7488..c77bc599573 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -22,10 +22,10 @@ def setUpClass(cls) -> None: super().setUpClass() env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler"} env.update(dict(os.environ)) - cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=True, env=env, + cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=False, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - cls.py_client_publish_table() + cls.ensure_server_running() @classmethod def tearDownClass(cls) -> None: @@ -33,26 +33,36 @@ def tearDownClass(cls) -> None: super().tearDownClass() @classmethod - def py_client_publish_table(cls): - from pydeephaven.session import Session, SharedTicket + def ensure_server_running(cls): + from pydeephaven.session import Session - for _ in range(10): + for _ in range(30): try: - pub_session = Session("localhost", 10000) + Session("localhost", 10000) break except Exception as e: time.sleep(1) else: - raise - cls.t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) - cls.shared_ticket = SharedTicket.random_ticket() - pub_session.publish_table(cls.t, cls.shared_ticket) - cls.pub_session = pub_session + raise RuntimeError("Cannot connect to the server") + + def setUp(self) -> None: + from pydeephaven.session import Session, SharedTicket + + self.pub_session = Session("localhost", 10000) + self.t = self.pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) + self.shared_ticket = SharedTicket.random_ticket() + self.pub_session.publish_table(self.shared_ticket, self.t) + + def tearDown(self) -> None: + self.pub_session.close() def test_barrage_session(self): session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") self.assertIsNotNone(session) + with self.assertRaises(DHError): + barrage_session(host="invalid", port=10000, auth_token="Anonymous") + with self.assertRaises(DHError): barrage_session(host="localhost", port=10000, auth_type="Basic", auth_token="user:password") @@ -66,11 +76,54 @@ def test_subscribe(self): t1 = t.update("Z = X + Y") self.assertEqual(t1.size, 1000) + with self.subTest("using barrage session as a context manager"): + with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: + t = cm.subscribe(ticket=self.shared_ticket.bytes) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + with self.subTest("Invalid ticket"): + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes + b"1") + + with self.subTest("Table is closed"): + self.t.close() + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes) + + with self.subTest("publishing session is gone"): + self.pub_session.close() + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes) + + def test_snapshot(self): session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") t = session.snapshot(self.shared_ticket.bytes) self.assertEqual(t.size, 1000) self.assertEqual(len(t.columns), 2) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + with self.subTest("using barrage session as a context manager"): + with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: + t = cm.snapshot(ticket=self.shared_ticket.bytes) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + with self.subTest("Invalid ticket"): + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes + b"1") + + with self.subTest("Table is closed"): + self.t.close() + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes) + + with self.subTest("publishing session is gone"): + self.pub_session.close() + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes) if __name__ == "__main__": From 026231e9f764f38524cf9a505b0133df3946a50f Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Wed, 24 Apr 2024 16:09:24 -0600 Subject: [PATCH 12/25] Rebase and adapt to new Barrage session factory --- .../io/deephaven/client/impl/BarrageSession.java | 2 +- py/server/deephaven/barrage.py | 15 ++++++--------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index 1702519d1be..15c592a71e0 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -35,7 +35,7 @@ public static BarrageSession create( SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel( incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session))); - return new BarrageSession(session, client, channel); + return new BarrageSession(session, client); } protected BarrageSession( diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 1bd961a6b4d..88c6f3374a3 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -5,13 +5,10 @@ for accessing resources on remote Deephaven servers.""" from __future__ import annotations -import threading -from typing import Dict import jpy -from deephaven import DHError, uri -from deephaven._wrapper import JObjectWrapper +from deephaven import DHError from deephaven.table import Table _JURI = jpy.get_type("java.net.URI") @@ -29,6 +26,7 @@ _JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") _JExecutors = jpy.get_type("java.util.concurrent.Executors") _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") +_JDeephavenApiServer = jpy.get_type("io.deephaven.server.runner.DeephavenApiServer") class BarrageSession(): """ A Deephaven Barrage session to a remote server.""" @@ -143,7 +141,7 @@ def barrage_session(host: str, auth = f"{auth_type} {auth_token}" try: - return _get_barrage_session_uri(j_client_config, auth) + return _get_barrage_session_via_api_server(j_client_config, auth) except: # fall back to the direct way when we don't have a fully initialized server, used for testing only # TODO: remove when we are done with restructuring the integrations tests wiring https://github.com/deephaven/deephaven-core/issues/5401 @@ -152,10 +150,9 @@ def barrage_session(host: str, raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e -def _get_barrage_session_uri(client_config: jpy.JType, auth: str) -> BarrageSession: - j_barrage_session_factory_client = uri.resolve( - "dh:///app/io.deephaven.server.barrage.BarrageSessionFactoryClient/field/instance") - j_barrage_session_factory = j_barrage_session_factory_client.factory(client_config, auth) +def _get_barrage_session_via_api_server(client_config: jpy.JType, auth: str) -> BarrageSession: + j_barrage_session_factory_creator = _JDeephavenApiServer.getInstance().sessionFactoryCreator() + j_barrage_session_factory = j_barrage_session_factory_creator.barrageFactory(client_config, auth) return BarrageSession(j_barrage_session_factory.newBarrageSession(), j_barrage_session_factory.managedChannel()) From 11e0ae3d94dcacd642fd40172282fa662654e7bc Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 09:56:52 -0600 Subject: [PATCH 13/25] Fix issues in implementation and tests --- .../io/deephaven/client/impl/BarrageSession.java | 7 ++++--- py/server/deephaven/barrage.py | 15 ++++++++++++--- py/server/tests/test_barrage.py | 9 ++++++--- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index 15c592a71e0..6bf19e59937 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -31,11 +31,12 @@ public static BarrageSession of( return new BarrageSession(session, client); } + // TODO (called in the Python server API) to be removed in the future if we can make JPY capable of selecting the + // right factory method to use when the same method is present in the class hierarchy multiple times public static BarrageSession create( SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { - final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel( - incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session))); - return new BarrageSession(session, client); + + return BarrageSession.of(session, incomingAllocator, channel); } protected BarrageSession( diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 88c6f3374a3..d0ef6500072 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -47,7 +47,7 @@ def close(self) -> None: try: self.j_barrage_session.close() if self.j_managed_channel: - self.j_managed_channel.shutdown() + self.j_managed_channel.shutdownNow() self.j_managed_channel.awaitTermination(5, _JTimeUnit.SECONDS) except Exception as e: raise DHError(e, "failed to close the barrage session.") from e @@ -172,8 +172,17 @@ def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageS .authenticationTypeAndValue(auth) .channel(j_dh_channel) .build()) - j_session = _JSessionImpl.create(j_session_config) - return BarrageSession(_JBarrageSession.create(j_session, _JRootAllocator(), j_channel), j_channel) + try: + j_session = _JSessionImpl.create(j_session_config) + except Exception as e: + # if the connection to the host can't be established, we should clean up the resources + j_session_config.executor().shutdownNow() + j_channel.shutdownNow() + j_channel.awaitTermination(5, _JTimeUnit.SECONDS) + raise + + j_barrage_session =_JBarrageSession.create(j_session, _JRootAllocator(), j_channel) + return BarrageSession(j_barrage_session, j_channel) def _build_client_config(target_uri: str, tls_root_certs: bytes) -> jpy.JType: diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index c77bc599573..d429c955cb5 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -20,7 +20,7 @@ class BarrageTestCase(BaseTestCase): @classmethod def setUpClass(cls) -> None: super().setUpClass() - env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler"} + env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -Ddeephaven.cacheDir=/cache/tmp"} env.update(dict(os.environ)) cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=False, env=env, stdin=subprocess.PIPE, @@ -79,8 +79,8 @@ def test_subscribe(self): with self.subTest("using barrage session as a context manager"): with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: t = cm.subscribe(ticket=self.shared_ticket.bytes) - t1 = t.update("Z = X + Y") - self.assertEqual(t1.size, 1000) + with self.assertRaises(DHError): + t1 = t.update("Z = X + Y") with self.subTest("Invalid ticket"): with self.assertRaises(DHError) as cm: @@ -96,6 +96,7 @@ def test_subscribe(self): with self.assertRaises(DHError) as cm: session.subscribe(ticket=self.shared_ticket.bytes) + session.close() def test_snapshot(self): session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") @@ -125,6 +126,8 @@ def test_snapshot(self): with self.assertRaises(DHError) as cm: session.snapshot(ticket=self.shared_ticket.bytes) + session.close() + if __name__ == "__main__": unittest.main() From b11fe276b048754fa4e726ff32016f1e615523df Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 10:27:14 -0600 Subject: [PATCH 14/25] Fix a buy in create session via factory --- py/server/deephaven/barrage.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index d0ef6500072..b31830986d9 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -16,6 +16,7 @@ _JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") _JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig") _JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") +_JSessionConfig = jpy.get_type("io.deephaven.client.impl.SessionConfig") _JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom") _JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") _JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") @@ -28,6 +29,7 @@ _JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") _JDeephavenApiServer = jpy.get_type("io.deephaven.server.runner.DeephavenApiServer") + class BarrageSession(): """ A Deephaven Barrage session to a remote server.""" @@ -95,6 +97,7 @@ def snapshot(self, ticket: bytes) -> Table: except Exception as e: raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e + def barrage_session(host: str, port: int = 10000, auth_type: str = "Anonymous", @@ -144,7 +147,8 @@ def barrage_session(host: str, return _get_barrage_session_via_api_server(j_client_config, auth) except: # fall back to the direct way when we don't have a fully initialized server, used for testing only - # TODO: remove when we are done with restructuring the integrations tests wiring https://github.com/deephaven/deephaven-core/issues/5401 + # TODO: remove when we are done with restructuring the integrations tests wiring + # https://github.com/deephaven/deephaven-core/issues/5401 return _get_barrage_session_direct(j_client_config, auth) except Exception as e: raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e @@ -152,9 +156,16 @@ def barrage_session(host: str, def _get_barrage_session_via_api_server(client_config: jpy.JType, auth: str) -> BarrageSession: j_barrage_session_factory_creator = _JDeephavenApiServer.getInstance().sessionFactoryCreator() - j_barrage_session_factory = j_barrage_session_factory_creator.barrageFactory(client_config, auth) - return BarrageSession(j_barrage_session_factory.newBarrageSession(), j_barrage_session_factory.managedChannel()) - + j_barrage_session_factory = j_barrage_session_factory_creator.barrageFactory(client_config) + j_managed_channel = j_barrage_session_factory.managedChannel() + if auth: + j_session_config = (_JSessionConfig.builder() + .authenticationTypeAndValue(auth) + .build()) + j_barrage_session = j_barrage_session_factory.newBarrageSession(j_session_config) + else: + j_barrage_session = j_barrage_session_factory.newBarrageSession() + return BarrageSession(j_barrage_session, j_managed_channel) def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageSession: """Note, this is used for testing only. This way of constructing a Barrage session is less efficient because it does @@ -162,7 +173,8 @@ def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageS context it provides a singleton executor, allocator, outbound SSL configuration, and the ability for the server to hook in additional channel building options. - TODO: remove when we are done with restructuring the integrations tests wiring https://github.com/deephaven/deephaven-core/issues/5401. + TODO: remove when we are done with restructuring the integrations tests wiring + https://github.com/deephaven/deephaven-core/issues/5401. """ j_channel = _JChannelHelper.channel(client_config) j_dh_channel = _JDeephavenChannelImpl(j_channel) @@ -181,7 +193,7 @@ def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageS j_channel.awaitTermination(5, _JTimeUnit.SECONDS) raise - j_barrage_session =_JBarrageSession.create(j_session, _JRootAllocator(), j_channel) + j_barrage_session = _JBarrageSession.create(j_session, _JRootAllocator(), j_channel) return BarrageSession(j_barrage_session, j_channel) From 3d27e835ad5b2f603dc0ad87c5ccde086c3529ab Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 10:48:39 -0600 Subject: [PATCH 15/25] More comments to explain test setup change --- py/server/tests/test_barrage.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index d429c955cb5..6e19273f88a 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -20,6 +20,8 @@ class BarrageTestCase(BaseTestCase): @classmethod def setUpClass(cls) -> None: super().setUpClass() + # the cacheDir env var is required to avoid a problem when the same cache dir is used by two server instances, + # in which case the later server instance will wipe out the cache dir of the earlier server instance. env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -Ddeephaven.cacheDir=/cache/tmp"} env.update(dict(os.environ)) cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=False, env=env, From 8a72ca516414719d8f022d092c753847b138977a Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 11:42:23 -0600 Subject: [PATCH 16/25] Increase timeout to see if test can pass in CI --- py/server/deephaven/barrage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index b31830986d9..5f2f9125fd8 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -50,7 +50,7 @@ def close(self) -> None: self.j_barrage_session.close() if self.j_managed_channel: self.j_managed_channel.shutdownNow() - self.j_managed_channel.awaitTermination(5, _JTimeUnit.SECONDS) + self.j_managed_channel.awaitTermination(10, _JTimeUnit.SECONDS) except Exception as e: raise DHError(e, "failed to close the barrage session.") from e @@ -190,7 +190,7 @@ def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageS # if the connection to the host can't be established, we should clean up the resources j_session_config.executor().shutdownNow() j_channel.shutdownNow() - j_channel.awaitTermination(5, _JTimeUnit.SECONDS) + j_channel.awaitTermination(10, _JTimeUnit.SECONDS) raise j_barrage_session = _JBarrageSession.create(j_session, _JRootAllocator(), j_channel) From a98a8f329b68743762e3e6b32182b2fdfa18754e Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 12:58:48 -0600 Subject: [PATCH 17/25] More clarifying comments --- py/server/deephaven/barrage.py | 6 ++++-- py/server/tests/test_barrage.py | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 5f2f9125fd8..37db4dfadb8 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -57,7 +57,8 @@ def close(self) -> None: def subscribe(self, ticket: bytes) -> Table: """ Subscribes to a published remote table with the given ticket. - Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same + ticket is subscribed to multiple times, multiple subscriptions will be created. Args: ticket (bytes): the bytes of the ticket @@ -79,7 +80,8 @@ def subscribe(self, ticket: bytes) -> Table: def snapshot(self, ticket: bytes) -> Table: """ Returns a snapshot of a published remote table with the given ticket. - Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same + ticket is snapshot multiple times, multiple snapshots will be created. Args: ticket (bytes): the bytes of the ticket diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index 6e19273f88a..319e27c48f8 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -77,12 +77,15 @@ def test_subscribe(self): self.assertEqual(sp.size, 1000) t1 = t.update("Z = X + Y") self.assertEqual(t1.size, 1000) + t2 = session.subscribe(ticket=self.shared_ticket.bytes) + self.assertEqual(t.size, 1000) with self.subTest("using barrage session as a context manager"): with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: t = cm.subscribe(ticket=self.shared_ticket.bytes) - with self.assertRaises(DHError): t1 = t.update("Z = X + Y") + with self.assertRaises(DHError): + t.update("Z = X + Y") with self.subTest("Invalid ticket"): with self.assertRaises(DHError) as cm: @@ -107,6 +110,7 @@ def test_snapshot(self): self.assertEqual(len(t.columns), 2) t1 = t.update("Z = X + Y") self.assertEqual(t1.size, 1000) + t2 = session.snapshot(self.shared_ticket.bytes) with self.subTest("using barrage session as a context manager"): with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: From 1505f777bdab408baf0184cc4dbd4c6ec8afcf13 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 14:27:31 -0600 Subject: [PATCH 18/25] Make the neg test more deterministic --- py/server/tests/test_barrage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index 319e27c48f8..4120bcdaa4f 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -85,7 +85,9 @@ def test_subscribe(self): t = cm.subscribe(ticket=self.shared_ticket.bytes) t1 = t.update("Z = X + Y") with self.assertRaises(DHError): - t.update("Z = X + Y") + for _ in range(10): + t.update("Z = X + Y") + time.sleep(1) with self.subTest("Invalid ticket"): with self.assertRaises(DHError) as cm: From 528584104d3daf3d7aa97edf4306aef81bb32ac6 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 15:00:41 -0600 Subject: [PATCH 19/25] A better way to check an orphaned table is failed --- py/server/tests/test_barrage.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index 4120bcdaa4f..0a84a7cab75 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -84,10 +84,13 @@ def test_subscribe(self): with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: t = cm.subscribe(ticket=self.shared_ticket.bytes) t1 = t.update("Z = X + Y") - with self.assertRaises(DHError): - for _ in range(10): - t.update("Z = X + Y") - time.sleep(1) + + for _ in range(10): + if t.j_table.isFailed(): + break + time.sleep(1) + else: + self.fail("the barrage table is still alive after 10 seconds elapsed.") with self.subTest("Invalid ticket"): with self.assertRaises(DHError) as cm: From f45fcc2bc587f8088dd4a008eacfe18ee7d03b00 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Thu, 25 Apr 2024 15:03:13 -0600 Subject: [PATCH 20/25] Apply suggestions from code review Co-authored-by: Ryan Caudy --- .../java/io/deephaven/client/impl/BarrageSession.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index 6bf19e59937..a4bf4cb4cfe 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -31,11 +31,15 @@ public static BarrageSession of( return new BarrageSession(session, client); } - // TODO (called in the Python server API) to be removed in the future if we can make JPY capable of selecting the - // right factory method to use when the same method is present in the class hierarchy multiple times + /** + * @apiNote This method exists to be called by the Python API. It will be removed in the future if we can make JPY + * capable of selecting the right factory method to use when the same method is present in the class + * hierarchy multiple times. + * @see #of(SessionImpl, BufferAllocator, ManagedChannel) + */ + @InternalUseOnly public static BarrageSession create( SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { - return BarrageSession.of(session, incomingAllocator, channel); } From a31d7371be15ef49146fc9356b2eed7b202a9598 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 15:06:49 -0600 Subject: [PATCH 21/25] Better code organization --- py/client/pydeephaven/session.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 3ea14172685..e3fea0c6b4f 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -448,6 +448,18 @@ def open_table(self, name: str) -> Table: faketable.ticket = None faketable.schema = None + def bind_table(self, name: str, table: Table) -> None: + """Binds a table to the given name on the server so that it can be referenced by that name. + + Args: + name (str): name for the table + table (Table): a Table object + + Raises: + DHError + """ + self.console_service.bind_table(table=table, variable_name=name) + def publish_table(self, ticket: SharedTicket, table: Table) -> None: """Publishes a table to the given shared ticket. The ticket can then be used by another session to fetch the table. @@ -489,18 +501,6 @@ def fetch_table(self, ticket: SharedTicket) -> Table: table.ticket = None table.schema = None - def bind_table(self, name: str, table: Table) -> None: - """Binds a table to the given name on the server so that it can be referenced by that name. - - Args: - name (str): name for the table - table (Table): a Table object - - Raises: - DHError - """ - self.console_service.bind_table(table=table, variable_name=name) - def time_table(self, period: Union[int, str], start_time: Union[int, str] = None, blink_table: bool = False) -> Table: """Creates a time table on the server. From 88ca1de2d536e3168cee88417204820f22298cfc Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 15:46:08 -0600 Subject: [PATCH 22/25] Remove a flaky test and add more comments --- py/server/deephaven/barrage.py | 14 +++++++++++++- py/server/tests/test_barrage.py | 17 ++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 37db4dfadb8..6c719673b3b 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -34,6 +34,15 @@ class BarrageSession(): """ A Deephaven Barrage session to a remote server.""" def __init__(self, j_barrage_session: jpy.JType, j_managed_channel: jpy.JType = None): + """ Initializes a Barrage session. + + when BarrageSession is created via the barrage_session() factory function, j_managed_channel is always provided, + and when BarrageSession.close() is called, it will shut down the channel as well as close the j_barrage_session.. + + when BarrageSession is initialized directly and j_managed_channel is None, when BarrageSession.close() is called, + it will only close the j_barrage_session, it is the responsibility of the user to shut down the associated + channel if needed. + """ self.j_barrage_session = j_barrage_session self.j_session = j_barrage_session.session() self.j_managed_channel = j_managed_channel @@ -45,7 +54,10 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() def close(self) -> None: - """ Closes the session.""" + """ Closes the barrage session. + + If the BarrageSession is initialized with a managed channel, the channel will be shut down as well. + """ try: self.j_barrage_session.close() if self.j_managed_channel: diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py index 0a84a7cab75..b3411f60ad7 100644 --- a/py/server/tests/test_barrage.py +++ b/py/server/tests/test_barrage.py @@ -84,13 +84,16 @@ def test_subscribe(self): with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: t = cm.subscribe(ticket=self.shared_ticket.bytes) t1 = t.update("Z = X + Y") - - for _ in range(10): - if t.j_table.isFailed(): - break - time.sleep(1) - else: - self.fail("the barrage table is still alive after 10 seconds elapsed.") + self.assertEqual(t1.size, 1000) + + # TODO this test is flaky because of https://github.com/deephaven/deephaven-core/issues/5416, re-enable it + # when the issue is fixed. + # for _ in range(10): + # if t.j_table.isFailed(): + # break + # time.sleep(1) + # else: + # self.fail("the barrage table is still alive after 10 seconds elapsed.") with self.subTest("Invalid ticket"): with self.assertRaises(DHError) as cm: From 72873283a26a45f1d94ac27c2106d1be3309561a Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 15:56:51 -0600 Subject: [PATCH 23/25] Fix a missing import problem --- .../src/main/java/io/deephaven/client/impl/BarrageSession.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index a4bf4cb4cfe..583aa6a727d 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -7,6 +7,7 @@ import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.proto.DeephavenChannel; import io.deephaven.qst.table.TableSpec; +import io.deephaven.util.annotations.InternalUseOnly; import io.grpc.ManagedChannel; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightGrpcUtilsExtension; From e8f4449e4f387f60658d4d6c7d7d8d54348cb163 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 16:28:15 -0600 Subject: [PATCH 24/25] Change to avoid leaking of Java table handles --- py/server/deephaven/barrage.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 6c719673b3b..59adae987be 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -82,8 +82,7 @@ def subscribe(self, ticket: bytes) -> Table: DHError """ try: - j_table_handle = self.j_session.of(_JTableSpec.ticket(ticket)) - j_barrage_subscription = self.j_barrage_session.subscribe(j_table_handle, + j_barrage_subscription = self.j_barrage_session.subscribe(_JTableSpec.ticket(ticket), _JBarrageTableResolver.SUB_OPTIONS) return Table(j_barrage_subscription.entireTable().get()) except Exception as e: @@ -105,8 +104,7 @@ def snapshot(self, ticket: bytes) -> Table: DHError """ try: - j_table_handle = self.j_session.of(_JTableSpec.ticket(ticket)) - j_barrage_snapshot = self.j_barrage_session.snapshot(j_table_handle, _JBarrageTableResolver.SNAP_OPTIONS) + j_barrage_snapshot = self.j_barrage_session.snapshot(_JTableSpec.ticket(ticket), _JBarrageTableResolver.SNAP_OPTIONS) return Table(j_barrage_snapshot.entireTable().get()) except Exception as e: raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e From 4ef649787680675810d2f9c9a1bb7cdd08238fc4 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 25 Apr 2024 16:30:37 -0600 Subject: [PATCH 25/25] Minor housekeeping on comments --- py/server/deephaven/barrage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py index 59adae987be..00e4a54d0bb 100644 --- a/py/server/deephaven/barrage.py +++ b/py/server/deephaven/barrage.py @@ -36,10 +36,10 @@ class BarrageSession(): def __init__(self, j_barrage_session: jpy.JType, j_managed_channel: jpy.JType = None): """ Initializes a Barrage session. - when BarrageSession is created via the barrage_session() factory function, j_managed_channel is always provided, - and when BarrageSession.close() is called, it will shut down the channel as well as close the j_barrage_session.. + When BarrageSession is created via the barrage_session() factory function, j_managed_channel is always provided, + and when BarrageSession.close() is called, it will shut down the channel as well as close the j_barrage_session. - when BarrageSession is initialized directly and j_managed_channel is None, when BarrageSession.close() is called, + When BarrageSession is initialized directly and j_managed_channel is None, when BarrageSession.close() is called, it will only close the j_barrage_session, it is the responsibility of the user to shut down the associated channel if needed. """