Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️Rabbitmq rpc: response shall not be jsonized #4730

Merged
merged 8 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ exclude_lines =
if 0:
if __name__ == .__main__.:

# Don't complain about abstract methods, they aren't run:
@(abc\.)?abstractmethod

ignore_errors = True
show_missing = True
2 changes: 1 addition & 1 deletion .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ target-version = "py310"


[per-file-ignores]
"{**/{tests, pytest_simcore}/**}" = [
"**/{tests,pytest_simcore}/**" = [
"T201", # print found
"ARG001", # unused function argument
"PT019", # user pytest.mark.usefixture
Expand Down
6 changes: 6 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ class RemoteMethodNotRegisteredError(BaseRPCError):
"Could not find a remote method named: '{method_name}'. "
"Message from remote server was returned: {incoming_message}. "
)


class RPCServerError(BaseRPCError):
msg_template = (
"Unhandled error while running method '{exc_type}:{method_name}': '{msg}'"
)
26 changes: 15 additions & 11 deletions packages/service-library/src/servicelib/rabbitmq/_rpc_router.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
import functools
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any, TypeVar

import orjson
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import SecretStr

from ..logging_utils import log_catch, log_context
from ..logging_utils import log_context
from ._errors import RPCServerError
from ._models import RPCMethodName

DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any])
Expand All @@ -32,14 +32,18 @@ async def wrapper(*args, **kwargs):
_logger,
logging.INFO,
msg=f"calling {func.__name__} with {args}, {kwargs}",
), log_catch(_logger, reraise=True):
result = await func(*args, **kwargs)
return orjson.dumps(
jsonable_encoder(
result,
custom_encoder=_RPC_CUSTOM_ENCODER,
)
)
):
try:
result = await func(*args, **kwargs)
return result
except asyncio.CancelledError:
_logger.debug("call was cancelled")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: some extra info?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for cancellation?

raise
except Exception as exc: # pylint: disable=broad-except
_logger.exception("Unhandled exception:")
raise RPCServerError(
method_name=func.__name__, exc_type=type(exc), msg=f"{exc}"
) from exc

self.routes[RPCMethodName(func.__name__)] = wrapper
return func
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from collections.abc import Awaitable, Callable

import orjson
import pytest
from faker import Faker
from servicelib.rabbitmq import (
Expand Down Expand Up @@ -59,21 +58,21 @@ async def test_exposed_methods(
router, router_namespace, a_arg, a_global_kwarg=a_kwargs
)

json_result = await rpc_client.request(
rpc_result = await rpc_client.request(
router_namespace,
RPCMethodName(a_str_method.__name__),
a_specific_kwarg=a_specific_kwarg,
)
assert isinstance(json_result, bytes)
result = orjson.loads(json_result)
assert isinstance(rpc_result, str)
result = rpc_result
assert result == f"{a_arg}, that was a winner! {a_kwargs} {a_specific_kwarg}"

json_result = await rpc_client.request(
rpc_result = await rpc_client.request(
router_namespace,
RPCMethodName(an_int_method.__name__),
)
assert isinstance(json_result, bytes)
result = orjson.loads(json_result)
assert isinstance(rpc_result, int)
result = rpc_result
assert result == 34

with pytest.raises(RuntimeError):
Expand Down
6 changes: 6 additions & 0 deletions scripts/common.Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ hel%:
@echo ""


.env: .env-devel ## creates .env file from defaults in .env-devel
$(if $(wildcard $@), \
@echo "WARNING ##### $< is newer than $@ ####"; diff -uN $@ $<; false;,\
@echo "WARNING ##### $@ does not exist, cloning $< as $@ ############"; cp $< $@)


.PHONY: devenv
devenv: ## build development environment
@$(MAKE_C) $(REPO_BASE_DIR) $@
Expand Down
4 changes: 0 additions & 4 deletions services/clusters-keeper/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
include ../../scripts/common.Makefile
include ../../scripts/common-service.Makefile

.env: .env-devel ## creates .env file from defaults in .env-devel
$(if $(wildcard $@), \
@echo "WARNING ##### $< is newer than $@ ####"; diff -uN $@ $<; false;,\
@echo "WARNING ##### $@ does not exist, cloning $< as $@ ############"; cp $< $@)


.PHONY: test-local
Expand Down
6 changes: 4 additions & 2 deletions services/clusters-keeper/tests/unit/test_rpc_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ async def test_get_or_create_cluster(
wallet_id=wallet_id,
)
assert rpc_response
created_cluster = ClusterGet.parse_raw(rpc_response)
assert isinstance(rpc_response, ClusterGet)
created_cluster = rpc_response
# check we do have a new machine in AWS
await _assert_cluster_instance_created(ec2_client, user_id, wallet_id)
# it is called once as moto server creates instances instantly
Expand All @@ -167,7 +168,8 @@ async def test_get_or_create_cluster(
wallet_id=wallet_id,
)
assert rpc_response
returned_cluster = ClusterGet.parse_raw(rpc_response)
assert isinstance(rpc_response, ClusterGet)
returned_cluster = rpc_response
# check we still have only 1 instance
await _assert_cluster_heartbeat_on_instance(ec2_client)
mocked_dask_ping_gateway.ping_gateway.assert_called_once()
Expand Down