Skip to content

Commit

Permalink
Configurable gRPC inbound message size + docs (#483)
Browse files Browse the repository at this point in the history
* Configurable gRPC inbound message size + docs

* Apply review suggestions
  • Loading branch information
tnoczyns-volue authored Jul 12, 2024
1 parent ac9db65 commit 7abdc1b
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 20 deletions.
49 changes: 49 additions & 0 deletions docs/source/mesh_client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,55 @@ Using :ref:`api:volue.mesh.aio`.Connection:
As time series data can potentially be large `Apache Arrow <https://arrow.apache.org/>`_ is used to optimize memory sharing.


gRPC communication
******************

By default gRPC limits the size of inbound messages to 4MB. From Mesh Python
SDK side, the user can change this limit when creating a connection to Mesh
using `grpc_max_receive_message_length` argument.

See:

* :meth:`volue.mesh.Connection.Session.insecure`
* :meth:`volue.mesh.Connection.Session.with_tls`
* :meth:`volue.mesh.Connection.Session.with_kerberos`
* :meth:`volue.mesh.Connection.Session.with_external_access_token`

Example usage:

.. code-block:: python
connection = mesh.Connection.with_tls(
address,
tls_root_pem_cert,
grpc_max_receive_message_length=10 * 1024* 1024, # 10MB
)
Another example of connection with `grpc_max_receive_message_length` argument
is in `run_simulation.py`.

.. note::
gRPC outbound message size is not limited by default.

This might be useful when e.g.: running long simulations with
`return_datasets` enabled. In such cases the dataset's size might exceed the
4MB limit and a `RESOURCE_EXHAUSTED` status code would be returned.

However, in other cases like reading time series data, we suggest reading the
data in chunks. E.g.: instead of reading 50 years of hourly time series data,
the user should request few read operations, but with shorter read intervals.

The same is true for writing data, like time series data. Here however, it is
not a suggestion, but a must. Mesh server gRPC inbound message size is not
configurable and therefore it is always equal to 4MB. If gRPC client, like Mesh
Python SDK, sends too big message then the request will be discarded. To avoid
this clients must send data in chunks.

.. note::
Single time series point occupies 20 bytes. To avoid exceeding the 4MB
limit single read or write operation should contain ~200k points maximum.


Date times and time zones
*************************
Expand Down
10 changes: 7 additions & 3 deletions docs/source/versions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ Compatible with
New features
~~~~~~~~~~~~~~~~~~

- Support for updating versioned one-to-many link relations. (:pull:`476`)
- Support for updating versioned one-to-many link relations. :pull:`476`

See `update_versioned_one_to_many_link_relation_attribute`.

- Configurable gRPC inbound message size. :issue:`421`

See :ref:`mesh_client:gRPC communication`.

Changes
~~~~~~~~~~~~~~~~~~

- Changes for Mesh server 2.15 gRPC interface compatibility. (:issue:`470`)
- Changes for Mesh server 2.15 gRPC interface compatibility. :issue:`470`

It introduces breaking API change: `update_versioned_link_relation_attribute`
is renamed to `update_versioned_one_to_one_link_relation_attribute`.
Expand Down Expand Up @@ -56,7 +60,7 @@ New features
Changes
~~~~~~~~~~~~~~~~~~

- Changes for Mesh server 2.14 gRPC interface compatibility. (:issue:`464`)
- Changes for Mesh server 2.14 gRPC interface compatibility. :issue:`464`

Install instructions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
63 changes: 56 additions & 7 deletions src/volue/mesh/_base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ def _secure_grpc_channel(*args, **kwargs):
or grpc.secure_channel depending on desired behavior.
"""

@staticmethod
def _get_grpc_channel_options(max_receive_message_length: Optional[int]):
"""Create a secure gRPC channel.
Derived classes should implement this using either grpc.aio.secure_channel
or grpc.secure_channel depending on desired behavior.
"""
return (
[
("grpc.max_receive_message_length", max_receive_message_length),
]
if max_receive_message_length
else []
)

def __init__(
self,
host=None,
Expand Down Expand Up @@ -162,27 +177,44 @@ def __init__(
self.time_series_service = time_series_pb2_grpc.TimeseriesServiceStub(channel)

@classmethod
def insecure(cls: C, target: str) -> C:
def insecure(
cls: C, target: str, *, grpc_max_receive_message_length: Optional[int] = None
) -> C:
"""Creates an insecure connection to a Mesh server.
Args:
target: The server address.
grpc_max_receive_message_length: Maximum inbound gRPC message size
in bytes. By default the maximum inbound gRPC message size is 4MB.
"""
channel = cls._insecure_grpc_channel(target)

options = cls._get_grpc_channel_options(grpc_max_receive_message_length)
channel = cls._insecure_grpc_channel(target=target, options=options)
return cls(channel=channel)

@classmethod
def with_tls(cls: C, target: str, root_certificates: Optional[str]) -> C:
def with_tls(
cls: C,
target: str,
root_certificates: Optional[str],
*,
grpc_max_receive_message_length: Optional[int] = None,
) -> C:
"""Creates an encrypted connection to a Mesh server.
Args:
target: The server address.
root_certificates: The PEM-encoded TLS root certificates as a byte
string, or None to retrieve them from a default location chosen
by the gRPC runtime.
grpc_max_receive_message_length: Maximum inbound gRPC message size
in bytes. By default the maximum inbound gRPC message size is 4MB.
"""
credentials = grpc.ssl_channel_credentials(root_certificates)
channel = cls._secure_grpc_channel(target, credentials)
options = cls._get_grpc_channel_options(grpc_max_receive_message_length)
channel = cls._secure_grpc_channel(
target=target, credentials=credentials, options=options
)
return cls(channel=channel)

@classmethod
Expand All @@ -192,6 +224,8 @@ def with_kerberos(
root_certificates: Optional[str],
service_principal: str,
user_principal: Optional[str] = None,
*,
grpc_max_receive_message_length: Optional[int] = None,
) -> C:
"""Creates an encrypted and authenticated connection to a Mesh server.
Expand All @@ -215,6 +249,8 @@ def with_kerberos(
service. For example 'HOST\\server.at.host'.
user_principal: The Kerberos user principal name. For example
'ad\\user`.
grpc_max_receive_message_length: Maximum inbound gRPC message size
in bytes. By default the maximum inbound gRPC message size is 4MB.
"""
ssl_credentials = grpc.ssl_channel_credentials(root_certificates)
auth_params = _authentication.Authentication.Parameters(
Expand All @@ -225,12 +261,20 @@ def with_kerberos(
credentials = grpc.composite_channel_credentials(
ssl_credentials, call_credentials
)
channel = cls._secure_grpc_channel(target, credentials)
options = cls._get_grpc_channel_options(grpc_max_receive_message_length)
channel = cls._secure_grpc_channel(
target=target, credentials=credentials, options=options
)
return cls(channel=channel, auth_metadata_plugin=auth_metadata_plugin)

@classmethod
def with_external_access_token(
cls: C, target: str, root_certificates: Optional[str], access_token: str
cls: C,
target: str,
root_certificates: Optional[str],
access_token: str,
*,
grpc_max_receive_message_length: Optional[int] = None,
) -> C:
"""Creates an encrypted connection to a Mesh server and will add
provided access token to authorization header to each server request.
Expand All @@ -245,14 +289,19 @@ def with_external_access_token(
by the gRPC runtime.
access_token: Token obtained externally, used to get access to Mesh
server.
grpc_max_receive_message_length: Maximum inbound gRPC message size
in bytes. By default the maximum inbound gRPC message size is 4MB.
"""
ssl_credentials = grpc.ssl_channel_credentials(root_certificates)
auth_metadata_plugin = ExternalAccessTokenPlugin(access_token)
call_credentials = grpc.metadata_call_credentials(auth_metadata_plugin)
credentials = grpc.composite_channel_credentials(
ssl_credentials, call_credentials
)
channel = cls._secure_grpc_channel(target, credentials)
options = cls._get_grpc_channel_options(grpc_max_receive_message_length)
channel = cls._secure_grpc_channel(
target=target, credentials=credentials, options=options
)
return cls(channel=channel, auth_metadata_plugin=auth_metadata_plugin)

@classmethod
Expand Down
21 changes: 19 additions & 2 deletions src/volue/mesh/_base_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,13 @@ def read_timeseries_points(
end_time: datetime,
) -> Timeseries:
"""
Reads time series points for
the specified time series in the given interval.
Reads time series points for the specified time series in the given
interval.
If there are too many points in the response you might get
a `StatusCode.RESOURCE_EXHAUSTED` error.
See: :ref:`mesh_client:gRPC communication`.
For information about `datetime` arguments and time zones refer to
:ref:`mesh_client:Date times and time zones`.
Expand All @@ -228,6 +233,10 @@ def write_timeseries_points(self, timeseries: Timeseries) -> None:
Writes time series points for the specified time series in the given interval.
Resolution of the time series does not need to be set when writing time series.
If there are too many points in the request you will the Mesh server
will discard it and return `StatusCode.RESOURCE_EXHAUSTED`.
See: :ref:`mesh_client:gRPC communication`.
Args:
timeseries: The modified time series.
Expand Down Expand Up @@ -885,6 +894,10 @@ def run_simulation(
) -> Union[typing.Iterator[None], typing.AsyncIterator[None]]:
"""Run a hydro simulation using HydSim on the Mesh server.
In case of running a simulation on longer interval and with
`return_datasets` enabled you might get a `StatusCode.RESOURCE_EXHAUSTED`
error. See: :ref:`mesh_client:gRPC communication`.
This function is experimental and subject to larger changes.
Args:
Expand Down Expand Up @@ -932,6 +945,10 @@ def run_inflow_calculation(
) -> Union[typing.Iterator[None], typing.AsyncIterator[None]]:
"""Run an inflow calculation using HydSim on the Mesh server.
In case of running an inflow calculation on longer interval and with
`return_datasets` enabled you might get a `StatusCode.RESOURCE_EXHAUSTED`
error. See: :ref:`mesh_client:gRPC communication`.
Args:
model: The name of the Mesh model in which the inflow calculation
exists.
Expand Down
4 changes: 4 additions & 0 deletions src/volue/mesh/examples/read_timeseries_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def read_timeseries_points(session: Connection.Session):
start = datetime(2016, 1, 1, 6, 0, 0)
end = datetime(2016, 1, 1, 8, 0, 0)

# Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
# In case of larger data volumes please send request data in chunks.
# E.g.: call multiple times `read_timeseries_points` with shorter interval.

# Send request to read time series based on time series key.
timeseries = session.read_timeseries_points(
target=timeseries_key, start_time=start, end_time=end
Expand Down
4 changes: 4 additions & 0 deletions src/volue/mesh/examples/read_timeseries_points_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ async def read_timeseries_points_async(session: Connection.Session):
start = datetime(2016, 1, 1, 6, 0, 0)
end = datetime(2016, 1, 1, 8, 0, 0)

# Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
# In case of larger data volumes please send request data in chunks.
# E.g.: call multiple times `read_timeseries_points` with shorter interval.

# Send request to read time series based on time series key.
timeseries = await session.read_timeseries_points(
target=timeseries_key, start_time=start, end_time=end
Expand Down
34 changes: 30 additions & 4 deletions src/volue/mesh/examples/run_inflow_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,27 @@
import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB


def sync_run_inflow_calculation(address, tls_root_pem_cert):
print("connecting...")

# For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
# connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
connection = mesh.Connection.insecure(address)
# connection = mesh.Connection.with_tls(
# address,
# tls_root_pem_cert,
# grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
# )

# By default the maximum inbound gRPC message size is 4MB. When Mesh server
# returns datasets for longer inflow calculation intervals the gRPC message
# size may exceed this limit. In such cases the user can set new limit
# using `grpc_max_receive_message_length` when creating a connection to Mesh.
connection = mesh.Connection.insecure(
address,
grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
)

with connection.create_session() as session:
start_time = datetime(2021, 1, 1)
Expand Down Expand Up @@ -48,8 +62,20 @@ async def async_run_inflow_calculation(address, tls_root_pem_cert):
print("connecting...")

# For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
# connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
connection = mesh.aio.Connection.insecure(address)
# connection = mesh.aio.Connection.with_tls(
# address,
# tls_root_pem_cert,
# grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
# )

# By default the maximum inbound gRPC message size is 4MB. When Mesh server
# returns datasets for longer inflow calculation intervals the gRPC message
# size may exceed this limit. In such cases the user can set new limit
# using `grpc_max_receive_message_length` when creating a connection to Mesh.
connection = mesh.aio.Connection.insecure(
address,
grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
)

async with connection.create_session() as session:
start_time = datetime(2021, 1, 1)
Expand Down
34 changes: 30 additions & 4 deletions src/volue/mesh/examples/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,27 @@
import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB


def sync_run_simulation(address, tls_root_pem_cert):
print("connecting...")

# For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
# connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
connection = mesh.Connection.insecure(address)
# connection = mesh.Connection.with_tls(
# address,
# tls_root_pem_cert,
# grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
# )

# By default the maximum inbound gRPC message size is 4MB. When Mesh server
# returns datasets for longer simulation intervals the gRPC message size
# may exceed this limit. In such cases the user can set new limit using
# `grpc_max_receive_message_length` when creating a connection to Mesh.
connection = mesh.Connection.insecure(
address,
grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
)

with connection.create_session() as session:
start_time = datetime(2023, 11, 1)
Expand Down Expand Up @@ -47,8 +61,20 @@ async def async_run_simulation(address, tls_root_pem_cert):
print("connecting...")

# For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
# connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
connection = mesh.aio.Connection.insecure(address)
# connection = mesh.aio.Connection.with_tls(
# address,
# tls_root_pem_cert,
# grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
# )

# By default the maximum inbound gRPC message size is 4MB. When Mesh server
# returns datasets for longer simulation intervals the gRPC message size
# may exceed this limit. In such cases the user can set new limit using
# `grpc_max_receive_message_length` when creating a connection to Mesh.
connection = mesh.aio.Connection.insecure(
address,
grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
)

async with connection.create_session() as session:
start_time = datetime(2023, 11, 1)
Expand Down
Loading

0 comments on commit 7abdc1b

Please sign in to comment.