Skip to content

Commit

Permalink
Merge pull request #1310 from samson0v/master
Browse files Browse the repository at this point in the history
Fixed stopping REST connector
  • Loading branch information
imbeacon authored Feb 23, 2024
2 parents 5b27dc0 + 79f6dc3 commit 9869ece
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions thingsboard_gateway/connectors/rest/rest_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
from asyncio import CancelledError, run_coroutine_threadsafe
from queue import Queue
from random import choice
from re import fullmatch
Expand Down Expand Up @@ -74,7 +74,9 @@ def __init__(self, gateway, config, connector_type):
self._default_uplink_converter = TBModuleLoader.import_module(self._connector_type,
self._default_converters['uplink'])
self.__USER_DATA = {}
self._loop = None
self._app = None
self._runner = None
self._connected = False
self.__stopped = False
self.daemon = True
Expand Down Expand Up @@ -135,7 +137,7 @@ def open(self):
self.__stopped = False
self.start()

def __run_server(self):
async def __run_server(self):
self.endpoints = self.load_endpoints()

self._app = web.Application(debug=self.__config.get('debugMode', False), logger=self.__log)
Expand Down Expand Up @@ -163,32 +165,42 @@ def __run_server(self):
ssl_context.load_cert_chain(cert, key)

self.load_handlers()
web.run_app(self._app, host=self.__config['host'], port=int(self.__config.get('port', 5000)),
handle_signals=False, ssl_context=ssl_context, reuse_port=True, reuse_address=True,
access_log=self.__log)
self._runner = web.AppRunner(self._app)
await self._runner.setup()
site = web.TCPSite(self._runner, host=self.__config['host'], port=int(self.__config.get('port', 5000)),
ssl_context=ssl_context, reuse_port=True, reuse_address=True)
await site.start()
self.__log.info('REST connector started at %s',
self.__config['host'] + ':' + str(self.__config.get('port', 5000)))

def run(self):
self._connected = True

self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self.__run_server())
try:
self.__run_server()
except Exception as e:
self.__log.exception(e)
except CancelledError:
pass
self._loop.run_forever()
finally:
self._loop.run_until_complete(self.stop_server())
self._loop.close()

async def stop_server(self):
await self._app.shutdown()
await self._app.cleanup()
await self._runner.cleanup()

tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

def close(self):
self.__stopped = True
self._connected = False
loop = self._app.loop
run_coroutine_threadsafe(self.stop_server(), loop)
sleep(1)
if not self._loop.is_closed():
self._loop.call_soon_threadsafe(self._loop.stop)
self.__log.info('REST connector stopped.')
self.__log.reset()
self.join()

def get_id(self):
return self.__id
Expand Down

0 comments on commit 9869ece

Please sign in to comment.