Skip to content

Commit

Permalink
bugfix: use a shared loop between sanic and asyncio components
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniszewski committed Feb 20, 2020
1 parent 9379fd0 commit 7005716
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 104 deletions.
4 changes: 2 additions & 2 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ attrs==19.3.0 # via aiohttp, pytest
chardet==3.0.4 # via aiohttp
coverage==5.0.3 # via pytest-cov
idna-ssl==1.1.0 # via aiohttp
idna==2.8 # via idna-ssl, yarl
idna==2.9 # via idna-ssl, yarl
importlib-metadata==1.5.0 # via pluggy, pytest
more-itertools==8.2.0 # via pytest
multidict==4.7.4 # via aiohttp, yarl
Expand All @@ -27,4 +27,4 @@ six==1.14.0 # via packaging
typing-extensions==3.7.4.1 # via aiohttp
wcwidth==0.1.8 # via pytest
yarl==1.4.2 # via aiohttp
zipp==2.1.0 # via importlib-metadata
zipp==3.0.0 # via importlib-metadata
18 changes: 9 additions & 9 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ cachetools==4.0.0 # via google-auth
certifi==2019.11.28 # via httpx, kubernetes, requests
chardet==3.0.4 # via httpx, requests
contextvars==2.4 # via sniffio
google-auth==1.11.0 # via kubernetes
grpcio==1.26.0
google-auth==1.11.2 # via kubernetes
grpcio==1.27.2
h11==0.8.1 # via httpx
h2==3.1.1 # via httpx
h2==3.2.0 # via httpx
hpack==3.0.0 # via h2
hstspreload==2020.1.30 # via httpx
httptools==0.0.13 # via sanic
hstspreload==2020.2.20 # via httpx
httptools==0.1.1 # via sanic
httpx==0.9.3 # via sanic
hyperframe==5.2.0 # via h2
idna==2.8 # via httpx, requests
idna==2.9 # via httpx, requests
immutables==0.11 # via contextvars
kubernetes==10.0.1
multidict==4.7.4 # via sanic
oauthlib==3.1.0 # via requests-oauthlib
prometheus-client==0.7.1
protobuf==3.11.2 # via synse-grpc
protobuf==3.11.3 # via synse-grpc
pyasn1-modules==0.2.8 # via google-auth
pyasn1==0.4.8 # via pyasn1-modules, rsa
python-dateutil==2.8.1 # via kubernetes
pyyaml==5.3
requests-oauthlib==1.3.0 # via kubernetes
requests==2.22.0 # via kubernetes, requests-oauthlib
requests==2.23.0 # via kubernetes, requests-oauthlib
rfc3986==1.3.2 # via httpx
rsa==4.0 # via google-auth
sanic==19.12.2
Expand All @@ -48,4 +48,4 @@ websocket-client==0.57.0 # via kubernetes
websockets==8.1

# The following packages are considered to be unsafe in a requirements file:
# setuptools==45.1.0 # via google-auth, kubernetes, protobuf
# setuptools==45.2.0 # via google-auth, kubernetes, protobuf
6 changes: 6 additions & 0 deletions synse_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
"""

import argparse
import asyncio
import os
import sys

import uvloop

import synse_server
from synse_server.server import Synse


def main() -> None:
uvloop.install()
asyncio.set_event_loop(uvloop.new_event_loop())

parser = argparse.ArgumentParser(
description='API server for the Synse platform',
)
Expand Down
183 changes: 119 additions & 64 deletions synse_server/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ async def version(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.version(),
)
try:
return utils.http_json_response(
await cmd.version(),
)
except Exception:
logger.exception('failed to get version info')
raise


@v3.route('/config')
Expand All @@ -87,9 +91,13 @@ async def config(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.config(),
)
try:
return utils.http_json_response(
await cmd.config(),
)
except Exception:
logger.exception('failed to get server config')
raise


@v3.route('/plugin')
Expand All @@ -104,11 +112,15 @@ async def plugins(request: Request) -> HTTPResponse:

refresh = request.args.get('refresh', 'false').lower() == 'true'

return utils.http_json_response(
await cmd.plugins(
refresh=refresh,
),
)
try:
return utils.http_json_response(
await cmd.plugins(
refresh=refresh,
),
)
except Exception:
logger.exception('failed to get plugins')
raise


@v3.route('/plugin/<plugin_id>')
Expand All @@ -125,9 +137,13 @@ async def plugin(request: Request, plugin_id: str) -> HTTPResponse:
"""
log_request(request, id=plugin_id)

return utils.http_json_response(
await cmd.plugin(plugin_id),
)
try:
return utils.http_json_response(
await cmd.plugin(plugin_id),
)
except Exception:
logger.exception('failed to get plugin info', id=plugin_id)
raise


@v3.route('/plugin/health')
Expand All @@ -140,9 +156,13 @@ async def plugin_health(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.plugin_health(),
)
try:
return utils.http_json_response(
await cmd.plugin_health(),
)
except Exception:
logger.exception('failed to get plugin health')
raise


@v3.route('/scan')
Expand Down Expand Up @@ -203,14 +223,18 @@ async def scan(request: Request) -> HTTPResponse:
)
sort_keys = param_sort[0]

return utils.http_json_response(
await cmd.scan(
ns=namespace,
tag_groups=tag_groups,
force=force,
sort=sort_keys,
),
)
try:
return utils.http_json_response(
await cmd.scan(
ns=namespace,
tag_groups=tag_groups,
force=force,
sort=sort_keys,
),
)
except Exception:
logger.exception('failed to get devices (scan)')
raise


