Skip to content

Commit

Permalink
add asyncio-compatible version of MQTTRPCResponseManager
Browse files Browse the repository at this point in the history
  • Loading branch information
vdromanov committed Nov 27, 2022
1 parent 7d85155 commit 4c0d091
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 29 deletions.
1 change: 1 addition & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
python-mqttrpc (1.1.5) stable; urgency=medium

* TMQTTRPCClient.call_async: add a result_future param
* add asyncio-compatible version of MQTTRPCResponseManager

-- Vladimir Romanov <v.romanov@wirenboard.ru> Sun, 27 Nov 2022 18:39:12 +0300

Expand Down
91 changes: 62 additions & 29 deletions mqttrpc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,44 @@ class MQTTRPCResponseManager(object):
"""


@classmethod
def handle(cls, request_str, service_id, method_id, dispatcher):
def _prepare_request(cls, request_str):
if isinstance(request_str, bytes):
request_str = request_str.decode("utf-8")

try:
json.loads(request_str)
except (TypeError, ValueError):
return MQTTRPC10Response(error=JSONRPCParseError()._data)

return None, MQTTRPC10Response(error=JSONRPCParseError()._data)
try:
request = MQTTRPC10Request.from_json(request_str)
except JSONRPCInvalidRequestException:
return MQTTRPC10Response(error=JSONRPCInvalidRequest()._data)
return None, MQTTRPC10Response(error=JSONRPCInvalidRequest()._data)

return request, None

@classmethod
def handle(cls, request_str, service_id, method_id, dispatcher):
request, erroneous_response = cls._prepare_request(request_str)
if request:
return cls.handle_request(request, service_id, method_id, dispatcher)
else:
return erroneous_response

return cls.handle_request(request, service_id, method_id, dispatcher)
@classmethod
def _process_exception(cls, request, e):
data = {
"type": e.__class__.__name__,
"args": e.args,
"message": str(e),
}

if isinstance(e, JSONRPCDispatchException):
return MQTTRPC10Response(_id=request._id, error=e.error._data)
elif isinstance(e, TypeError) and is_invalid_params(method, *request.args, **request.kwargs):
return MQTTRPC10Response(_id=request._id, error=JSONRPCInvalidParams(data=data)._data)
else:
logger.exception("API Exception: {0}".format(data))
return MQTTRPC10Response(_id=request._id, error=JSONRPCServerError(data=data)._data)

@classmethod
def handle_request(cls, request, service_id, method_id, dispatcher):
Expand All @@ -59,40 +80,52 @@ def handle_request(cls, request, service_id, method_id, dispatcher):
.. versionadded: 1.8.0
"""

def response(**kwargs):
return MQTTRPC10Response(
_id=request._id, **kwargs)

try:
method = dispatcher[(service_id, method_id)]
except KeyError:
output = response(error=JSONRPCMethodNotFound()._data)
output = MQTTRPC10Response(_id=request._id, error=JSONRPCMethodNotFound()._data)
else:
try:
result = method(*request.args, **request.kwargs)
except JSONRPCDispatchException as e:
output = response(error=e.error._data)
except Exception as e:
data = {
"type": e.__class__.__name__,
"args": e.args,
"message": str(e),
}
if isinstance(e, TypeError) and is_invalid_params(
method, *request.args, **request.kwargs):
output = response(
error=JSONRPCInvalidParams(data=data)._data)
else:
logger.exception("API Exception: {0}".format(data))
output = response(
error=JSONRPCServerError(data=data)._data)
output = cls._process_exception(request, e)
else:
output = response(result=result)
output = MQTTRPC10Response(_id=request._id, result=result)
finally:
if not request.is_notification:
return output
else:
return []


class AMQTTRPCResponseManager(MQTTRPCResponseManager):
"""
asyncio-compatible version of MQTTRPCResponseManager
"""

@classmethod
async def handle(cls, request_str, service_id, method_id, dispatcher):
request, erroneous_response = cls._prepare_request(request_str)
if request:
return await cls.handle_request(request, service_id, method_id, dispatcher)
else:
return erroneous_response

@classmethod
async def handle_request(cls, request, service_id, method_id, dispatcher):
try:
method = dispatcher[(service_id, method_id)]
except KeyError:
output = MQTTRPC10Response(_id=request._id, error=JSONRPCMethodNotFound()._data)
else:
try:
result = await method(*request.args, **request.kwargs)
except Exception as e:
output = cls._process_exception(request, e)
else:
output = MQTTRPC10Response(_id=request._id, result=result)
finally:
if not request.is_notification:
return output
else:
return []

0 comments on commit 4c0d091

Please sign in to comment.