Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an asyncio version of the HttpApi class #168

Closed
wants to merge 13 commits into from
99 changes: 99 additions & 0 deletions matrix_client/_api_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
This is a asyncio wrapper for the matrix API class.
"""
from asyncio import sleep
from urllib.parse import quote

from matrix_client.api import MatrixHttpApi, MATRIX_V2_API_PATH
from matrix_client.errors import MatrixRequestError


class AsyncHTTPAPI(MatrixHttpApi):
"""
Contains all raw matrix HTTP client-server API calls using asyncio and coroutines.

Usage:
async def main():
async with aiohttp.ClientSession() as session:
mapi = AsyncHTTPAPI("http://matrix.org", session)
resp = await mapi.get_room_id("#matrix:matrix.org")
print(resp)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""

def __init__(self, base_url, client_session, **kwargs):
self.client_session = client_session
super().__init__(base_url, **kwargs)

async def _send(self,
method,
path,
content=None,
query_params=None,
headers=None,
api_path=MATRIX_V2_API_PATH):

args = self._prepare_send(method, content, query_params, headers, path, api_path)
content, query_params, headers, endpoint = args

while True:
request = self.client_session.request(
method,
endpoint,
params=query_params,
data=content,
headers=headers)

async with request as response:
if response.status == 429:
responsejson = await response.json()
await sleep(self._get_waittime(responsejson))

elif response.status < 200 or response.status >= 300:
raise MatrixRequestError(
code=response.status, content=await response.text())

else:
return await response.json()

# We only need to re-define methods that do something after _send
async def get_display_name(self, user_id):
content = await self._send("GET", "/profile/{}/displayname".format(user_id))
return content.get('displayname', None)

async def set_display_name(self, user_id, display_name):
content = {"displayname": display_name}
await self._send("PUT", "/profile/%s/displayname" % user_id, content)

async def get_avatar_url(self, user_id):
content = await self._send("GET", "/profile/{}/avatar_url".format(user_id))
return content.get('avatar_url', None)

async def get_room_id(self, room_alias):
"""Get room id from its alias

Args:
room_alias(str): The room alias name.

Returns:
Wanted room's id.
"""
content = await self._send(
"GET",
"/directory/room/{}".format(quote(room_alias)),
api_path="/_matrix/client/r0")
return content.get("room_id", None)

async def get_room_displayname(self, room_id, user_id):
"""Get a users displayname for the given room"""
if room_id.startswith('#'):
room_id = await self.get_room_id(room_id)

members = await self.get_room_members(room_id)
members = members['chunk']
for mem in members:
if mem['sender'] == user_id:
return mem['content']['displayname']
88 changes: 66 additions & 22 deletions matrix_client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,27 +688,34 @@ def create_filter(self, user_id, filter_params):
"/user/{userId}/filter".format(userId=user_id),
filter_params)

def _send(self, method, path, content=None, query_params=None, headers=None,
api_path=MATRIX_V2_API_PATH, return_json=True):
if query_params is None:
def _prepare_send(self, method, content, query_params, headers, path, api_path):
"""
Process the arguments to the _send method.

This is factored out of _send as it is shared by the asyncio class.
"""
method = method.upper()
if method not in ["GET", "PUT", "DELETE", "POST"]:
raise MatrixError("Unsupported HTTP method: %s" % method)

if not content:
content = {}
if not query_params:
query_params = {}
if headers is None:
if not headers:
headers = {}

if "User-Agent" not in headers:
headers["User-Agent"] = "matrix-python-sdk/%s" % __version__

method = method.upper()
if method not in ["GET", "PUT", "DELETE", "POST"]:
raise MatrixError("Unsupported HTTP method: %s" % method)

if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"

if self.use_authorization_header:
headers["Authorization"] = 'Bearer %s' % self.token
else:
query_params["access_token"] = self.token
if self.token:
if self.use_authorization_header:
headers["Authorization"] = 'Bearer %s' % self.token
else:
query_params["access_token"] = self.token

if self.identity:
query_params["user_id"] = self.identity
Expand All @@ -718,6 +725,32 @@ def _send(self, method, path, content=None, query_params=None, headers=None,
if headers["Content-Type"] == "application/json" and content is not None:
content = json.dumps(content)

return content, query_params, headers, endpoint

def _get_waittime(self, responsejson):
"""
Read the response from a 429 and return a time in seconds to wait.

This is factored out of _send as it is shared by the asyncio class.
"""
waittime = self.default_429_wait_ms / 1000
try:
waittime = responsejson['retry_after_ms'] / 1000
except KeyError:
try:
errordata = json.loads(responsejson['error'])
waittime = errordata['retry_after_ms'] / 1000
except KeyError:
pass
finally:
return waittime

def _send(self, method, path, content=None, query_params=None, headers=None,
api_path=MATRIX_V2_API_PATH, return_json=True):

args = self._prepare_send(method, content, query_params, headers, path, api_path)
content, query_params, headers, endpoint = args

while True:
try:
response = self.session.request(
Expand All @@ -731,16 +764,7 @@ def _send(self, method, path, content=None, query_params=None, headers=None,
raise MatrixHttpLibError(e, method, endpoint)

if response.status_code == 429:
waittime = self.default_429_wait_ms / 1000
try:
waittime = response.json()['retry_after_ms'] / 1000
except KeyError:
try:
errordata = json.loads(response.json()['error'])
waittime = errordata['retry_after_ms'] / 1000
except KeyError:
pass
sleep(waittime)
sleep(self._get_waittime(response.json()))
else:
break

Expand Down Expand Up @@ -1096,3 +1120,23 @@ def whoami(self):
"GET",
"/account/whoami"
)

def get_event_in_room(self, room_id, event_id):
"""
Get a single event based on roomId/eventId.

You must have permission to retrieve this event e.g. by being a member
in the room for this event.
"""
return self._send("GET", "/rooms/{}/event/{}".format(room_id, event_id))

def get_room_displayname(self, room_id, user_id):
"""Get a users displayname for the given room"""
if room_id.startswith('#'):
room_id = self.get_room_id(room_id)

members = self.get_room_members(room_id)
members = members['chunk']
for mem in members:
if mem['sender'] == user_id:
return mem['content']['displayname']
9 changes: 9 additions & 0 deletions matrix_client/api_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import sys

if sys.version_info < (3, 5):
raise ValueError("The asyncio version of the api "
"is only supported on Python 3.5+") # pragma: nocover
else:
from ._api_async import AsyncHTTPAPI

__all__ = ['AsyncHTTPAPI']
4 changes: 4 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import sys

if sys.version_info < (3, 5):
collect_ignore = ["test_async_api.py"]
Loading