From 7abdc1b4741b27e0dd53cb0b39106fc4dc7cd963 Mon Sep 17 00:00:00 2001 From: Tomasz Noczynski <88330807+tnoczyns-volue@users.noreply.github.com> Date: Fri, 12 Jul 2024 10:14:47 +0200 Subject: [PATCH] Configurable gRPC inbound message size + docs (#483) * Configurable gRPC inbound message size + docs * Apply review suggestions --- docs/source/mesh_client.rst | 49 +++++++++++++++ docs/source/versions.rst | 10 ++- src/volue/mesh/_base_connection.py | 63 ++++++++++++++++--- src/volue/mesh/_base_session.py | 21 ++++++- .../mesh/examples/read_timeseries_points.py | 4 ++ .../examples/read_timeseries_points_async.py | 4 ++ .../mesh/examples/run_inflow_calculation.py | 34 ++++++++-- src/volue/mesh/examples/run_simulation.py | 34 ++++++++-- .../mesh/examples/write_timeseries_points.py | 8 +++ 9 files changed, 207 insertions(+), 20 deletions(-) diff --git a/docs/source/mesh_client.rst b/docs/source/mesh_client.rst index 26fd8143..e8577ba5 100644 --- a/docs/source/mesh_client.rst +++ b/docs/source/mesh_client.rst @@ -19,6 +19,55 @@ Using :ref:`api:volue.mesh.aio`.Connection: As time series data can potentially be large `Apache Arrow <https://arrow.apache.org/>`_ is used to optimize memory sharing. +gRPC communication +****************** + +By default gRPC limits the size of inbound messages to 4MB. On the Mesh Python +SDK side, the user can change this limit when creating a connection to Mesh +using the `grpc_max_receive_message_length` argument. + +See: + +* :meth:`volue.mesh.Connection.insecure` +* :meth:`volue.mesh.Connection.with_tls` +* :meth:`volue.mesh.Connection.with_kerberos` +* :meth:`volue.mesh.Connection.with_external_access_token` + +Example usage: + +.. code-block:: python + + connection = mesh.Connection.with_tls( + address, + tls_root_pem_cert, + grpc_max_receive_message_length=10 * 1024 * 1024, # 10MB + ) + + +Another example of connection with `grpc_max_receive_message_length` argument +is in `run_simulation.py`. + +.. 
note:: + gRPC outbound message size is not limited by default. + +This might be useful when e.g.: running long simulations with +`return_datasets` enabled. In such cases the dataset's size might exceed the +4MB limit and a `RESOURCE_EXHAUSTED` status code would be returned. + +However, in other cases like reading time series data, we suggest reading the +data in chunks. E.g.: instead of reading 50 years of hourly time series data, +the user should request few read operations, but with shorter read intervals. + +The same is true for writing data, like time series data. Here however, it is +not a suggestion, but a must. Mesh server gRPC inbound message size is not +configurable and therefore it is always equal to 4MB. If gRPC client, like Mesh +Python SDK, sends too big message then the request will be discarded. To avoid +this clients must send data in chunks. + +.. note:: + Single time series point occupies 20 bytes. To avoid exceeding the 4MB + limit single read or write operation should contain ~200k points maximum. + Date times and time zones ************************* diff --git a/docs/source/versions.rst b/docs/source/versions.rst index a6d4bb44..d66ab97d 100644 --- a/docs/source/versions.rst +++ b/docs/source/versions.rst @@ -15,14 +15,18 @@ Compatible with New features ~~~~~~~~~~~~~~~~~~ -- Support for updating versioned one-to-many link relations. (:pull:`476`) +- Support for updating versioned one-to-many link relations. :pull:`476` See `update_versioned_one_to_many_link_relation_attribute`. +- Configurable gRPC inbound message size. :issue:`421` + + See :ref:`mesh_client:gRPC communication`. + Changes ~~~~~~~~~~~~~~~~~~ -- Changes for Mesh server 2.15 gRPC interface compatibility. (:issue:`470`) +- Changes for Mesh server 2.15 gRPC interface compatibility. :issue:`470` It introduces breaking API change: `update_versioned_link_relation_attribute` is renamed to `update_versioned_one_to_one_link_relation_attribute`. 
@@ -56,7 +60,7 @@ New features Changes ~~~~~~~~~~~~~~~~~~ -- Changes for Mesh server 2.14 gRPC interface compatibility. (:issue:`464`) +- Changes for Mesh server 2.14 gRPC interface compatibility. :issue:`464` Install instructions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/volue/mesh/_base_connection.py b/src/volue/mesh/_base_connection.py index 4d0a60bd..6be233d2 100644 --- a/src/volue/mesh/_base_connection.py +++ b/src/volue/mesh/_base_connection.py @@ -66,6 +66,21 @@ def _secure_grpc_channel(*args, **kwargs): or grpc.secure_channel depending on desired behavior. """ + @staticmethod + def _get_grpc_channel_options(max_receive_message_length: Optional[int]): + """Build the list of gRPC channel options. + + Returns a list with the `grpc.max_receive_message_length` option when + `max_receive_message_length` is set (truthy), otherwise an empty list. + """ + return ( + [ + ("grpc.max_receive_message_length", max_receive_message_length), + ] + if max_receive_message_length + else [] + ) + def __init__( self, host=None, @@ -162,17 +177,29 @@ def __init__( self.time_series_service = time_series_pb2_grpc.TimeseriesServiceStub(channel) @classmethod - def insecure(cls: C, target: str) -> C: + def insecure( + cls: C, target: str, *, grpc_max_receive_message_length: Optional[int] = None + ) -> C: """Creates an insecure connection to a Mesh server. Args: target: The server address. + grpc_max_receive_message_length: Maximum inbound gRPC message size + in bytes. By default the maximum inbound gRPC message size is 4MB. 
""" - channel = cls._insecure_grpc_channel(target) + + options = cls._get_grpc_channel_options(grpc_max_receive_message_length) + channel = cls._insecure_grpc_channel(target=target, options=options) return cls(channel=channel) @classmethod - def with_tls(cls: C, target: str, root_certificates: Optional[str]) -> C: + def with_tls( + cls: C, + target: str, + root_certificates: Optional[str], + *, + grpc_max_receive_message_length: Optional[int] = None, + ) -> C: """Creates an encrypted connection to a Mesh server. Args: @@ -180,9 +207,14 @@ def with_tls(cls: C, target: str, root_certificates: Optional[str]) -> C: root_certificates: The PEM-encoded TLS root certificates as a byte string, or None to retrieve them from a default location chosen by the gRPC runtime. + grpc_max_receive_message_length: Maximum inbound gRPC message size + in bytes. By default the maximum inbound gRPC message size is 4MB. """ credentials = grpc.ssl_channel_credentials(root_certificates) - channel = cls._secure_grpc_channel(target, credentials) + options = cls._get_grpc_channel_options(grpc_max_receive_message_length) + channel = cls._secure_grpc_channel( + target=target, credentials=credentials, options=options + ) return cls(channel=channel) @classmethod @@ -192,6 +224,8 @@ def with_kerberos( root_certificates: Optional[str], service_principal: str, user_principal: Optional[str] = None, + *, + grpc_max_receive_message_length: Optional[int] = None, ) -> C: """Creates an encrypted and authenticated connection to a Mesh server. @@ -215,6 +249,8 @@ def with_kerberos( service. For example 'HOST\\server.at.host'. user_principal: The Kerberos user principal name. For example 'ad\\user`. + grpc_max_receive_message_length: Maximum inbound gRPC message size + in bytes. By default the maximum inbound gRPC message size is 4MB. 
""" ssl_credentials = grpc.ssl_channel_credentials(root_certificates) auth_params = _authentication.Authentication.Parameters( @@ -225,12 +261,20 @@ def with_kerberos( credentials = grpc.composite_channel_credentials( ssl_credentials, call_credentials ) - channel = cls._secure_grpc_channel(target, credentials) + options = cls._get_grpc_channel_options(grpc_max_receive_message_length) + channel = cls._secure_grpc_channel( + target=target, credentials=credentials, options=options + ) return cls(channel=channel, auth_metadata_plugin=auth_metadata_plugin) @classmethod def with_external_access_token( - cls: C, target: str, root_certificates: Optional[str], access_token: str + cls: C, + target: str, + root_certificates: Optional[str], + access_token: str, + *, + grpc_max_receive_message_length: Optional[int] = None, ) -> C: """Creates an encrypted connection to a Mesh server and will add provided access token to authorization header to each server request. @@ -245,6 +289,8 @@ def with_external_access_token( by the gRPC runtime. access_token: Token obtained externally, used to get access to Mesh server. + grpc_max_receive_message_length: Maximum inbound gRPC message size + in bytes. By default the maximum inbound gRPC message size is 4MB. 
""" ssl_credentials = grpc.ssl_channel_credentials(root_certificates) auth_metadata_plugin = ExternalAccessTokenPlugin(access_token) @@ -252,7 +298,10 @@ def with_external_access_token( credentials = grpc.composite_channel_credentials( ssl_credentials, call_credentials ) - channel = cls._secure_grpc_channel(target, credentials) + options = cls._get_grpc_channel_options(grpc_max_receive_message_length) + channel = cls._secure_grpc_channel( + target=target, credentials=credentials, options=options + ) return cls(channel=channel, auth_metadata_plugin=auth_metadata_plugin) @classmethod diff --git a/src/volue/mesh/_base_session.py b/src/volue/mesh/_base_session.py index 944d9b06..8ddb8365 100644 --- a/src/volue/mesh/_base_session.py +++ b/src/volue/mesh/_base_session.py @@ -204,8 +204,13 @@ def read_timeseries_points( end_time: datetime, ) -> Timeseries: """ - Reads time series points for - the specified time series in the given interval. + Reads time series points for the specified time series in the given + interval. + + If there are too many points in the response you might get + a `StatusCode.RESOURCE_EXHAUSTED` error. + See: :ref:`mesh_client:gRPC communication`. + For information about `datetime` arguments and time zones refer to :ref:`mesh_client:Date times and time zones`. @@ -228,6 +233,10 @@ def write_timeseries_points(self, timeseries: Timeseries) -> None: Writes time series points for the specified time series in the given interval. Resolution of the time series does not need to be set when writing time series. + If there are too many points in the request you will the Mesh server + will discard it and return `StatusCode.RESOURCE_EXHAUSTED`. + See: :ref:`mesh_client:gRPC communication`. + Args: timeseries: The modified time series. @@ -885,6 +894,10 @@ def run_simulation( ) -> Union[typing.Iterator[None], typing.AsyncIterator[None]]: """Run a hydro simulation using HydSim on the Mesh server. 
+ In case of running a simulation on longer interval and with + `return_datasets` enabled you might get a `StatusCode.RESOURCE_EXHAUSTED` + error. See: :ref:`mesh_client:gRPC communication`. + This function is experimental and subject to larger changes. Args: @@ -932,6 +945,10 @@ def run_inflow_calculation( ) -> Union[typing.Iterator[None], typing.AsyncIterator[None]]: """Run an inflow calculation using HydSim on the Mesh server. + In case of running an inflow calculation on longer interval and with + `return_datasets` enabled you might get a `StatusCode.RESOURCE_EXHAUSTED` + error. See: :ref:`mesh_client:gRPC communication`. + Args: model: The name of the Mesh model in which the inflow calculation exists. diff --git a/src/volue/mesh/examples/read_timeseries_points.py b/src/volue/mesh/examples/read_timeseries_points.py index 9787de9c..9d30a9c7 100644 --- a/src/volue/mesh/examples/read_timeseries_points.py +++ b/src/volue/mesh/examples/read_timeseries_points.py @@ -22,6 +22,10 @@ def read_timeseries_points(session: Connection.Session): start = datetime(2016, 1, 1, 6, 0, 0) end = datetime(2016, 1, 1, 8, 0, 0) + # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size. + # In case of larger data volumes please send request data in chunks. + # E.g.: call multiple times `read_timeseries_points` with shorter interval. + # Send request to read time series based on time series key. 
timeseries = session.read_timeseries_points( target=timeseries_key, start_time=start, end_time=end diff --git a/src/volue/mesh/examples/read_timeseries_points_async.py b/src/volue/mesh/examples/read_timeseries_points_async.py index 25a391a1..f9414819 100644 --- a/src/volue/mesh/examples/read_timeseries_points_async.py +++ b/src/volue/mesh/examples/read_timeseries_points_async.py @@ -23,6 +23,10 @@ async def read_timeseries_points_async(session: Connection.Session): start = datetime(2016, 1, 1, 6, 0, 0) end = datetime(2016, 1, 1, 8, 0, 0) + # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size. + # In case of larger data volumes please send request data in chunks. + # E.g.: call multiple times `read_timeseries_points` with shorter interval. + # Send request to read time series based on time series key. timeseries = await session.read_timeseries_points( target=timeseries_key, start_time=start, end_time=end diff --git a/src/volue/mesh/examples/run_inflow_calculation.py b/src/volue/mesh/examples/run_inflow_calculation.py index 5ae8a3bd..8d60b7ef 100644 --- a/src/volue/mesh/examples/run_inflow_calculation.py +++ b/src/volue/mesh/examples/run_inflow_calculation.py @@ -7,13 +7,27 @@ import volue.mesh.aio from volue import mesh +GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB + def sync_run_inflow_calculation(address, tls_root_pem_cert): print("connecting...") # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.: - # connection = mesh.Connection.with_tls(address, tls_root_pem_cert) - connection = mesh.Connection.insecure(address) + # connection = mesh.Connection.with_tls( + # address, + # tls_root_pem_cert, + # grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + # ) + + # By default the maximum inbound gRPC message size is 4MB. 
When Mesh server + # returns datasets for longer inflow calculation intervals the gRPC message + # size may exceed this limit. In such cases the user can set new limit + # using `grpc_max_receive_message_length` when creating a connection to Mesh. + connection = mesh.Connection.insecure( + address, + grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + ) with connection.create_session() as session: start_time = datetime(2021, 1, 1) @@ -48,8 +62,20 @@ async def async_run_inflow_calculation(address, tls_root_pem_cert): print("connecting...") # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.: - # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert) - connection = mesh.aio.Connection.insecure(address) + # connection = mesh.aio.Connection.with_tls( + # address, + # tls_root_pem_cert, + # grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + # ) + + # By default the maximum inbound gRPC message size is 4MB. When Mesh server + # returns datasets for longer inflow calculation intervals the gRPC message + # size may exceed this limit. In such cases the user can set new limit + # using `grpc_max_receive_message_length` when creating a connection to Mesh. 
+ connection = mesh.aio.Connection.insecure( + address, + grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + ) async with connection.create_session() as session: start_time = datetime(2021, 1, 1) diff --git a/src/volue/mesh/examples/run_simulation.py b/src/volue/mesh/examples/run_simulation.py index d760f601..0feac66d 100644 --- a/src/volue/mesh/examples/run_simulation.py +++ b/src/volue/mesh/examples/run_simulation.py @@ -7,13 +7,27 @@ import volue.mesh.aio from volue import mesh +GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB + def sync_run_simulation(address, tls_root_pem_cert): print("connecting...") # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.: - # connection = mesh.Connection.with_tls(address, tls_root_pem_cert) - connection = mesh.Connection.insecure(address) + # connection = mesh.Connection.with_tls( + # address, + # tls_root_pem_cert, + # grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + # ) + + # By default the maximum inbound gRPC message size is 4MB. When Mesh server + # returns datasets for longer simulation intervals the gRPC message size + # may exceed this limit. In such cases the user can set new limit using + # `grpc_max_receive_message_length` when creating a connection to Mesh. 
+ connection = mesh.Connection.insecure( + address, + grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + ) with connection.create_session() as session: start_time = datetime(2023, 11, 1) @@ -47,8 +61,20 @@ async def async_run_simulation(address, tls_root_pem_cert): print("connecting...") # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.: - # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert) - connection = mesh.aio.Connection.insecure(address) + # connection = mesh.aio.Connection.with_tls( + # address, + # tls_root_pem_cert, + # grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + # ) + + # By default the maximum inbound gRPC message size is 4MB. When Mesh server + # returns datasets for longer simulation intervals the gRPC message size + # may exceed this limit. In such cases the user can set new limit using + # `grpc_max_receive_message_length` when creating a connection to Mesh. + connection = mesh.aio.Connection.insecure( + address, + grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES, + ) async with connection.create_session() as session: start_time = datetime(2023, 11, 1) diff --git a/src/volue/mesh/examples/write_timeseries_points.py b/src/volue/mesh/examples/write_timeseries_points.py index 2fb5d709..d1aa0527 100644 --- a/src/volue/mesh/examples/write_timeseries_points.py +++ b/src/volue/mesh/examples/write_timeseries_points.py @@ -52,6 +52,10 @@ def sync_write_timeseries_points(address, tls_root_pem_cert): with connection.create_session() as session: table = get_pa_table_with_time_series_points() + # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size. + # In case of larger data volumes please send input data in chunks. + # E.g.: call multiple times `write_timeseries_points` with shorter interval. 
+ # Send request to write time series based on time series key. timeseries = mesh.Timeseries(table=table, timskey=timeseries_key) session.write_timeseries_points(timeseries=timeseries) @@ -121,6 +125,10 @@ async def async_write_timeseries_points( async with connection.create_session() as session: table = get_pa_table_with_time_series_points() + # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size. + # In case of larger data volumes please send input data in chunks. + # E.g.: call multiple times `write_timeseries_points` with shorter interval. + # Send request to write time series based on time series key. timeseries = mesh.Timeseries(table=table, timskey=timeseries_key) await session.write_timeseries_points(timeseries=timeseries)