From 79f6dc35646db4562bae2145ef6c8e99d52d5fce Mon Sep 17 00:00:00 2001 From: samson0v Date: Fri, 23 Feb 2024 14:38:22 +0200 Subject: [PATCH] Fixed stopping REST connector --- .../connectors/rest/rest_connector.py | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/thingsboard_gateway/connectors/rest/rest_connector.py b/thingsboard_gateway/connectors/rest/rest_connector.py index 7ea61c302..7f39d7308 100644 --- a/thingsboard_gateway/connectors/rest/rest_connector.py +++ b/thingsboard_gateway/connectors/rest/rest_connector.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import json -from asyncio import CancelledError, run_coroutine_threadsafe from queue import Queue from random import choice from re import fullmatch @@ -74,7 +74,9 @@ def __init__(self, gateway, config, connector_type): self._default_uplink_converter = TBModuleLoader.import_module(self._connector_type, self._default_converters['uplink']) self.__USER_DATA = {} + self._loop = None self._app = None + self._runner = None self._connected = False self.__stopped = False self.daemon = True @@ -135,7 +137,7 @@ def open(self): self.__stopped = False self.start() - def __run_server(self): + async def __run_server(self): self.endpoints = self.load_endpoints() self._app = web.Application(debug=self.__config.get('debugMode', False), logger=self.__log) @@ -163,32 +165,42 @@ def __run_server(self): ssl_context.load_cert_chain(cert, key) self.load_handlers() - web.run_app(self._app, host=self.__config['host'], port=int(self.__config.get('port', 5000)), - handle_signals=False, ssl_context=ssl_context, reuse_port=True, reuse_address=True, - access_log=self.__log) + self._runner = web.AppRunner(self._app) + await self._runner.setup() + site = web.TCPSite(self._runner, host=self.__config['host'], port=int(self.__config.get('port', 5000)), + ssl_context=ssl_context, reuse_port=True, reuse_address=True) + await site.start() + self.__log.info('REST connector started at %s', + self.__config['host'] + ':' + str(self.__config.get('port', 5000))) def run(self): self._connected = True + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._loop.run_until_complete(self.__run_server()) try: - self.__run_server() - except Exception as e: - self.__log.exception(e) - except CancelledError: - pass + self._loop.run_forever() + finally: + self._loop.run_until_complete(self.stop_server()) + self._loop.close() async def stop_server(self): - await self._app.shutdown() - await self._app.cleanup() + await self._runner.cleanup() + + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) def close(self): self.__stopped = True self._connected = False - loop = self._app.loop - run_coroutine_threadsafe(self.stop_server(), loop) - sleep(1) + if not self._loop.is_closed(): + self._loop.call_soon_threadsafe(self._loop.stop) self.__log.info('REST connector stopped.') self.__log.reset() + self.join() def get_id(self): return self.__id