Skip to content

Commit

Permalink
Merge pull request #4 from wirenboard/feature/introduce_async
Browse files Browse the repository at this point in the history
Feature/introduce async
  • Loading branch information
vdromanov authored Nov 29, 2022
2 parents 76dd25f + 4c0d091 commit 0e4be37
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 34 deletions.
9 changes: 8 additions & 1 deletion debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
python-mqttrpc (1.1.5) stable; urgency=medium

* TMQTTRPCClient.call_async: add a result_future param
* add asyncio-compatible version of MQTTRPCResponseManager

-- Vladimir Romanov <v.romanov@wirenboard.ru> Sun, 27 Nov 2022 18:39:12 +0300

python-mqttrpc (1.1.4) stable; urgency=medium

* Add simple client to installation
Expand Down Expand Up @@ -43,6 +50,6 @@ python-mqttrpc (1.0.1) stable; urgency=medium

python-mqttrpc (1.0) stable; urgency=medium

* Initial release.
* Initial release.

-- Evgeny Boger <boger@contactless.ru> Fri, 26 Jun 2015 12:22:46 +0300
6 changes: 2 additions & 4 deletions mqttrpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ def call(self, driver, service, method, params, timeout=None):



def call_async(self, driver, service, method, params):
def call_async(self, driver, service, method, params, result_future=AsyncResult):
self.counter += 1
payload = {'params': params,
'id' : self.counter}

result = AsyncResult()
result = result_future()
result.packet_id = self.counter
self.futures[(driver, service, method, self.counter)] = result

Expand All @@ -130,5 +130,3 @@ def call_async(self, driver, service, method, params):
self.client.publish(topic, json.dumps(payload))

return result


91 changes: 62 additions & 29 deletions mqttrpc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,44 @@ class MQTTRPCResponseManager(object):
"""


@classmethod
def handle(cls, request_str, service_id, method_id, dispatcher):
def _prepare_request(cls, request_str):
if isinstance(request_str, bytes):
request_str = request_str.decode("utf-8")

try:
json.loads(request_str)
except (TypeError, ValueError):
return MQTTRPC10Response(error=JSONRPCParseError()._data)

return None, MQTTRPC10Response(error=JSONRPCParseError()._data)
try:
request = MQTTRPC10Request.from_json(request_str)
except JSONRPCInvalidRequestException:
return MQTTRPC10Response(error=JSONRPCInvalidRequest()._data)
return None, MQTTRPC10Response(error=JSONRPCInvalidRequest()._data)

return request, None

@classmethod
def handle(cls, request_str, service_id, method_id, dispatcher):
request, erroneous_response = cls._prepare_request(request_str)
if request:
return cls.handle_request(request, service_id, method_id, dispatcher)
else:
return erroneous_response

return cls.handle_request(request, service_id, method_id, dispatcher)
@classmethod
def _process_exception(cls, request, e):
data = {
"type": e.__class__.__name__,
"args": e.args,
"message": str(e),
}

if isinstance(e, JSONRPCDispatchException):
return MQTTRPC10Response(_id=request._id, error=e.error._data)
elif isinstance(e, TypeError) and is_invalid_params(method, *request.args, **request.kwargs):
return MQTTRPC10Response(_id=request._id, error=JSONRPCInvalidParams(data=data)._data)
else:
logger.exception("API Exception: {0}".format(data))
return MQTTRPC10Response(_id=request._id, error=JSONRPCServerError(data=data)._data)

@classmethod
def handle_request(cls, request, service_id, method_id, dispatcher):
Expand All @@ -59,40 +80,52 @@ def handle_request(cls, request, service_id, method_id, dispatcher):
.. versionadded: 1.8.0
"""

def response(**kwargs):
return MQTTRPC10Response(
_id=request._id, **kwargs)

try:
method = dispatcher[(service_id, method_id)]
except KeyError:
output = response(error=JSONRPCMethodNotFound()._data)
output = MQTTRPC10Response(_id=request._id, error=JSONRPCMethodNotFound()._data)
else:
try:
result = method(*request.args, **request.kwargs)
except JSONRPCDispatchException as e:
output = response(error=e.error._data)
except Exception as e:
data = {
"type": e.__class__.__name__,
"args": e.args,
"message": str(e),
}
if isinstance(e, TypeError) and is_invalid_params(
method, *request.args, **request.kwargs):
output = response(
error=JSONRPCInvalidParams(data=data)._data)
else:
logger.exception("API Exception: {0}".format(data))
output = response(
error=JSONRPCServerError(data=data)._data)
output = cls._process_exception(request, e)
else:
output = response(result=result)
output = MQTTRPC10Response(_id=request._id, result=result)
finally:
if not request.is_notification:
return output
else:
return []


class AMQTTRPCResponseManager(MQTTRPCResponseManager):
"""
asyncio-compatible version of MQTTRPCResponseManager
"""

@classmethod
async def handle(cls, request_str, service_id, method_id, dispatcher):
request, erroneous_response = cls._prepare_request(request_str)
if request:
return await cls.handle_request(request, service_id, method_id, dispatcher)
else:
return erroneous_response

@classmethod
async def handle_request(cls, request, service_id, method_id, dispatcher):
try:
method = dispatcher[(service_id, method_id)]
except KeyError:
output = MQTTRPC10Response(_id=request._id, error=JSONRPCMethodNotFound()._data)
else:
try:
result = await method(*request.args, **request.kwargs)
except Exception as e:
output = cls._process_exception(request, e)
else:
output = MQTTRPC10Response(_id=request._id, result=result)
finally:
if not request.is_notification:
return output
else:
return []

0 comments on commit 0e4be37

Please sign in to comment.