From aeeedd88cde39cd45a283d5d03485e7d2aa087a1 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Thu, 25 Apr 2024 17:06:39 -0600 Subject: [PATCH] Support publishing/sharing anonymous tables between different sessions in Python client/server APIs (#5374) * Support publishing anonymous tables between sessions. --- Integrations/build.gradle | 8 + .../deephaven/client/impl/BarrageSession.java | 13 ++ py/client/pydeephaven/_session_service.py | 15 +- py/client/pydeephaven/session.py | 74 +++++- py/client/tests/test_session.py | 25 ++ py/server/deephaven/barrage.py | 220 ++++++++++++++++++ py/server/tests/test_barrage.py | 147 ++++++++++++ 7 files changed, 500 insertions(+), 2 deletions(-) create mode 100644 py/server/deephaven/barrage.py create mode 100644 py/server/tests/test_barrage.py diff --git a/Integrations/build.gradle b/Integrations/build.gradle index 109fd30252d..8f324c2178f 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -61,6 +61,12 @@ def runInDocker = { String name, String sourcePath, List command, Closur from(test.runtimeClasspath) { into 'classpath' } + from("${sourcePath}/../client") { + include 'setup.py' + include 'pydeephaven/**' + include 'README.md' + into 'python/client' + } // Unpack the config contents for now, since we don't seem to read the configs from inside a jar. // This does not add a task dependency, but we already put :configs in the testRuntime classpath, @@ -97,6 +103,8 @@ def runInDocker = { String name, String sourcePath, List command, Closur // copy in the contents that we do expect to change as the project updates copyFile 'python', '/python' copyFile 'classpath', '/classpath' + runCommand '''pip3 install /python/client''' + } entrypoint = command diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index 988554f3a1e..583aa6a727d 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -7,6 +7,7 @@ import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.proto.DeephavenChannel; import io.deephaven.qst.table.TableSpec; +import io.deephaven.util.annotations.InternalUseOnly; import io.grpc.ManagedChannel; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightGrpcUtilsExtension; @@ -31,6 +32,18 @@ public static BarrageSession of( return new BarrageSession(session, client); } + /** + * @apiNote This method exists to be called by the Python API. It will be removed in the future if we can make JPY + * capable of selecting the right factory method to use when the same method is present in the class + * hierarchy multiple times. + * @see #of(SessionImpl, BufferAllocator, ManagedChannel) + */ + @InternalUseOnly + public static BarrageSession create( + SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { + return BarrageSession.of(session, incomingAllocator, channel); + } + protected BarrageSession( final SessionImpl session, final FlightClient client) { super(session, client); diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 7e6b7aec975..bb317788dcf 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -5,7 +5,7 @@ import grpc from pydeephaven.dherror import DHError -from pydeephaven.proto import session_pb2_grpc, session_pb2 +from pydeephaven.proto import session_pb2_grpc, session_pb2, ticket_pb2 class SessionService: @@ -42,3 +42,16 @@ def release(self, ticket): self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata) except Exception as e: raise DHError("failed to release a ticket.") from e + + + def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ticket) -> None: + """Makes a copy from the source ticket and publishes it to the result ticket. + + Args: + source_ticket: The source ticket to publish from. + result_ticket: The result ticket to publish to. + """ + try: + self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=source_ticket, result_id=result_ticket), metadata=self.session.grpc_metadata) + except Exception as e: + raise DHError("failed to publish a ticket.") from e \ No newline at end of file diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 6ae939baae9..e3fea0c6b4f 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -3,11 +3,13 @@ # """This module implements the Session class which provides methods to connect to and interact with the Deephaven server.""" +from __future__ import annotations import base64 import os import threading -from typing import Dict, List, Union, Tuple, Any +from typing import Dict, List, Union, Tuple +from uuid import uuid4 import grpc import pyarrow as pa @@ -78,6 +80,35 @@ def get_token(self): return self._token +class SharedTicket: + """ A SharedTicket object represents a ticket that can be shared with other sessions. """ + + def __init__(self, ticket_bytes: bytes): + """Initializes a SharedTicket object + + Args: + ticket_bytes (bytes): the raw bytes for the ticket + """ + self._ticket_bytes = ticket_bytes + self.api_ticket = ticket_pb2.Ticket(ticket=self._ticket_bytes) + + @property + def bytes(self) -> bytes: + """ The raw bytes for the ticket.""" + return self._ticket_bytes + + @classmethod + def random_ticket(cls) -> SharedTicket: + """Generates a random shared ticket. To minimize the probability of collision, the ticket is made from a + generated UUID. + + Returns: + a SharedTicket object + """ + bytes_ = uuid4().int.to_bytes(16, byteorder='little', signed=False) + return cls(ticket_bytes=b'h' + bytes_) + + class Session: """A Session object represents a connection to the Deephaven data server. It contains a number of convenience methods for asking the server to create tables, import Arrow data into tables, merge tables, run Python scripts, and @@ -429,6 +460,47 @@ def bind_table(self, name: str, table: Table) -> None: """ self.console_service.bind_table(table=table, variable_name=name) + def publish_table(self, ticket: SharedTicket, table: Table) -> None: + """Publishes a table to the given shared ticket. The ticket can then be used by another session to fetch the + table. + + Note that, the shared ticket can be fetched by other sessions to access the table as long as the table is + not released. When the table is released either through an explicit call of the close method on it, or + implicitly through garbage collection, or through the closing of the publishing session, the shared ticket will + no longer be valid. + + Args: + ticket (SharedTicket): a SharedTicket object + table (Table): a Table object + + Raises: + DHError + """ + self._session_service.publish(table.ticket, ticket.api_ticket) + + def fetch_table(self, ticket: SharedTicket) -> Table: + """Fetches a table by ticket. + + Args: + ticket (SharedTicket): a ticket + + Returns: + a Table object + + Raises: + DHError + """ + table = Table(session=self, ticket=ticket.api_ticket) + try: + table_op = FetchTableOp() + return self.table_service.grpc_table_op(table, table_op) + except Exception as e: + raise DHError("could not fetch table by ticket") from e + finally: + # Explicitly close the table without releasing it (because it isn't ours) + table.ticket = None + table.schema = None + def time_table(self, period: Union[int, str], start_time: Union[int, str] = None, blink_table: bool = False) -> Table: """Creates a time table on the server. diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index a4731ad15e4..568c8a91793 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -10,6 +10,7 @@ from pydeephaven import DHError from pydeephaven import Session +from pydeephaven.session import SharedTicket from tests.testbase import BaseTestCase @@ -339,6 +340,30 @@ def test_blink_input_table(self): blink_input_table.delete(dh_table.select(["f1"])) + def test_publish_table(self): + pub_session = Session() + t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) + self.assertEqual(t.size, 1000) + shared_ticket = SharedTicket.random_ticket() + pub_session.publish_table(shared_ticket, t) + + sub_session1 = Session() + t1 = sub_session1.fetch_table(shared_ticket) + self.assertEqual(t1.size, 1000) + pa_table = t1.to_arrow() + self.assertEqual(pa_table.num_rows, 1000) + + with self.subTest("the 1st subscriber session is gone, shared ticket is still valid"): + sub_session1.close() + sub_session2 = Session() + t2 = sub_session2.fetch_table(shared_ticket) + self.assertEqual(t2.size, 1000) + + with self.subTest("the publisher session is gone, shared ticket becomes invalid"): + pub_session.close() + with self.assertRaises(DHError): + sub_session2.fetch_table(shared_ticket) + if __name__ == '__main__': unittest.main() diff --git a/py/server/deephaven/barrage.py b/py/server/deephaven/barrage.py new file mode 100644 index 00000000000..00e4a54d0bb --- /dev/null +++ b/py/server/deephaven/barrage.py @@ -0,0 +1,220 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +""" This module defines the BarrageSession wrapper class and provides a factory function to create an instance of it + for accessing resources on remote Deephaven servers.""" + +from __future__ import annotations + +import jpy + +from deephaven import DHError +from deephaven.table import Table + +_JURI = jpy.get_type("java.net.URI") +_JTimeUnit = jpy.get_type("java.util.concurrent.TimeUnit") +_JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig") +_JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig") +_JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig") +_JSessionConfig = jpy.get_type("io.deephaven.client.impl.SessionConfig") +_JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom") +_JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget") +_JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession") +_JTableSpec = jpy.get_type("io.deephaven.qst.table.TableSpec") +_JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver") +_JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper") +_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl") +_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl") +_JExecutors = jpy.get_type("java.util.concurrent.Executors") +_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator") +_JDeephavenApiServer = jpy.get_type("io.deephaven.server.runner.DeephavenApiServer") + + +class BarrageSession(): + """ A Deephaven Barrage session to a remote server.""" + + def __init__(self, j_barrage_session: jpy.JType, j_managed_channel: jpy.JType = None): + """ Initializes a Barrage session. + + When BarrageSession is created via the barrage_session() factory function, j_managed_channel is always provided, + and when BarrageSession.close() is called, it will shut down the channel as well as close the j_barrage_session. + + When BarrageSession is initialized directly and j_managed_channel is None, when BarrageSession.close() is called, + it will only close the j_barrage_session, it is the responsibility of the user to shut down the associated + channel if needed. + """ + self.j_barrage_session = j_barrage_session + self.j_session = j_barrage_session.session() + self.j_managed_channel = j_managed_channel + + def __enter__(self) -> BarrageSession: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def close(self) -> None: + """ Closes the barrage session. + + If the BarrageSession is initialized with a managed channel, the channel will be shut down as well. + """ + try: + self.j_barrage_session.close() + if self.j_managed_channel: + self.j_managed_channel.shutdownNow() + self.j_managed_channel.awaitTermination(10, _JTimeUnit.SECONDS) + except Exception as e: + raise DHError(e, "failed to close the barrage session.") from e + + def subscribe(self, ticket: bytes) -> Table: + """ Subscribes to a published remote table with the given ticket. + + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same + ticket is subscribed to multiple times, multiple subscriptions will be created. + + Args: + ticket (bytes): the bytes of the ticket + + Returns: + a Table + + Raises: + DHError + """ + try: + j_barrage_subscription = self.j_barrage_session.subscribe(_JTableSpec.ticket(ticket), + _JBarrageTableResolver.SUB_OPTIONS) + return Table(j_barrage_subscription.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to subscribe to the remote table with the provided ticket.") from e + + def snapshot(self, ticket: bytes) -> Table: + """ Returns a snapshot of a published remote table with the given ticket. + + Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same + ticket is snapshot multiple times, multiple snapshots will be created. + + Args: + ticket (bytes): the bytes of the ticket + + Returns: + a Table + + Raises: + DHError + """ + try: + j_barrage_snapshot = self.j_barrage_session.snapshot(_JTableSpec.ticket(ticket), _JBarrageTableResolver.SNAP_OPTIONS) + return Table(j_barrage_snapshot.entireTable().get()) + except Exception as e: + raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e + + +def barrage_session(host: str, + port: int = 10000, + auth_type: str = "Anonymous", + auth_token: str = "", + use_tls: bool = False, + tls_root_certs: bytes = None, + ) -> BarrageSession: + """Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new + session. + + Note: client authentication is not supported yet. + + Args: + host (str): the host name or IP address of the Deephaven server. + port (int): the port number that the remote Deephaven server is listening on, default is 10000. + auth_type (str): the authentication type string, can be "Anonymous', 'Basic", or any custom-built + authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler", + default is 'Anonymous'. + auth_token (str): the authentication token string. When auth_type is 'Basic', it must be + "user:password"; when auth_type is "Anonymous', it will be ignored; when auth_type is a custom-built + authenticator, it must conform to the specific requirement of the authenticator + use_tls (bool): if True, use a TLS connection. Defaults to False + tls_root_certs (bytes): PEM encoded root certificates to use for TLS connection, or None to use system defaults. + If not None implies use a TLS connection and the use_tls argument should have been passed + as True. Defaults to None + + Returns: + a Deephaven Barrage session + + Raises: + DHError + """ + try: + if tls_root_certs and not use_tls: + raise DHError(message="tls_root_certs is provided but use_tls is False") + + target_uri = f"{host}:{port}" + if use_tls: + target_uri = f"dh://{target_uri}" + else: + target_uri = f"dh+plain://{target_uri}" + + j_client_config = _build_client_config(target_uri, tls_root_certs) + auth = f"{auth_type} {auth_token}" + + try: + return _get_barrage_session_via_api_server(j_client_config, auth) + except: + # fall back to the direct way when we don't have a fully initialized server, used for testing only + # TODO: remove when we are done with restructuring the integrations tests wiring + # https://github.com/deephaven/deephaven-core/issues/5401 + return _get_barrage_session_direct(j_client_config, auth) + except Exception as e: + raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e + + +def _get_barrage_session_via_api_server(client_config: jpy.JType, auth: str) -> BarrageSession: + j_barrage_session_factory_creator = _JDeephavenApiServer.getInstance().sessionFactoryCreator() + j_barrage_session_factory = j_barrage_session_factory_creator.barrageFactory(client_config) + j_managed_channel = j_barrage_session_factory.managedChannel() + if auth: + j_session_config = (_JSessionConfig.builder() + .authenticationTypeAndValue(auth) + .build()) + j_barrage_session = j_barrage_session_factory.newBarrageSession(j_session_config) + else: + j_barrage_session = j_barrage_session_factory.newBarrageSession() + return BarrageSession(j_barrage_session, j_managed_channel) + +def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageSession: + """Note, this is used for testing only. This way of constructing a Barrage session is less efficient because it does + not share any of the state or configuration that the server provides; namely, when you are doing it with the server + context it provides a singleton executor, allocator, outbound SSL configuration, and the ability for the server to + hook in additional channel building options. + + TODO: remove when we are done with restructuring the integrations tests wiring + https://github.com/deephaven/deephaven-core/issues/5401. + """ + j_channel = _JChannelHelper.channel(client_config) + j_dh_channel = _JDeephavenChannelImpl(j_channel) + + j_session_config = (_JSessionImplConfig.builder() + .executor(_JExecutors.newScheduledThreadPool(4)) + .authenticationTypeAndValue(auth) + .channel(j_dh_channel) + .build()) + try: + j_session = _JSessionImpl.create(j_session_config) + except Exception as e: + # if the connection to the host can't be established, we should clean up the resources + j_session_config.executor().shutdownNow() + j_channel.shutdownNow() + j_channel.awaitTermination(10, _JTimeUnit.SECONDS) + raise + + j_barrage_session = _JBarrageSession.create(j_session, _JRootAllocator(), j_channel) + return BarrageSession(j_barrage_session, j_channel) + + +def _build_client_config(target_uri: str, tls_root_certs: bytes) -> jpy.JType: + j_client_config_builder = _JClientConfig.builder() + j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri))) + if tls_root_certs: + j_ssl_config = _JSSLConfig.builder().trust( + _JTrustCustom.ofX509(tls_root_certs, 0, len(tls_root_certs))).build() + j_client_config_builder.ssl(j_ssl_config) + j_client_config = j_client_config_builder.build() + return j_client_config diff --git a/py/server/tests/test_barrage.py b/py/server/tests/test_barrage.py new file mode 100644 index 00000000000..b3411f60ad7 --- /dev/null +++ b/py/server/tests/test_barrage.py @@ -0,0 +1,147 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +import os +import unittest + +import subprocess +import time + +from deephaven import DHError +from deephaven.barrage import barrage_session + +from tests.testbase import BaseTestCase + + +class BarrageTestCase(BaseTestCase): + shared_ticket = None + server_proc = None + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + # the cacheDir env var is required to avoid a problem when the same cache dir is used by two server instances, + # in which case the later server instance will wipe out the cache dir of the earlier server instance. + env = {"START_OPTS": "-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -Ddeephaven.cacheDir=/cache/tmp"} + env.update(dict(os.environ)) + cls.server_proc = subprocess.Popen(["/opt/deephaven/server/bin/start"], shell=False, env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cls.ensure_server_running() + + @classmethod + def tearDownClass(cls) -> None: + cls.server_proc.kill() + super().tearDownClass() + + @classmethod + def ensure_server_running(cls): + from pydeephaven.session import Session + + for _ in range(30): + try: + Session("localhost", 10000) + break + except Exception as e: + time.sleep(1) + else: + raise RuntimeError("Cannot connect to the server") + + def setUp(self) -> None: + from pydeephaven.session import Session, SharedTicket + + self.pub_session = Session("localhost", 10000) + self.t = self.pub_session.empty_table(1000).update(["X = i", "Y = 2*i"]) + self.shared_ticket = SharedTicket.random_ticket() + self.pub_session.publish_table(self.shared_ticket, self.t) + + def tearDown(self) -> None: + self.pub_session.close() + + def test_barrage_session(self): + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + self.assertIsNotNone(session) + + with self.assertRaises(DHError): + barrage_session(host="invalid", port=10000, auth_token="Anonymous") + + with self.assertRaises(DHError): + barrage_session(host="localhost", port=10000, auth_type="Basic", auth_token="user:password") + + def test_subscribe(self): + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + t = session.subscribe(ticket=self.shared_ticket.bytes) + self.assertEqual(t.size, 1000) + self.assertEqual(len(t.columns), 2) + sp = t.snapshot() + self.assertEqual(sp.size, 1000) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + t2 = session.subscribe(ticket=self.shared_ticket.bytes) + self.assertEqual(t.size, 1000) + + with self.subTest("using barrage session as a context manager"): + with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: + t = cm.subscribe(ticket=self.shared_ticket.bytes) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + # TODO this test is flaky because of https://github.com/deephaven/deephaven-core/issues/5416, re-enable it + # when the issue is fixed. + # for _ in range(10): + # if t.j_table.isFailed(): + # break + # time.sleep(1) + # else: + # self.fail("the barrage table is still alive after 10 seconds elapsed.") + + with self.subTest("Invalid ticket"): + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes + b"1") + + with self.subTest("Table is closed"): + self.t.close() + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes) + + with self.subTest("publishing session is gone"): + self.pub_session.close() + with self.assertRaises(DHError) as cm: + session.subscribe(ticket=self.shared_ticket.bytes) + + session.close() + + def test_snapshot(self): + session = barrage_session(host="localhost", port=10000, auth_type="Anonymous") + t = session.snapshot(self.shared_ticket.bytes) + self.assertEqual(t.size, 1000) + self.assertEqual(len(t.columns), 2) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + t2 = session.snapshot(self.shared_ticket.bytes) + + with self.subTest("using barrage session as a context manager"): + with barrage_session(host="localhost", port=10000, auth_type="Anonymous") as cm: + t = cm.snapshot(ticket=self.shared_ticket.bytes) + t1 = t.update("Z = X + Y") + self.assertEqual(t1.size, 1000) + + with self.subTest("Invalid ticket"): + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes + b"1") + + with self.subTest("Table is closed"): + self.t.close() + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes) + + with self.subTest("publishing session is gone"): + self.pub_session.close() + with self.assertRaises(DHError) as cm: + session.snapshot(ticket=self.shared_ticket.bytes) + + session.close() + + +if __name__ == "__main__": + unittest.main()