Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support publishing/sharing anonymous tables between different sessions in Python client/server APIs #5374

Merged
merged 25 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5bca859
Support publishing anonymous tables
jmao-denver Apr 17, 2024
cc02779
Add more documentation
jmao-denver Apr 17, 2024
78910db
WIP enable fetching remote table in Py server API
jmao-denver Apr 18, 2024
6c8623e
Add some docstrings
jmao-denver Apr 18, 2024
43e16d5
Use the new BarrageSessionFactoryClient
jmao-denver Apr 18, 2024
efeb9ea
Add TLS configuration
jmao-denver Apr 19, 2024
4fa1e17
Refactoring and more tests
jmao-denver Apr 19, 2024
0af127a
Add an auto test case
jmao-denver Apr 20, 2024
f4aed95
Add more clarifying comments
jmao-denver Apr 20, 2024
f92783d
Additional docstring
jmao-denver Apr 20, 2024
a7a2977
Get ready for rebase to main
jmao-denver Apr 22, 2024
026231e
Rebase and adapt to new Barrage session factory
jmao-denver Apr 24, 2024
11e0ae3
Fix issues in implementation and tests
jmao-denver Apr 25, 2024
b11fe27
Fix a bug in create session via factory
jmao-denver Apr 25, 2024
3d27e83
More comments to explain test setup change
jmao-denver Apr 25, 2024
8a72ca5
Increase timeout to see if test can pass in CI
jmao-denver Apr 25, 2024
a98a8f3
More clarifying comments
jmao-denver Apr 25, 2024
1505f77
Make the neg test more deterministic
jmao-denver Apr 25, 2024
5285841
A better way to check an orphaned table is failed
jmao-denver Apr 25, 2024
f45fcc2
Apply suggestions from code review
jmao-denver Apr 25, 2024
a31d737
Better code organization
jmao-denver Apr 25, 2024
88ca1de
Remove a flaky test and add more comments
jmao-denver Apr 25, 2024
7287328
Fix a missing import problem
jmao-denver Apr 25, 2024
e8f4449
Change to avoid leaking of Java table handles
jmao-denver Apr 25, 2024
4ef6497
Minor housekeeping on comments
jmao-denver Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ def runInDocker = { String name, String sourcePath, List<String> command, Closur
from(test.runtimeClasspath) {
into 'classpath'
}
from("${sourcePath}/../client") {
include 'setup.py'
include 'pydeephaven/**'
include 'README.md'
into 'python/client'
}

// Unpack the config contents for now, since we don't seem to read the configs from inside a jar.
// This does not add a task dependency, but we already put :configs in the testRuntime classpath,
Expand Down Expand Up @@ -97,6 +103,8 @@ def runInDocker = { String name, String sourcePath, List<String> command, Closur
// copy in the contents that we do expect to change as the project updates
copyFile 'python', '/python'
copyFile 'classpath', '/classpath'
runCommand '''pip3 install /python/client'''

}
entrypoint = command

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.util.annotations.InternalUseOnly;
import io.grpc.ManagedChannel;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightGrpcUtilsExtension;
Expand All @@ -31,6 +32,18 @@ public static BarrageSession of(
return new BarrageSession(session, client);
}

/**
 * Creates a {@link BarrageSession} by delegating to {@link #of(SessionImpl, BufferAllocator, ManagedChannel)}.
 *
 * @apiNote This distinctly named entry point exists so that the Python API can call it. It will be removed in the
 *          future if JPY becomes capable of selecting the right factory method when the same method is present in
 *          the class hierarchy multiple times.
 * @see #of(SessionImpl, BufferAllocator, ManagedChannel)
 */
@InternalUseOnly
public static BarrageSession create(
        SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
    // Pure pass-through: no extra behavior is added on top of the regular factory.
    final BarrageSession barrageSession = BarrageSession.of(session, incomingAllocator, channel);
    return barrageSession;
}

protected BarrageSession(
final SessionImpl session, final FlightClient client) {
super(session, client);
Expand Down
15 changes: 14 additions & 1 deletion py/client/pydeephaven/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import grpc

from pydeephaven.dherror import DHError
from pydeephaven.proto import session_pb2_grpc, session_pb2
from pydeephaven.proto import session_pb2_grpc, session_pb2, ticket_pb2


class SessionService:
Expand Down Expand Up @@ -42,3 +42,16 @@ def release(self, ticket):
self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata)
except Exception as e:
raise DHError("failed to release a ticket.") from e


def publish(self, source_ticket: ticket_pb2.Ticket, result_ticket: ticket_pb2.Ticket) -> None:
    """Publishes the object behind the source ticket under the result ticket.

    Args:
        source_ticket: The source ticket to publish from.
        result_ticket: The result ticket to publish to.

    Raises:
        DHError: if the publish request fails for any reason.
    """
    try:
        publish_rpc = self._grpc_session_stub.PublishFromTicket
        request = session_pb2.PublishRequest(source_id=source_ticket, result_id=result_ticket)
        publish_rpc(request, metadata=self.session.grpc_metadata)
    except Exception as e:
        raise DHError("failed to publish a ticket.") from e
74 changes: 73 additions & 1 deletion py/client/pydeephaven/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#
"""This module implements the Session class which provides methods to connect to and interact with the Deephaven
server."""
from __future__ import annotations

import base64
import os
import threading
from typing import Dict, List, Union, Tuple, Any
from typing import Dict, List, Union, Tuple
from uuid import uuid4

import grpc
import pyarrow as pa
Expand Down Expand Up @@ -78,6 +80,35 @@ def get_token(self):
return self._token


class SharedTicket:
    """A ticket that can be shared with other sessions."""

    def __init__(self, ticket_bytes: bytes):
        """Wraps the given raw bytes as a shared ticket.

        Args:
            ticket_bytes (bytes): the raw bytes for the ticket
        """
        self._ticket_bytes = ticket_bytes
        # The protobuf form of the ticket, built once from the same raw bytes.
        self.api_ticket = ticket_pb2.Ticket(ticket=ticket_bytes)

    @property
    def bytes(self) -> bytes:
        """The raw bytes for the ticket."""
        return self._ticket_bytes

    @classmethod
    def random_ticket(cls) -> SharedTicket:
        """Creates a shared ticket with a random identity.

        The ticket bytes are b'h' followed by the 16 little-endian bytes of a freshly generated UUID, which keeps
        the probability of a collision between independently generated tickets minimal.

        Returns:
            a SharedTicket object
        """
        # NOTE(review): b'h' is presumably the server's prefix for shared tickets - confirm against the server API.
        random_id = uuid4()
        payload = random_id.int.to_bytes(16, byteorder='little', signed=False)
        return cls(ticket_bytes=b'h' + payload)


class Session:
"""A Session object represents a connection to the Deephaven data server. It contains a number of convenience
methods for asking the server to create tables, import Arrow data into tables, merge tables, run Python scripts, and
Expand Down Expand Up @@ -429,6 +460,47 @@ def bind_table(self, name: str, table: Table) -> None:
"""
self.console_service.bind_table(table=table, variable_name=name)

def publish_table(self, ticket: SharedTicket, table: Table) -> None:
    """Publishes a table under the given shared ticket so that another session can fetch it with that ticket.

    The shared ticket stays usable by other sessions only while the table has not been released. Once the table
    is released - by an explicit call to its close method, implicitly through garbage collection, or because the
    publishing session is closed - the shared ticket is no longer valid.

    Args:
        ticket (SharedTicket): a SharedTicket object
        table (Table): a Table object

    Raises:
        DHError
    """
    source_ticket = table.ticket
    result_ticket = ticket.api_ticket
    self._session_service.publish(source_ticket, result_ticket)

def fetch_table(self, ticket: SharedTicket) -> Table:
    """Fetches the table that is published under the given shared ticket.

    Args:
        ticket (SharedTicket): a ticket

    Returns:
        a Table object

    Raises:
        DHError
    """
    # Temporary wrapper around the shared ticket; it only serves as the input of the fetch operation.
    shared_ref = Table(session=self, ticket=ticket.api_ticket)
    try:
        fetch_op = FetchTableOp()
        fetched = self.table_service.grpc_table_op(shared_ref, fetch_op)
        return fetched
    except Exception as e:
        raise DHError("could not fetch table by ticket") from e
    finally:
        # The shared ticket is not owned by this session, so the wrapper is cleared out by hand
        # rather than released.
        shared_ref.ticket = None
        shared_ref.schema = None

def time_table(self, period: Union[int, str], start_time: Union[int, str] = None,
blink_table: bool = False) -> Table:
"""Creates a time table on the server.
Expand Down
25 changes: 25 additions & 0 deletions py/client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from pydeephaven import DHError
from pydeephaven import Session
from pydeephaven.session import SharedTicket
from tests.testbase import BaseTestCase


Expand Down Expand Up @@ -339,6 +340,30 @@ def test_blink_input_table(self):
blink_input_table.delete(dh_table.select(["f1"]))


def test_publish_table(self):
    """Publishes a table from one session and fetches it from others through a shared ticket."""
    publisher = Session()
    published = publisher.empty_table(1000).update(["X = i", "Y = 2*i"])
    self.assertEqual(published.size, 1000)
    shared_ticket = SharedTicket.random_ticket()
    publisher.publish_table(shared_ticket, published)

    # A second, independent session can fetch the table and pull its data.
    first_subscriber = Session()
    fetched_first = first_subscriber.fetch_table(shared_ticket)
    self.assertEqual(fetched_first.size, 1000)
    arrow_table = fetched_first.to_arrow()
    self.assertEqual(arrow_table.num_rows, 1000)

    with self.subTest("the 1st subscriber session is gone, shared ticket is still valid"):
        first_subscriber.close()
        second_subscriber = Session()
        fetched_second = second_subscriber.fetch_table(shared_ticket)
        self.assertEqual(fetched_second.size, 1000)

    with self.subTest("the publisher session is gone, shared ticket becomes invalid"):
        publisher.close()
        with self.assertRaises(DHError):
            second_subscriber.fetch_table(shared_ticket)


if __name__ == '__main__':
unittest.main()
Loading
Loading