@v3.route('/tags')
Expand Down Expand Up @@ -242,12 +266,16 @@ async def tags(request: Request) -> HTTPResponse:

include_ids = request.args.get('ids', 'false').lower() == 'true'

return utils.http_json_response(
await cmd.tags(
namespaces,
with_id_tags=include_ids,
),
)
try:
return utils.http_json_response(
await cmd.tags(
namespaces,
with_id_tags=include_ids,
),
)
except Exception:
logger.exception('failed to get device tags')
raise


@v3.route('/info/<device_id>')
Expand All @@ -264,9 +292,13 @@ async def info(request: Request, device_id: str) -> HTTPResponse:
"""
log_request(request, id=device_id)

return utils.http_json_response(
await cmd.info(device_id),
)
try:
return utils.http_json_response(
await cmd.info(device_id),
)
except Exception:
logger.exception('failed to get device info', id=device_id)
raise


@v3.route('/read')
Expand Down Expand Up @@ -308,12 +340,16 @@ async def read(request: Request) -> HTTPResponse:
for group in param_tags:
tag_groups.append(group.split(','))

return utils.http_json_response(
await cmd.read(
ns=namespace,
tag_groups=tag_groups,
),
)
try:
return utils.http_json_response(
await cmd.read(
ns=namespace,
tag_groups=tag_groups,
),
)
except Exception:
logger.exception('failed to read device(s)', namespace=namespace, tag_groups=tag_groups)
raise


@v3.route('/readcache')
Expand Down Expand Up @@ -365,9 +401,8 @@ async def response_streamer(response):
try:
async for reading in cmd.read_cache(start, end):
await response.write(ujson.dumps(reading) + '\n')
except Exception as e:
logger.error('failure when streaming cached readings')
logger.exception(e)
except Exception:
logger.exception('failure when streaming cached readings')

return stream(response_streamer, content_type='application/json; charset=utf-8')

Expand All @@ -390,9 +425,13 @@ async def read_device(request: Request, device_id: str) -> HTTPResponse:
"""
log_request(request, id=device_id)

return utils.http_json_response(
await cmd.read_device(device_id),
)
try:
return utils.http_json_response(
await cmd.read_device(device_id),
)
except Exception:
logger.exception('failed to read device', id=device_id)
raise


@v3.route('/write/<device_id>', methods=['POST'])
Expand Down Expand Up @@ -432,12 +471,16 @@ async def async_write(request: Request, device_id: str) -> HTTPResponse:
'invalid json: key "action" is required in payload, but not found'
)

return utils.http_json_response(
await cmd.write_async(
device_id=device_id,
payload=data,
),
)
try:
return utils.http_json_response(
await cmd.write_async(
device_id=device_id,
payload=data,
),
)
except Exception:
logger.exception('failed to write asynchronously', id=device_id, payload=data)
raise


@v3.route('/write/wait/<device_id>', methods=['POST'])
Expand Down Expand Up @@ -477,12 +520,16 @@ async def sync_write(request: Request, device_id: str) -> HTTPResponse:
'invalid json: key "action" is required in payload, but not found'
)

return utils.http_json_response(
await cmd.write_sync(
device_id=device_id,
payload=data,
),
)
try:
return utils.http_json_response(
await cmd.write_sync(
device_id=device_id,
payload=data,
),
)
except Exception:
logger.exception('failed to write synchronously', id=device_id, payload=data)
raise


@v3.route('/transaction')
Expand All @@ -495,9 +542,13 @@ async def transactions(request: Request) -> HTTPResponse:
"""
log_request(request)

return utils.http_json_response(
await cmd.transactions(),
)
try:
return utils.http_json_response(
await cmd.transactions(),
)
except Exception:
logger.exception('failed to list transactions')
raise


@v3.route('/transaction/<transaction_id>')
Expand All @@ -514,9 +565,13 @@ async def transaction(request: Request, transaction_id: str) -> HTTPResponse:
"""
log_request(request, id=transaction_id)

return utils.http_json_response(
await cmd.transaction(transaction_id),
)
try:
return utils.http_json_response(
await cmd.transaction(transaction_id),
)
except Exception:
logger.exception('failed to get transaction info', id=transaction_id)
raise


@v3.route('/device/<device_id>', methods=['GET', 'POST'])
Expand Down
9 changes: 6 additions & 3 deletions synse_server/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from structlog import get_logger
from synse_grpc import api

from synse_server import config, plugin
from synse_server import config, loop, plugin
from synse_server.i18n import _

logger = get_logger()
Expand All @@ -35,8 +35,8 @@
namespace=NS_ALIAS,
)

device_cache_lock = asyncio.Lock()
alias_cache_lock = asyncio.Lock()
device_cache_lock = asyncio.Lock(loop=loop.synse_loop)
alias_cache_lock = asyncio.Lock(loop=loop.synse_loop)


async def get_transaction(transaction_id: str) -> dict:
Expand Down Expand Up @@ -216,7 +216,10 @@ async def get_device(device_id: str) -> Union[api.V3Device, None]:
# empty, return the first element of the list - there should only be
# one element; otherwise, return None.
if result:
logger.debug(_('got device from cache'))
device = result[0]
else:
logger.debug(_('failed to lookup device from cache'), id=device_id)

# No device was found from an ID lookup. Try looking up the ID in the
# alias cache.
Expand Down
Loading

0 comments on commit 7005716

Please sign in to comment.