diff --git a/py/client/pydeephaven/_app_service.py b/py/client/pydeephaven/_app_service.py index 53c3f65412d..4dc00788eee 100644 --- a/py/client/pydeephaven/_app_service.py +++ b/py/client/pydeephaven/_app_service.py @@ -15,9 +15,9 @@ def __init__(self, session): def list_fields(self) -> Any: """Fetches the current application fields.""" try: - fields = self._grpc_app_stub.ListFields( - application_pb2.ListFieldsRequest(), - metadata=self.session.grpc_metadata + fields = self.session.wrap_bidi_rpc( + self._grpc_app_stub.ListFields, + application_pb2.ListFieldsRequest() ) return fields except Exception as e: diff --git a/py/client/pydeephaven/_arrow_flight_service.py b/py/client/pydeephaven/_arrow_flight_service.py index 3ca1e2ca390..92bbb8fef2d 100644 --- a/py/client/pydeephaven/_arrow_flight_service.py +++ b/py/client/pydeephaven/_arrow_flight_service.py @@ -5,6 +5,7 @@ import pyarrow as pa import pyarrow.flight as paflight +from pyarrow.flight import FlightCallOptions from pydeephaven._arrow import map_arrow_type from pydeephaven.dherror import DHError from pydeephaven.table import Table @@ -26,10 +27,10 @@ def import_table(self, data: pa.Table) -> Table: dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))) dh_schema = pa.schema(dh_fields) - # No need to add headers/metadata here via the options argument; - # or middleware is already doing it for every call. writer, reader = self._flight_client.do_put( - pa.flight.FlightDescriptor.for_path("export", str(ticket)), dh_schema) + pa.flight.FlightDescriptor.for_path("export", str(ticket)), + dh_schema, + FlightCallOptions(headers=self.session.grpc_metadata)) writer.write_table(data) # Note that pyarrow's write_table completes the gRPC. If we send another gRPC close # it is possible that by the time the request arrives at the server that it no longer @@ -44,9 +45,10 @@ def do_get_table(self, table: Table) -> pa.Table: """Gets a snapshot of a Table via Flight do_get.""" try: flight_ticket = paflight.Ticket(table.ticket.ticket) - # No need to add headers/metadata here via the options argument; - # or middleware is already doing it for every call. - reader = self._flight_client.do_get(flight_ticket) + reader = self._flight_client.do_get( + flight_ticket, + FlightCallOptions(headers=self.session.grpc_metadata)) + return reader.read_all() except Exception as e: raise DHError("failed to perform a flight DoGet on the table.") from e @@ -59,8 +61,9 @@ def do_exchange(self): """ try: desc = pa.flight.FlightDescriptor.for_command(b"dphn") - options = paflight.FlightCallOptions(headers=self.session.grpc_metadata) - writer, reader = self._flight_client.do_exchange(desc, options) + writer, reader = self._flight_client.do_exchange( + desc, + FlightCallOptions(headers=self.session.grpc_metadata)) return writer, reader except Exception as e: diff --git a/py/client/pydeephaven/_config_service.py b/py/client/pydeephaven/_config_service.py index f16df6813ac..3646f359729 100644 --- a/py/client/pydeephaven/_config_service.py +++ b/py/client/pydeephaven/_config_service.py @@ -15,9 +15,9 @@ def __init__(self, session): def get_configuration_constants(self) -> Dict[str, Any]: """Fetches the server configuration as a dict.""" try: - response = self._grpc_app_stub.GetConfigurationConstants(config_pb2.ConfigurationConstantsRequest(), - metadata=self.session.grpc_metadata - ) + response = self.session.wrap_rpc( + self._grpc_app_stub.GetConfigurationConstants, + config_pb2.ConfigurationConstantsRequest()) return dict(response.config_values) except Exception as e: raise DHError("failed to get the configuration constants.") from e diff --git a/py/client/pydeephaven/_console_service.py b/py/client/pydeephaven/_console_service.py index c7d403f5d6a..2cea9d93144 100644 --- a/py/client/pydeephaven/_console_service.py +++ b/py/client/pydeephaven/_console_service.py @@ -19,25 +19,29 @@ def start_console(self): if self.console_id: return - try: - result_id = self.session.make_ticket() - response = self._grpc_console_stub.StartConsole( - console_pb2.StartConsoleRequest(result_id=result_id, session_type=self.session._session_type), - metadata=self.session.grpc_metadata) - self.console_id = response.result_id - except Exception as e: - raise DHError("failed to start a console.") from e + with self.session._r_lock: + if not self.console_id: + try: + result_id = self.session.make_ticket() + response = self.session.wrap_rpc( + self._grpc_console_stub.StartConsole, + console_pb2.StartConsoleRequest( + result_id=result_id, + session_type=self.session._session_type)) + self.console_id = response.result_id + except Exception as e: + raise DHError("failed to start a console.") from e def run_script(self, server_script: str) -> Any: """Runs a Python script in the console.""" self.start_console() try: - response = self._grpc_console_stub.ExecuteCommand( + response = self.session.wrap_rpc( + self._grpc_console_stub.ExecuteCommand, console_pb2.ExecuteCommandRequest( console_id=self.console_id, - code=server_script), - metadata=self.session.grpc_metadata) + code=server_script)) return response except Exception as e: raise DHError("failed to execute a command in the console.") from e @@ -47,10 +51,11 @@ def bind_table(self, table: Table, variable_name: str): if not table or not variable_name: raise DHError("invalid table and/or variable_name values.") try: - response = self._grpc_console_stub.BindTableToVariable( - console_pb2.BindTableToVariableRequest(console_id=self.console_id, - table_id=table.ticket, - variable_name=variable_name), - metadata=self.session.grpc_metadata) + self.session.wrap_rpc( + self._grpc_console_stub.BindTableToVariable, + console_pb2.BindTableToVariableRequest( + console_id=self.console_id, + table_id=table.ticket, + variable_name=variable_name)) except Exception as e: raise DHError("failed to bind a table to a variable on the server.") from e diff --git a/py/client/pydeephaven/_input_table_service.py b/py/client/pydeephaven/_input_table_service.py index ee85cdb2ff5..810e368dbce 100644 --- a/py/client/pydeephaven/_input_table_service.py +++ b/py/client/pydeephaven/_input_table_service.py @@ -15,19 +15,20 @@ def __init__(self, session): def add(self, input_table: InputTable, table: Table): """Adds a table to the InputTable.""" try: - response = self._grpc_input_table_stub.AddTableToInputTable( + self.session.wrap_rpc( + self._grpc_input_table_stub.AddTableToInputTable, inputtable_pb2.AddTableRequest(input_table=input_table.ticket, - table_to_add=table.ticket), - metadata=self.session.grpc_metadata) + table_to_add=table.ticket)) except Exception as e: raise DHError("failed to add to InputTable") from e def delete(self, input_table: InputTable, table: Table): """Deletes a table from an InputTable.""" try: - response = self._grpc_input_table_stub.DeleteTableFromInputTable( - inputtable_pb2.DeleteTableRequest(input_table=input_table.ticket, - table_to_remove=table.ticket), - metadata=self.session.grpc_metadata) + self.session.wrap_rpc( + self._grpc_input_table_stub.DeleteTableFromInputTable, + inputtable_pb2.DeleteTableRequest( + input_table=input_table.ticket, + table_to_remove=table.ticket)) except Exception as e: raise DHError("failed to delete from InputTable") from e diff --git a/py/client/pydeephaven/_plugin_obj_service.py b/py/client/pydeephaven/_plugin_obj_service.py index 3e5fa75db9f..28c54a5fd06 100644 --- a/py/client/pydeephaven/_plugin_obj_service.py +++ b/py/client/pydeephaven/_plugin_obj_service.py @@ -23,7 +23,9 @@ def __init__(self, session: 'pydeephaven.session.Session'): def message_stream(self, req_stream: PluginRequestStream) -> Any: """Opens a connection to the server-side implementation of this plugin.""" try: - resp = self._grpc_app_stub.MessageStream(req_stream, metadata=self.session.grpc_metadata) + resp = self.session.wrap_bidi_rpc( + self._grpc_app_stub.MessageStream, + req_stream) return resp except Exception as e: raise DHError("failed to establish bidirectional stream with the server.") from e diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 87d07999540..ca8d17a4ac7 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -30,15 +30,19 @@ def connect(self) -> grpc.Channel: def close(self): """Closes the gRPC connection.""" try: - self._grpc_session_stub.CloseSession( - session_pb2.HandshakeRequest(auth_protocol=0, payload=self.session._auth_token), - metadata=self.session.grpc_metadata) + self.session.wrap_rpc( + self._grpc_session_stub.CloseSession, + session_pb2.HandshakeRequest( + auth_protocol=0, + payload=self.session._auth_header_value)) except Exception as e: raise DHError("failed to close the session.") from e def release(self, ticket): """Releases an exported ticket.""" try: - self._grpc_session_stub.Release(session_pb2.ReleaseRequest(id=ticket), metadata=self.session.grpc_metadata) + self.session.wrap_rpc( + self._grpc_session_stub.Release, + session_pb2.ReleaseRequest(id=ticket)) except Exception as e: raise DHError("failed to release a ticket.") from e diff --git a/py/client/pydeephaven/_table_service.py b/py/client/pydeephaven/_table_service.py index d8966b741d4..e2c0a0bee56 100644 --- a/py/client/pydeephaven/_table_service.py +++ b/py/client/pydeephaven/_table_service.py @@ -20,9 +20,9 @@ def batch(self, ops: List[TableOp]) -> Table: batch_ops = BatchOpAssembler(self.session, table_ops=ops).build_batch() try: - response = self._grpc_table_stub.Batch( - table_pb2.BatchTableRequest(ops=batch_ops), - metadata=self.session.grpc_metadata) + response = self.session.wrap_bidi_rpc( + self._grpc_table_stub.Batch, + table_pb2.BatchTableRequest(ops=batch_ops)) exported_tables = [] for exported in response: @@ -46,9 +46,11 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) -> else: table_reference = None stub_func = op.__class__.get_stub_func(self._grpc_table_stub) - response = stub_func(op.make_grpc_request(result_id=result_id, source_id=table_reference), - metadata=self.session.grpc_metadata) - + response = self.session.wrap_rpc( + stub_func, + op.make_grpc_request( + result_id=result_id, + source_id=table_reference)) if response.success: return table_class(self.session, ticket=response.result_id.ticket, schema_header=response.schema_header, @@ -61,11 +63,13 @@ def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table) -> def fetch_etcr(self, ticket) -> Table: """Given a ticket, constructs a table around it, by fetching metadata from the server.""" - response = self._grpc_table_stub.GetExportedTableCreationResponse(ticket, metadata=self.session.grpc_metadata) + response = self.session.wrap_rpc( + self._grpc_table_stub.GetExportedTableCreationResponse, + ticket) if response.success: return Table(self.session, ticket=response.result_id.ticket, schema_header=response.schema_header, size=response.size, is_static=response.is_static) - else: - raise DHError(f"Server error received for ExportedTableCreationResponse: {response.error_info}") + raise DHError( + f"Server error received for ExportedTableCreationResponse: {response.error_info}") diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index df4d4107f79..987f374a881 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -5,15 +5,18 @@ server.""" import base64 +import logging import os +from random import random import threading -from typing import Dict, List, Union, Tuple, Any +from typing import Any, Dict, Iterable, List, Union, Tuple, NewType +from uuid import uuid4 import grpc import pyarrow as pa import pyarrow.flight as paflight from bitstring import BitArray -from pyarrow._flight import ClientMiddlewareFactory, ClientMiddleware, ClientAuthHandler +from pyarrow._flight import ClientMiddlewareFactory, ClientMiddleware from pydeephaven._app_service import AppService from pydeephaven._arrow_flight_service import ArrowFlightService @@ -31,14 +34,17 @@ from pydeephaven.query import Query from pydeephaven.table import Table, InputTable +logger = logging.getLogger(__name__) + class _DhClientAuthMiddlewareFactory(ClientMiddlewareFactory): def __init__(self, session): super().__init__() self._session = session + self._middleware = _DhClientAuthMiddleware(session) def start_call(self, info): - return _DhClientAuthMiddleware(self._session) + return self._middleware class _DhClientAuthMiddleware(ClientMiddleware): @@ -51,31 +57,23 @@ def call_completed(self, exception): def received_headers(self, headers): super().received_headers(headers) - if headers: - auth_token = bytes(headers.get("authorization")[0], encoding='ascii') - if auth_token and auth_token != self._session._auth_token: - self._session._auth_token = auth_token + header_key = "authorization" + try: + if headers and header_key in headers: + header_value = headers.get(header_key) + auth_header_value = bytes(header_value[0], encoding='ascii') + if auth_header_value: + self._session._auth_header_value = auth_header_value + except Exception as e: + logger.exception(f'_DhClientAuthMiddleware.received_headers got headers={headers}') + return def sending_headers(self): - return { - **{ - "authorization": self._session._auth_token - }, **self._session._extra_headers - } - + return None -class _DhClientAuthHandler(ClientAuthHandler): - def __init__(self, session): - super().__init__() - self._session = session - self._token = b'' - def authenticate(self, outgoing, incoming): - outgoing.write(self._session._auth_token) - self._token = incoming.read() - - def get_token(self): - return self._token +def _trace(who: str) -> None: + logger.debug(f'TRACE: {who}') class Session: @@ -137,7 +135,9 @@ def __init__(self, host: str = None, Raises: DHError """ + _trace('Session.__init__') self._r_lock = threading.RLock() # for thread-safety when accessing/changing session global state + self._services_lock = threading.Lock() # for lazy initialization of services self._last_ticket = 0 self._ticket_bitarray = BitArray(1024) @@ -149,6 +149,7 @@ def __init__(self, host: str = None, if not port: self.port = int(os.environ.get("DH_PORT", 10000)) + self._logpfx = f'pydh.Session {id(self)} {host}:port: ' self._use_tls = use_tls self._tls_root_certs = tls_root_certs self._client_cert_chain = client_cert_chain @@ -158,14 +159,23 @@ def __init__(self, host: str = None, self.is_connected = False + # We set here the initial value for the authorization header, + # which will bootstrap our authentication to the server on the first + # RPC going out. The server will give us back a bearer token to use + # in subsequent RPCs in the same authorization header. From then + # on, the value of _auth_header_value will be similar to b'Bearer X' + # where X is the bearer token provided by the server. if auth_type == "Anonymous": - self._auth_token = auth_type + self._auth_header_value = auth_type elif auth_type == "Basic": auth_token_base64 = base64.b64encode(auth_token.encode("ascii")).decode("ascii") - self._auth_token = "Basic " + auth_token_base64 + self._auth_header_value = "Basic " + auth_token_base64 else: - self._auth_token = str(auth_type) + " " + auth_token + self._auth_header_value = str(auth_type) + " " + auth_token + self._auth_header_value = bytes(self._auth_header_value, 'ascii') + # Counter for consecutive failures to refresh auth token, used to calculate retry backoff + self._refresh_failures = 0 self.grpc_channel = None self._session_service = None self._table_service = None @@ -186,6 +196,8 @@ def __init__(self, host: str = None, def __enter__(self): if not self.is_connected: + # double-checked locking, is_connected is checked inside _connect again, which + # may not end up connecting. self._connect() return self @@ -195,6 +207,32 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __del__(self): self.close() + def update_metadata(self, metadata: Iterable[Tuple[str, Union[str, bytes]]]) -> None: + for header_tuple in metadata: + if header_tuple[0] == "authorization": + v = header_tuple[1] + self._auth_header_value = v if isinstance(v, bytes) else v.encode('ascii') + break + + def wrap_rpc(self, stub_call: NotBidiRpc, *args, **kwargs) -> Any: + if 'metadata' in kwargs: + raise DHError('Internal error: "metadata" in kwargs not supported in wrap_rpc.') + kwargs["metadata"] = self.grpc_metadata + # We use a future to get a chance to process initial metadata before the call + # is completed + future = stub_call.future(*args, **kwargs) + self.update_metadata(future.initial_metadata()) + # Now block until we get the result (or an exception) + return future.result() + + def wrap_bidi_rpc(self, stub_call: BidiRpc, *args, **kwargs) -> Any: + if 'metadata' in kwargs: + raise DHError('Internal error: "metadata" in kwargs not supported in wrap_bidi_rpc.') + kwargs["metadata"] = self.grpc_metadata + response = stub_call(*args, **kwargs) + self.update_metadata(response.initial_metadata()) + return response + @property def tables(self): with self._r_lock: @@ -210,7 +248,12 @@ def exportable_objects(self) -> Dict[str, ticket_pb2.TypedTicket]: @property def grpc_metadata(self): - l = [(b'authorization', self._auth_token)] + header_value_snap = self._auth_header_value # ensure it doesn't change while doing multiple reads + if not header_value_snap or not isinstance(header_value_snap, bytes): + logger.warning(f'{self._logpfx} internal invariant violated, _auth_header_value={header_value_snap}') + l = [] + else: + l = [(b'authorization', header_value_snap)] if self._extra_headers: l.extend(list(self._extra_headers.items())) return l @@ -218,54 +261,65 @@ def grpc_metadata(self): @property def table_service(self) -> TableService: if not self._table_service: - self._table_service = TableService(self) + with self._services_lock: + if not self._table_service: + self._table_service = TableService(self) return self._table_service @property def session_service(self) -> SessionService: if not self._session_service: - self._session_service = SessionService(self) + with self._services_lock: + if not self._session_service: + self._session_service = SessionService(self) return self._session_service @property def console_service(self) -> ConsoleService: if not self._console_service: - self._console_service = ConsoleService(self) + with self._services_lock: + if not self._console_service: + self._console_service = ConsoleService(self) return self._console_service @property def flight_service(self) -> ArrowFlightService: if not self._flight_service: - self._flight_service = ArrowFlightService(self, self._flight_client) - + with self._services_lock: + if not self._flight_service: + self._flight_service = ArrowFlightService(self, self._flight_client) return self._flight_service @property def app_service(self) -> AppService: if not self._app_service: - self._app_service = AppService(self) - + with self._services_lock: + if not self._app_service: + self._app_service = AppService(self) return self._app_service @property def config_service(self): if not self._config_service: - self._config_service = ConfigService(self) - + with self._services_lock: + if not self._config_service: + self._config_service = ConfigService(self) return self._config_service @property def input_table_service(self) -> InputTableService: if not self._input_table_service: - self._input_table_service = InputTableService(self) - + with self._services_lock: + if not self._input_table_service: + self._input_table_service = InputTableService(self) return self._input_table_service @property def plugin_object_service(self) -> PluginObjService: if not self._plugin_obj_service: - self._plugin_obj_service = PluginObjService(self) - + with self._services_lock: + if not self._plugin_obj_service: + self._plugin_obj_service = PluginObjService(self) return self._plugin_obj_service def make_ticket(self, ticket_no=None): @@ -296,7 +350,11 @@ def _fetch_fields(self): return resp.created if resp.created else [] def _connect(self): + _trace(f'_connect id={id(self)}') with self._r_lock: + if self.is_connected: + return + _trace(f'_connect id={id(self)} connecting.') try: scheme = "grpc+tls" if self._use_tls else "grpc" self._flight_client = paflight.FlightClient( @@ -307,38 +365,84 @@ def _connect(self): private_key=self._client_private_key, generic_options=self._client_opts ) - self._auth_handler = _DhClientAuthHandler(self) - self._flight_client.authenticate(self._auth_handler) except Exception as e: raise DHError("failed to connect to the server.") from e self.grpc_channel = self.session_service.connect() + # This RPC will get is the configuration and will also bootstrap + # our authentication to the server by virtue of sending the right + # header: "authorization" header key and our selected header value. + # The implementation will process the initial headers coming back + # from the server which will contain the bearer token we will + # use in subsequent RPCs; the token will be included in the updated + # value for self._auth_header_value that will happen through a call + # to update_metadata. config_dict = self.config_service.get_configuration_constants() session_duration = config_dict.get("http.session.durationMs") if not session_duration: raise DHError("server configuration is missing http.session.durationMs") + self._timeout_seconds = int(session_duration.string_value)/1000.0 + # Random skew to ensure multiple processes that may have + # started together don't align retries. + skew = random() + # Backoff schedule for retries after consecutive failures to refresh auth token + self._refresh_backoff = [ skew + 0.1, skew + 1, skew + 10 ] + + if self._refresh_backoff[0] > self._timeout_seconds: + raise DHError(f'server configuration http.session.durationMs={session_duration} is too small.') + if 0.25*self._timeout_seconds < self._refresh_backoff[-1]: + self._refresh_backoff.extend( + [skew + 0.25 * self._timeout_seconds, + skew + 0.35 * self._timeout_seconds, + skew + 0.45 * self._timeout_seconds]) + for i in range(1, len(self._refresh_backoff)): + if self._refresh_backoff[i] > self._timeout_seconds: + self._refresh_backoff = self._refresh_backoff[0:i] + break + self.is_connected = True - self._timeout = int(session_duration.string_value) if self._never_timeout: self._keep_alive() def _keep_alive(self): - if self.is_connected: - if self._keep_alive_timer: - self._refresh_token() - self._keep_alive_timer = threading.Timer(self._timeout / 2 / 1000, self._keep_alive) - self._keep_alive_timer.daemon = True - self._keep_alive_timer.start() - - def _refresh_token(self): + _trace(f'_keep_alive') + if not self.is_connected: + return + ok = True + if self._keep_alive_timer: + ok = self._refresh_token() + if ok: + self._refresh_failures = 0 + else: + self._refresh_failures += 1 + if self._refresh_failures == 0: + timer_wakeup = 0.5*self._timeout_seconds + elif self._refresh_failures >= len(self._refresh_backoff): + msg = f'Failed to refresh token {self._refresh_failures} times, will stop retrying.' + logger.critical(msg) + raise DHError(msg) + else: + timer_wakeup = self._refresh_backoff[self._refresh_failures] + _trace(f'_keep_alive timer_wakeup={timer_wakeup}') + self._keep_alive_timer = threading.Timer(timer_wakeup, self._keep_alive) + self._keep_alive_timer.daemon = True + self._keep_alive_timer.start() + if not ok: + logger.warning( + f'{self._logpfx}: failed to refresh auth token (retry #{self._refresh_failures-1}).' + + f' Will retry in {timer_wakeup} seconds.') + + def _refresh_token(self) -> bool: + _trace('_refresh_token') try: - self._flight_client.authenticate(self._auth_handler) - except Exception as e: - self.is_connected = False - raise DHError("failed to refresh auth token") from e + self.config_service.get_configuration_constants() + return True + except Exception as ex: + logger.warning(f'{self._logpfx} Caught exception while refreshing auth token: {ex}.') + return False @property def is_alive(self) -> bool: @@ -351,7 +455,7 @@ def is_alive(self) -> bool: return True try: - self._flight_client.authenticate(self._auth_handler) + self.config_service.get_configuration_constants() return True except DHError as e: self.is_connected = False @@ -364,12 +468,13 @@ def close(self) -> None: DHError """ with self._r_lock: - if self.is_connected: - self.session_service.close() - self.grpc_channel.close() - self.is_connected = False - self._last_ticket = 0 - self._flight_client.close() + if not self.is_connected: + return + self.session_service.close() + self.grpc_channel.close() + self.is_connected = False + self._last_ticket = 0 + self._flight_client.close() def release(self, ticket): self.session_service.release(ticket) diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index b5b71bc83d5..0dc34c5a2df 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -3,7 +3,7 @@ # import unittest -from time import sleep +from time import sleep, time import pyarrow as pa import pandas as pd @@ -273,6 +273,5 @@ def test_auto_close(self): session = None self.assertIsNone(session) - if __name__ == '__main__': unittest.main() diff --git a/py/client/tests/testbase.py b/py/client/tests/testbase.py index 7ab848dc4a1..c793f4376eb 100644 --- a/py/client/tests/testbase.py +++ b/py/client/tests/testbase.py @@ -26,6 +26,8 @@ def tearDownClass(cls) -> None: os.remove(BaseTestCase.csv_file) def setUp(self) -> None: + # For netty server and psk, change auth_token to what the server printed. + # self.session = Session(port = 8080, auth_type = 'io.deephaven.authentication.psk.PskAuthenticationHandler', auth_token = 'safw7c4nzegp') self.session = Session() def tearDown(self) -> None: