Skip to content

Commit

Permalink
Support publishing/sharing anonymous tables between different session…
Browse files Browse the repository at this point in the history
…s in Python client/server APIs (#5374)

* Support publishing anonymous tables between sessions.
  • Loading branch information
jmao-denver committed Apr 25, 2024
1 parent 3ca98c9 commit aeeedd8
Show file tree
Hide file tree
Showing 7 changed files with 500 additions and 2 deletions.
8 changes: 8 additions & 0 deletions Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ def runInDocker = { String name, String sourcePath, List<String> command, Closur
from(test.runtimeClasspath) {
into 'classpath'
}
from("${sourcePath}/../client") {
include 'setup.py'
include 'pydeephaven/**'
include 'README.md'
into 'python/client'
}

// Unpack the config contents for now, since we don't seem to read the configs from inside a jar.
// This does not add a task dependency, but we already put :configs in the testRuntime classpath,
Expand Down Expand Up @@ -97,6 +103,8 @@ def runInDocker = { String name, String sourcePath, List<String> command, Closur
// copy in the contents that we do expect to change as the project updates
copyFile 'python', '/python'
copyFile 'classpath', '/classpath'
runCommand '''pip3 install /python/client'''

}
entrypoint = command

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.util.annotations.InternalUseOnly;
import io.grpc.ManagedChannel;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightGrpcUtilsExtension;
Expand All @@ -31,6 +32,18 @@ public static BarrageSession of(
return new BarrageSession(session, client);
}

/**
* @apiNote This method exists to be called by the Python API. It will be removed in the future if we can make JPY
* capable of selecting the right factory method to use when the same method is present in the class
* hierarchy multiple times.
* @see #of(SessionImpl, BufferAllocator, ManagedChannel)
*/
@InternalUseOnly
public static BarrageSession create(
SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
return BarrageSession.of(session, incomingAllocator, channel);
}

protected BarrageSession(
final SessionImpl session, final FlightClient client) {
super(session, client);
Expand Down
15 changes: 14 additions & 1 deletion py/client/pydeephaven/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import grpc

from pydeephaven.dherror import DHError
from pydeephaven.proto import session_pb2_grpc, session_pb2
from pydeephaven.proto import session_pb2_grpc, session_pb2, ticket_pb2


class SessionService:
Expand Down Expand Up @@ -42,3 +42,16 @@ def release(self, ticket):
self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata)
except Exception as e:
raise DHError("failed to release a ticket.") from e


def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ticket) -> None:
"""Makes a copy from the source ticket and publishes it to the result ticket.
Args:
source_ticket: The source ticket to publish from.
result_ticket: The result ticket to publish to.
"""
try:
self._grpc_session_stub.PublishFromTicket(session_pb2.PublishRequest(source_id=source_ticket, result_id=result_ticket), metadata=self.session.grpc_metadata)
except Exception as e:
raise DHError("failed to publish a ticket.") from e
74 changes: 73 additions & 1 deletion py/client/pydeephaven/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#
"""This module implements the Session class which provides methods to connect to and interact with the Deephaven
server."""
from __future__ import annotations

import base64
import os
import threading
from typing import Dict, List, Union, Tuple, Any
from typing import Dict, List, Union, Tuple
from uuid import uuid4

import grpc
import pyarrow as pa
Expand Down Expand Up @@ -78,6 +80,35 @@ def get_token(self):
return self._token


class SharedTicket:
""" A SharedTicket object represents a ticket that can be shared with other sessions. """

def __init__(self, ticket_bytes: bytes):
"""Initializes a SharedTicket object
Args:
ticket_bytes (bytes): the raw bytes for the ticket
"""
self._ticket_bytes = ticket_bytes
self.api_ticket = ticket_pb2.Ticket(ticket=self._ticket_bytes)

@property
def bytes(self) -> bytes:
""" The raw bytes for the ticket."""
return self._ticket_bytes

@classmethod
def random_ticket(cls) -> SharedTicket:
"""Generates a random shared ticket. To minimize the probability of collision, the ticket is made from a
generated UUID.
Returns:
a SharedTicket object
"""
bytes_ = uuid4().int.to_bytes(16, byteorder='little', signed=False)
return cls(ticket_bytes=b'h' + bytes_)


class Session:
"""A Session object represents a connection to the Deephaven data server. It contains a number of convenience
methods for asking the server to create tables, import Arrow data into tables, merge tables, run Python scripts, and
Expand Down Expand Up @@ -429,6 +460,47 @@ def bind_table(self, name: str, table: Table) -> None:
"""
self.console_service.bind_table(table=table, variable_name=name)

def publish_table(self, ticket: SharedTicket, table: Table) -> None:
"""Publishes a table to the given shared ticket. The ticket can then be used by another session to fetch the
table.
Note that, the shared ticket can be fetched by other sessions to access the table as long as the table is
not released. When the table is released either through an explicit call of the close method on it, or
implicitly through garbage collection, or through the closing of the publishing session, the shared ticket will
no longer be valid.
Args:
ticket (SharedTicket): a SharedTicket object
table (Table): a Table object
Raises:
DHError
"""
self._session_service.publish(table.ticket, ticket.api_ticket)

def fetch_table(self, ticket: SharedTicket) -> Table:
"""Fetches a table by ticket.
Args:
ticket (SharedTicket): a ticket
Returns:
a Table object
Raises:
DHError
"""
table = Table(session=self, ticket=ticket.api_ticket)
try:
table_op = FetchTableOp()
return self.table_service.grpc_table_op(table, table_op)
except Exception as e:
raise DHError("could not fetch table by ticket") from e
finally:
# Explicitly close the table without releasing it (because it isn't ours)
table.ticket = None
table.schema = None

def time_table(self, period: Union[int, str], start_time: Union[int, str] = None,
blink_table: bool = False) -> Table:
"""Creates a time table on the server.
Expand Down
25 changes: 25 additions & 0 deletions py/client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from pydeephaven import DHError
from pydeephaven import Session
from pydeephaven.session import SharedTicket
from tests.testbase import BaseTestCase


Expand Down Expand Up @@ -339,6 +340,30 @@ def test_blink_input_table(self):
blink_input_table.delete(dh_table.select(["f1"]))


def test_publish_table(self):
pub_session = Session()
t = pub_session.empty_table(1000).update(["X = i", "Y = 2*i"])
self.assertEqual(t.size, 1000)
shared_ticket = SharedTicket.random_ticket()
pub_session.publish_table(shared_ticket, t)

sub_session1 = Session()
t1 = sub_session1.fetch_table(shared_ticket)
self.assertEqual(t1.size, 1000)
pa_table = t1.to_arrow()
self.assertEqual(pa_table.num_rows, 1000)

with self.subTest("the 1st subscriber session is gone, shared ticket is still valid"):
sub_session1.close()
sub_session2 = Session()
t2 = sub_session2.fetch_table(shared_ticket)
self.assertEqual(t2.size, 1000)

with self.subTest("the publisher session is gone, shared ticket becomes invalid"):
pub_session.close()
with self.assertRaises(DHError):
sub_session2.fetch_table(shared_ticket)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit aeeedd8

Please sign in to comment.