From 56db0b1365965c02ff539193e26c333b7f70d101 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 18 May 2020 09:46:18 -0400 Subject: [PATCH 01/28] Hash passwords earlier in the registration process (#7523) --- changelog.d/7523.bugfix | 1 + synapse/handlers/register.py | 9 ++----- synapse/rest/admin/users.py | 30 ++++++++++++------------ synapse/rest/client/v2_alpha/register.py | 22 ++++++++++------- 4 files changed, 31 insertions(+), 31 deletions(-) create mode 100644 changelog.d/7523.bugfix diff --git a/changelog.d/7523.bugfix b/changelog.d/7523.bugfix new file mode 100644 index 000000000000..552dae39f08e --- /dev/null +++ b/changelog.d/7523.bugfix @@ -0,0 +1 @@ +Hash passwords as early as possible during registration. diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1e6bdac0add3..a6178e74a19b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -132,7 +132,7 @@ def check_username(self, localpart, guest_access_token=None, assigned_user_id=No def register_user( self, localpart=None, - password=None, + password_hash=None, guest_access_token=None, make_guest=False, admin=False, @@ -147,7 +147,7 @@ def register_user( Args: localpart: The local part of the user ID to register. If None, one will be generated. - password (unicode): The password to assign to this user so they can + password_hash (str|None): The hashed password to assign to this user so they can login again. This can be None which means they cannot login again via a password (e.g. the user is an application service user). user_type (str|None): type of user. One of the values from @@ -164,11 +164,6 @@ def register_user( yield self.check_registration_ratelimit(address) yield self.auth.check_auth_blocking(threepid=threepid) - password_hash = None - if password: - password_hash = yield defer.ensureDeferred( - self._auth_handler.hash(password) - ) if localpart is not None: yield self.check_username(localpart, guest_access_token=guest_access_token) diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 593ce011e888..326682fbdb67 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -243,11 +243,11 @@ async def on_PUT(self, request, user_id): else: # create user password = body.get("password") - if password is not None and ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): - raise SynapseError(400, "Invalid password") + password_hash = None + if password is not None: + if not isinstance(password, text_type) or len(password) > 512: + raise SynapseError(400, "Invalid password") + password_hash = await self.auth_handler.hash(password) admin = body.get("admin", None) user_type = body.get("user_type", None) @@ -259,7 +259,7 @@ async def on_PUT(self, request, user_id): user_id = await self.registration_handler.register_user( localpart=target_user.localpart, - password=password, + password_hash=password_hash, admin=bool(admin), default_display_name=displayname, user_type=user_type, @@ -298,7 +298,7 @@ class UserRegisterServlet(RestServlet): NONCE_TIMEOUT = 60 def __init__(self, hs): - self.handlers = hs.get_handlers() + self.auth_handler = hs.get_auth_handler() self.reactor = hs.get_reactor() self.nonces = {} self.hs = hs @@ -362,16 +362,16 @@ async def on_POST(self, request): 400, "password must be specified", errcode=Codes.BAD_JSON ) else: - if ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): + password = body["password"] + if not isinstance(password, text_type) or 
len(password) > 512: raise SynapseError(400, "Invalid password") - password = body["password"].encode("utf-8") - if b"\x00" in password: + password_bytes = password.encode("utf-8") + if b"\x00" in password_bytes: raise SynapseError(400, "Invalid password") + password_hash = await self.auth_handler.hash(password) + admin = body.get("admin", None) user_type = body.get("user_type", None) @@ -388,7 +388,7 @@ async def on_POST(self, request): want_mac_builder.update(b"\x00") want_mac_builder.update(username) want_mac_builder.update(b"\x00") - want_mac_builder.update(password) + want_mac_builder.update(password_bytes) want_mac_builder.update(b"\x00") want_mac_builder.update(b"admin" if admin else b"notadmin") if user_type: @@ -407,7 +407,7 @@ async def on_POST(self, request): user_id = await register.registration_handler.register_user( localpart=body["username"].lower(), - password=body["password"], + password_hash=password_hash, admin=bool(admin), user_type=user_type, ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index af08cc6cce82..c26927f27b9e 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -426,12 +426,16 @@ async def on_POST(self, request): # we do basic sanity checks here because the auth layer will store these # in sessions. Pull out the username/password provided to us. if "password" in body: - if ( - not isinstance(body["password"], string_types) - or len(body["password"]) > 512 - ): + password = body.pop("password") + if not isinstance(password, string_types) or len(password) > 512: raise SynapseError(400, "Invalid password") - self.password_policy_handler.validate_password(body["password"]) + self.password_policy_handler.validate_password(password) + + # If the password is valid, hash it and store it back on the request. + # This ensures the hashed password is handled everywhere. 
+ if "password_hash" in body: + raise SynapseError(400, "Unexpected property: password_hash") + body["password_hash"] = await self.auth_handler.hash(password) desired_username = None if "username" in body: @@ -484,7 +488,7 @@ async def on_POST(self, request): guest_access_token = body.get("guest_access_token", None) - if "initial_device_display_name" in body and "password" not in body: + if "initial_device_display_name" in body and "password_hash" not in body: # ignore 'initial_device_display_name' if sent without # a password to work around a client bug where it sent # the 'initial_device_display_name' param alone, wiping out @@ -546,11 +550,11 @@ async def on_POST(self, request): registered = False else: # NB: This may be from the auth handler and NOT from the POST - assert_params_in_dict(params, ["password"]) + assert_params_in_dict(params, ["password_hash"]) desired_username = params.get("username", None) guest_access_token = params.get("guest_access_token", None) - new_password = params.get("password", None) + new_password_hash = params.get("password_hash", None) if desired_username is not None: desired_username = desired_username.lower() @@ -583,7 +587,7 @@ async def on_POST(self, request): registered_user_id = await self.registration_handler.register_user( localpart=desired_username, - password=new_password, + password_hash=new_password_hash, guest_access_token=guest_access_token, threepid=threepid, address=client_addr, From 3c8a57f080a66a3d4d146adf7020c18b397bcf6c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 18 May 2020 10:58:51 -0400 Subject: [PATCH 02/28] 1.13.0rc3 --- CHANGES.md | 9 +++++++++ changelog.d/7523.bugfix | 1 - synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) delete mode 100644 changelog.d/7523.bugfix diff --git a/CHANGES.md b/CHANGES.md index 7dbe072fed11..45008995e5c3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.13.0rc3 (2020-05-18) +============================== + +Bugfixes +-------- + +- Hash passwords as early as possible during registration. ([\#7523](https://github.com/matrix-org/synapse/issues/7523)) + + Synapse 1.13.0rc2 (2020-05-14) ============================== diff --git a/changelog.d/7523.bugfix b/changelog.d/7523.bugfix deleted file mode 100644 index 552dae39f08e..000000000000 --- a/changelog.d/7523.bugfix +++ /dev/null @@ -1 +0,0 @@ -Hash passwords as early as possible during registration. diff --git a/synapse/__init__.py b/synapse/__init__.py index 977e26a04828..6b26c5e87e5c 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ except ImportError: pass -__version__ = "1.13.0rc2" +__version__ = "1.13.0rc3" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when From 250f3eb991f129bbd12e4fce58a9d4124aabd41e Mon Sep 17 00:00:00 2001 From: Aaron Raimist Date: Tue, 19 May 2020 04:31:25 -0500 Subject: [PATCH 03/28] Omit displayname or avatar_url if they aren't set instead of returning null (#7497) Per https://github.com/matrix-org/matrix-doc/issues/1436#issuecomment-410089470 they should be omitted instead of returning null or "". They aren't marked as required in the spec. 
Fixes https://github.com/matrix-org/synapse/issues/7333

Signed-off-by: Aaron Raimist
---
 changelog.d/7497.bugfix     | 1 +
 synapse/handlers/message.py | 8 ++++++--
 2 files changed, 7 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/7497.bugfix

diff --git a/changelog.d/7497.bugfix b/changelog.d/7497.bugfix
new file mode 100644
index 000000000000..3c7920cb1059
--- /dev/null
+++ b/changelog.d/7497.bugfix
@@ -0,0 +1 @@
+When sending `m.room.member` events, omit `displayname` and `avatar_url` if they aren't set instead of setting them to `null`. Contributed by Aaron Raimist.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0242521cc64c..8f362896a2a7 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -484,9 +484,13 @@ def create_event(
 
         try:
             if "displayname" not in content:
-                content["displayname"] = yield profile.get_displayname(target)
+                displayname = yield profile.get_displayname(target)
+                if displayname is not None:
+                    content["displayname"] = displayname
             if "avatar_url" not in content:
-                content["avatar_url"] = yield profile.get_avatar_url(target)
+                avatar_url = yield profile.get_avatar_url(target)
+                if avatar_url is not None:
+                    content["avatar_url"] = avatar_url
         except Exception as e:
             logger.info(
                 "Failed to get profile information for %r: %s", target, e

From ab3e19d814f26442f128420f43eb990cc3457bff Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Paul=20T=C3=B6tterman?=
Date: Tue, 19 May 2020 13:20:23 +0300
Subject: [PATCH 04/28] Improve API doc readability (#7527)

---
 docs/admin_api/user_admin_api.rst | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst
index 859d7f99e7c8..776e71ec0439 100644
--- a/docs/admin_api/user_admin_api.rst
+++ b/docs/admin_api/user_admin_api.rst
@@ -33,21 +33,22 @@ with a body of:
 
 including an ``access_token`` of a server admin.
 
-The parameter ``displayname`` is optional and defaults to the value of
-``user_id``.
+Parameters:
 
-The parameter ``threepids`` is optional and allows setting the third-party IDs
-(email, msisdn) belonging to a user.
+- ``password``, optional. If provided, the user's password is updated and all
+  devices are logged out.
+
+- ``displayname``, optional, defaults to the value of ``user_id``.
 
-The parameter ``avatar_url`` is optional. Must be a [MXC
-URI](https://matrix.org/docs/spec/client_server/r0.6.0#matrix-content-mxc-uris).
+- ``threepids``, optional, allows setting the third-party IDs (email, msisdn)
+  belonging to a user.
 
-The parameter ``admin`` is optional and defaults to ``false``.
+- ``avatar_url``, optional, must be a
+  `MXC URI <https://matrix.org/docs/spec/client_server/r0.6.0#matrix-content-mxc-uris>`_.
 
-The parameter ``deactivated`` is optional and defaults to ``false``.
+- ``admin``, optional, defaults to ``false``.
 
-The parameter ``password`` is optional. If provided, the user's password is
-updated and all devices are logged out.
+- ``deactivated``, optional, defaults to ``false``.
 
 If the user already exists then optional parameters default to the current
 value.

From a57863d2b4b14b3ca9186f5b4dd6bbae50706741 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Romain=20Bouy=C3=A9?=
Date: Tue, 19 May 2020 13:47:45 +0100
Subject: [PATCH 05/28] synctl warns when no process is stopped and avoids start (#6598)

* If an error occurs when stopping a process, synctl now logs a warning.
* During a restart, synctl will avoid attempting to start Synapse if an error
  occurs during stopping Synapse.
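As a rough sketch of the resulting control flow (illustrative only — the real
logic lives inline in synctl's `main()`, and `restart_workers` is a
hypothetical helper wrapping the `stop` function changed by this patch):

```python
import sys

def restart_workers(workers, pidfile, start_stop_synapse):
    # stop() now returns a bool; track whether every stop succeeded.
    has_stopped = True
    for worker in workers:
        if not stop(worker.pidfile, worker.app):
            has_stopped = False  # a worker could not be stopped
    if start_stop_synapse and not stop(pidfile, "synapse.app.homeserver"):
        has_stopped = False
    if not has_stopped:
        # Exit non-zero instead of attempting to start Synapse again.
        sys.exit(1)
```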
---
 changelog.d/6590.misc |  1 +
 synctl                | 31 ++++++++++++++++++++++++++++---
 2 files changed, 29 insertions(+), 3 deletions(-)
 create mode 100644 changelog.d/6590.misc

diff --git a/changelog.d/6590.misc b/changelog.d/6590.misc
new file mode 100644
index 000000000000..05a0156212f1
--- /dev/null
+++ b/changelog.d/6590.misc
@@ -0,0 +1 @@
+`synctl` now warns if it was unable to stop Synapse and will not attempt to start Synapse if nothing was stopped. Contributed by Romain Bouyé.
diff --git a/synctl b/synctl
index bbccd0529088..81e9bf6b5c39 100755
--- a/synctl
+++ b/synctl
@@ -142,12 +142,23 @@ def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
         return False
 
 
-def stop(pidfile, app):
+def stop(pidfile: str, app: str) -> bool:
+    """Attempts to kill a synapse worker from the pidfile.
+    Args:
+        pidfile: path to file containing worker's pid
+        app: name of the worker's app
+
+    Returns:
+        True if the process stopped successfully
+        False if process was already stopped or an error occurred
+    """
+
     if os.path.exists(pidfile):
         pid = int(open(pidfile).read())
         try:
             os.kill(pid, signal.SIGTERM)
             write("stopped %s" % (app,), colour=GREEN)
+            return True
         except OSError as err:
             if err.errno == errno.ESRCH:
                 write("%s not running" % (app,), colour=YELLOW)
@@ -155,6 +166,14 @@ def stop(pidfile, app):
                 abort("Cannot stop %s: Operation not permitted" % (app,))
             else:
                 abort("Cannot stop %s: Unknown error" % (app,))
+        return False
+    else:
+        write(
+            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
+            % (app, pidfile),
+            colour=YELLOW,
+        )
+    return False
 
 
 Worker = collections.namedtuple(
@@ -300,11 +319,17 @@ def main():
     action = options.action
 
     if action == "stop" or action == "restart":
+        has_stopped = True
         for worker in workers:
-            stop(worker.pidfile, worker.app)
+            if not stop(worker.pidfile, worker.app):
+                # A worker could not be stopped.
+                has_stopped = False
 
         if start_stop_synapse:
-            stop(pidfile, "synapse.app.homeserver")
+            if not stop(pidfile, "synapse.app.homeserver"):
+                has_stopped = False
+        if not has_stopped:
+            sys.exit(1)
 
     # Wait for synapse to actually shutdown before starting it again
     if action == "restart":

From 1fc8914f767f5f80d7263d8db96d73bf0310a39c Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Tue, 19 May 2020 13:48:41 +0100
Subject: [PATCH 06/28] update dh-virtualenv (#7526)

---
 changelog.d/7526.misc             |  1 +
 debian/build_virtualenv           |  1 -
 debian/changelog                  |  4 +---
 docker/Dockerfile-dhvirtualenv    | 15 ++++++++-------
 scripts-dev/build_debian_packages |  1 +
 5 files changed, 11 insertions(+), 11 deletions(-)
 create mode 100644 changelog.d/7526.misc

diff --git a/changelog.d/7526.misc b/changelog.d/7526.misc
new file mode 100644
index 000000000000..c739312c4c0a
--- /dev/null
+++ b/changelog.d/7526.misc
@@ -0,0 +1 @@
+Update the version of dh-virtualenv we use to build debs, and add focal to the list of target distributions.
diff --git a/debian/build_virtualenv b/debian/build_virtualenv index d892fd5c9d9a..4c9aabcac386 100755 --- a/debian/build_virtualenv +++ b/debian/build_virtualenv @@ -36,7 +36,6 @@ esac dh_virtualenv \ --install-suffix "matrix-synapse" \ --builtin-venv \ - --setuptools \ --python "$SNAKE" \ --upgrade-pip \ --preinstall="lxml" \ diff --git a/debian/changelog b/debian/changelog index 8641571986e9..2db94ee6095e 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,16 +1,14 @@ -<<<<<<< HEAD matrix-synapse-py3 (1.12.3ubuntu1) UNRELEASED; urgency=medium * Add information about .well-known files to Debian installation scripts. -- Patrick Cloke Mon, 06 Apr 2020 10:10:38 -0400 -======= + matrix-synapse-py3 (1.12.4) stable; urgency=medium * New synapse release 1.12.4. -- Synapse Packaging team Thu, 23 Apr 2020 10:58:14 -0400 ->>>>>>> master matrix-synapse-py3 (1.12.3) stable; urgency=medium diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv index ac9ebcfd88c2..579724685c2b 100644 --- a/docker/Dockerfile-dhvirtualenv +++ b/docker/Dockerfile-dhvirtualenv @@ -27,15 +27,16 @@ RUN env DEBIAN_FRONTEND=noninteractive apt-get install \ wget # fetch and unpack the package -RUN wget -q -O /dh-virtuenv-1.1.tar.gz https://github.com/spotify/dh-virtualenv/archive/1.1.tar.gz -RUN tar xvf /dh-virtuenv-1.1.tar.gz +RUN mkdir /dh-virtualenv +RUN wget -q -O /dh-virtualenv.tar.gz https://github.com/matrix-org/dh-virtualenv/archive/matrixorg-20200519.tar.gz +RUN tar -xv --strip-components=1 -C /dh-virtualenv -f /dh-virtualenv.tar.gz # install its build deps -RUN cd dh-virtualenv-1.1/ \ - && env DEBIAN_FRONTEND=noninteractive mk-build-deps -ri -t "apt-get -yqq --no-install-recommends" +RUN cd /dh-virtualenv \ + && env DEBIAN_FRONTEND=noninteractive mk-build-deps -ri -t "apt-get -y --no-install-recommends" # build it -RUN cd dh-virtualenv-1.1 && dpkg-buildpackage -us -uc -b +RUN cd /dh-virtualenv && dpkg-buildpackage -us -uc -b ### ### Stage 1 @@ -68,12 +69,12 @@ RUN apt-get update -qq -o Acquire::Languages=none \ sqlite3 \ libpq-dev -COPY --from=builder /dh-virtualenv_1.1-1_all.deb / +COPY --from=builder /dh-virtualenv_1.2~dev-1_all.deb / # install dhvirtualenv. Update the apt cache again first, in case we got a # cached cache from docker the first time. 
RUN apt-get update -qq -o Acquire::Languages=none \ - && apt-get install -yq /dh-virtualenv_1.1-1_all.deb + && apt-get install -yq /dh-virtualenv_1.2~dev-1_all.deb WORKDIR /synapse/source ENTRYPOINT ["bash","/synapse/source/docker/build_debian.sh"] diff --git a/scripts-dev/build_debian_packages b/scripts-dev/build_debian_packages index 84eaec6a9512..ae2145d71723 100755 --- a/scripts-dev/build_debian_packages +++ b/scripts-dev/build_debian_packages @@ -27,6 +27,7 @@ DISTS = ( "ubuntu:cosmic", "ubuntu:disco", "ubuntu:eoan", + "ubuntu:focal", ) DESC = '''\ From ac3264bf1ec8815cde14d77a62caad373c280ef1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 May 2020 09:19:09 -0400 Subject: [PATCH 07/28] 1.13.0 --- CHANGES.md | 9 +++++++++ changelog.d/7526.misc | 1 - debian/changelog | 8 ++++++-- synapse/__init__.py | 2 +- 4 files changed, 16 insertions(+), 4 deletions(-) delete mode 100644 changelog.d/7526.misc diff --git a/CHANGES.md b/CHANGES.md index 45008995e5c3..04a3e525e664 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +Synapse 1.13.0 (2020-05-19) +=========================== + +Internal Changes +---------------- + +- Update the version of dh-virtualenv we use to build debs, and add focal to the list of target distributions. ([\#7526](https://github.com/matrix-org/synapse/issues/7526)) + + Synapse 1.13.0rc3 (2020-05-18) ============================== diff --git a/changelog.d/7526.misc b/changelog.d/7526.misc deleted file mode 100644 index c739312c4c0a..000000000000 --- a/changelog.d/7526.misc +++ /dev/null @@ -1 +0,0 @@ -Update the version of dh-virtualenv we use to build debs, and add focal to the list of target distributions. diff --git a/debian/changelog b/debian/changelog index 2db94ee6095e..e7842d417416 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,12 @@ -matrix-synapse-py3 (1.12.3ubuntu1) UNRELEASED; urgency=medium +matrix-synapse-py3 (1.13.0) stable; urgency=medium + [ Patrick Cloke ] * Add information about .well-known files to Debian installation scripts. - -- Patrick Cloke Mon, 06 Apr 2020 10:10:38 -0400 + [ Synapse Packaging team ] + * New synapse release 1.13.0. + + -- Synapse Packaging team Tue, 19 May 2020 09:16:56 -0400 matrix-synapse-py3 (1.12.4) stable; urgency=medium diff --git a/synapse/__init__.py b/synapse/__init__.py index 6b26c5e87e5c..0abf4911729d 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ except ImportError: pass -__version__ = "1.13.0rc3" +__version__ = "1.13.0" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when From 66fd16261c5662a2ac315c60f19b018d4c3319e0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 May 2020 09:24:29 -0400 Subject: [PATCH 08/28] Move warnings in the changelog and re-iterate changes to branches. --- CHANGES.md | 50 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 04a3e525e664..96dfb1668fee 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,38 @@ Synapse 1.13.0 (2020-05-19) =========================== +This release brings some potential changes necessary for certain +configurations of Synapse: + +* If your Synapse is configured to use SSO and have a custom + `sso_redirect_confirm_template_dir` configuration option set, you will need + to duplicate the new `sso_auth_confirm.html`, `sso_auth_success.html` and + `sso_account_deactivated.html` templates into that directory. 
+* Synapse plugins using the `complete_sso_login` method of + `synapse.module_api.ModuleApi` should instead switch to the async/await + version, `complete_sso_login_async`, which includes additional checks. The + former version is now deprecated. +* A bug was introduced in Synapse 1.4.0 which could cause the room directory + to be incomplete or empty if Synapse was upgraded directly from v1.2.1 or + earlier, to versions between v1.4.0 and v1.12.x. + +Please review [UPGRADE.rst](UPGRADE.rst) for more details on these changes +and for general upgrade guidance. + + +Notice of change to the default `git` branch for Synapse +-------------------------------------------------------- + +With the release of Synapse 1.13.0, the default `git` branch for Synapse has +changed to `develop`, which is the development tip. This is more consistent with +common practice and modern `git` usage. + +The `master` branch, which tracks the latest release, is still available. It is +recommended that developers and distributors who have scripts which run builds +using the default branch of `Synapse` should therefore consider pinning their +scripts to `master`. + + Internal Changes ---------------- @@ -34,24 +66,6 @@ Internal Changes Synapse 1.13.0rc1 (2020-05-11) ============================== -This release brings some potential changes necessary for certain -configurations of Synapse: - -* If your Synapse is configured to use SSO and have a custom - `sso_redirect_confirm_template_dir` configuration option set, you will need - to duplicate the new `sso_auth_confirm.html`, `sso_auth_success.html` and - `sso_account_deactivated.html` templates into that directory. -* Synapse plugins using the `complete_sso_login` method of - `synapse.module_api.ModuleApi` should instead switch to the async/await - version, `complete_sso_login_async`, which includes additional checks. The - former version is now deprecated. -* A bug was introduced in Synapse 1.4.0 which could cause the room directory - to be incomplete or empty if Synapse was upgraded directly from v1.2.1 or - earlier, to versions between v1.4.0 and v1.12.x. - -Please review [UPGRADE.rst](UPGRADE.rst) for more details on these changes -and for general upgrade guidance. - Features -------- From 45c8b1c618878905ab20187460da2260590211b2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 May 2020 09:31:59 -0400 Subject: [PATCH 09/28] Update changelog based on feedback. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 96dfb1668fee..225fced285a3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,7 +29,7 @@ common practice and modern `git` usage. The `master` branch, which tracks the latest release, is still available. It is recommended that developers and distributors who have scripts which run builds -using the default branch of `Synapse` should therefore consider pinning their +using the default branch of Synapse should therefore consider pinning their scripts to `master`. 
From 4fa74c7606ff302b0f255e418299eacdc1b5ca7f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 20 May 2020 11:04:34 +0100 Subject: [PATCH 10/28] Minor clarifications to the TURN docs (#7533) --- changelog.d/7533.doc | 1 + docs/turn-howto.md | 57 ++++++++++++++++++++++++++++++++------------ 2 files changed, 43 insertions(+), 15 deletions(-) create mode 100644 changelog.d/7533.doc diff --git a/changelog.d/7533.doc b/changelog.d/7533.doc new file mode 100644 index 000000000000..e3c1df99faa6 --- /dev/null +++ b/changelog.d/7533.doc @@ -0,0 +1 @@ +Minor clarifications to the TURN docs. diff --git a/docs/turn-howto.md b/docs/turn-howto.md index b8a2ba3e82b1..d4a726be660d 100644 --- a/docs/turn-howto.md +++ b/docs/turn-howto.md @@ -18,7 +18,7 @@ For TURN relaying with `coturn` to work, it must be hosted on a server/endpoint Hosting TURN behind a NAT (even with appropriate port forwarding) is known to cause issues and to often not work. -## `coturn` Setup +## `coturn` setup ### Initial installation @@ -26,7 +26,13 @@ The TURN daemon `coturn` is available from a variety of sources such as native p #### Debian installation - # apt install coturn +Just install the debian package: + +```sh +apt install coturn +``` + +This will install and start a systemd service called `coturn`. #### Source installation @@ -63,38 +69,52 @@ The TURN daemon `coturn` is available from a variety of sources such as native p 1. Consider your security settings. TURN lets users request a relay which will connect to arbitrary IP addresses and ports. The following configuration is suggested as a minimum starting point: - + # VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay. no-tcp-relay - + # don't let the relay ever try to connect to private IP address ranges within your network (if any) # given the turn server is likely behind your firewall, remember to include any privileged public IPs too. denied-peer-ip=10.0.0.0-10.255.255.255 denied-peer-ip=192.168.0.0-192.168.255.255 denied-peer-ip=172.16.0.0-172.31.255.255 - + # special case the turn server itself so that client->TURN->TURN->client flows work allowed-peer-ip=10.0.0.1 - + # consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS. user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user. total-quota=1200 - Ideally coturn should refuse to relay traffic which isn't SRTP; see - +1. Also consider supporting TLS/DTLS. To do this, add the following settings + to `turnserver.conf`: + + # TLS certificates, including intermediate certs. + # For Let's Encrypt certificates, use `fullchain.pem` here. + cert=/path/to/fullchain.pem + + # TLS private key file + pkey=/path/to/privkey.pem 1. Ensure your firewall allows traffic into the TURN server on the ports - you've configured it to listen on (remember to allow both TCP and UDP TURN - traffic) + you've configured it to listen on (By default: 3478 and 5349 for the TURN(s) + traffic (remember to allow both TCP and UDP traffic), and ports 49152-65535 + for the UDP relay.) + +1. (Re)start the turn server: -1. If you've configured coturn to support TLS/DTLS, generate or import your - private key and certificate. + * If you used the Debian package (or have set up a systemd unit yourself): + ```sh + systemctl restart coturn + ``` -1. 
Start the turn server: + * If you installed from source: - bin/turnserver -o + ```sh + bin/turnserver -o + ``` -## synapse Setup +## Synapse setup Your home server configuration file needs the following extra keys: @@ -126,7 +146,14 @@ As an example, here is the relevant section of the config file for matrix.org: After updating the homeserver configuration, you must restart synapse: + * If you use synctl: + ```sh cd /where/you/run/synapse ./synctl restart + ``` + * If you use systemd: + ``` + systemctl restart synapse.service + ``` ..and your Home Server now supports VoIP relaying! From 9dc6f3075aea7c76c3d6a201f8a78ace76f99a3e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 May 2020 09:48:03 -0400 Subject: [PATCH 11/28] Hash passwords earlier in the password reset process (#7538) This now matches the logic of the registration process as modified in 56db0b1365965c02ff539193e26c333b7f70d101 / #7523. --- changelog.d/7538.bugfix | 1 + synapse/handlers/set_password.py | 5 +---- synapse/rest/admin/users.py | 13 +++++++++++-- synapse/rest/client/v2_alpha/account.py | 21 ++++++++++++++++++--- synapse/rest/client/v2_alpha/register.py | 4 ++-- 5 files changed, 33 insertions(+), 11 deletions(-) create mode 100644 changelog.d/7538.bugfix diff --git a/changelog.d/7538.bugfix b/changelog.d/7538.bugfix new file mode 100644 index 000000000000..4a614a9e61cc --- /dev/null +++ b/changelog.d/7538.bugfix @@ -0,0 +1 @@ + Hash passwords as early as possible during password reset. diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 63d8f9aa0d54..4d245b618b17 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -35,16 +35,13 @@ def __init__(self, hs): async def set_password( self, user_id: str, - new_password: str, + password_hash: str, logout_devices: bool, requester: Optional[Requester] = None, ): if not self.hs.config.password_localdb_enabled: raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) - self._password_policy_handler.validate_password(new_password) - password_hash = await self._auth_handler.hash(new_password) - try: await self.store.user_set_password_hash(user_id, password_hash) except StoreError as e: diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 326682fbdb67..e7f6928c859d 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -222,8 +222,14 @@ async def on_PUT(self, request, user_id): else: new_password = body["password"] logout_devices = True + + new_password_hash = await self.auth_handler.hash(new_password) + await self.set_password_handler.set_password( - target_user.to_string(), new_password, logout_devices, requester + target_user.to_string(), + new_password_hash, + logout_devices, + requester, ) if "deactivated" in body: @@ -523,6 +529,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.hs = hs self.auth = hs.get_auth() + self.auth_handler = hs.get_auth_handler() self._set_password_handler = hs.get_set_password_handler() async def on_POST(self, request, target_user_id): @@ -539,8 +546,10 @@ async def on_POST(self, request, target_user_id): new_password = params["new_password"] logout_devices = params.get("logout_devices", True) + new_password_hash = await self.auth_handler.hash(new_password) + await self._set_password_handler.set_password( - target_user_id, new_password, logout_devices, requester + target_user_id, new_password_hash, logout_devices, requester ) return 200, {} diff --git 
a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 1bd023477902..d4f721b6b989 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -220,12 +220,27 @@ def __init__(self, hs): self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() self.datastore = self.hs.get_datastore() + self.password_policy_handler = hs.get_password_policy_handler() self._set_password_handler = hs.get_set_password_handler() @interactive_auth_handler async def on_POST(self, request): body = parse_json_object_from_request(request) + # we do basic sanity checks here because the auth layer will store these + # in sessions. Pull out the new password provided to us. + if "new_password" in body: + new_password = body.pop("new_password") + if not isinstance(new_password, str) or len(new_password) > 512: + raise SynapseError(400, "Invalid password") + self.password_policy_handler.validate_password(new_password) + + # If the password is valid, hash it and store it back on the body. + # This ensures that only the hashed password is handled everywhere. + if "new_password_hash" in body: + raise SynapseError(400, "Unexpected property: new_password_hash") + body["new_password_hash"] = await self.auth_handler.hash(new_password) + # there are two possibilities here. Either the user does not have an # access token, and needs to do a password reset; or they have one and # need to validate their identity. @@ -276,12 +291,12 @@ async def on_POST(self, request): logger.error("Auth succeeded but no known type! %r", result.keys()) raise SynapseError(500, "", Codes.UNKNOWN) - assert_params_in_dict(params, ["new_password"]) - new_password = params["new_password"] + assert_params_in_dict(params, ["new_password_hash"]) + new_password_hash = params["new_password_hash"] logout_devices = params.get("logout_devices", True) await self._set_password_handler.set_password( - user_id, new_password, logout_devices, requester + user_id, new_password_hash, logout_devices, requester ) return 200, {} diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c26927f27b9e..addd4cae1906 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -431,8 +431,8 @@ async def on_POST(self, request): raise SynapseError(400, "Invalid password") self.password_policy_handler.validate_password(password) - # If the password is valid, hash it and store it back on the request. - # This ensures the hashed password is handled everywhere. + # If the password is valid, hash it and store it back on the body. + # This ensures that only the hashed password is handled everywhere. if "password_hash" in body: raise SynapseError(400, "Unexpected property: password_hash") body["password_hash"] = await self.auth_handler.hash(password) From b2b86990705de8a099093ec141ad83e09f182034 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 May 2020 10:08:46 -0400 Subject: [PATCH 12/28] Remove Ubuntu Cosmic and Disco which are both EOL. (#7539) --- changelog.d/7539.misc | 1 + scripts-dev/build_debian_packages | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) create mode 100644 changelog.d/7539.misc diff --git a/changelog.d/7539.misc b/changelog.d/7539.misc new file mode 100644 index 000000000000..93c030875a7e --- /dev/null +++ b/changelog.d/7539.misc @@ -0,0 +1 @@ +Remove Ubuntu Cosmic and Disco from the list of distributions which we provide `.deb`s for, due to end-of-life. 
diff --git a/scripts-dev/build_debian_packages b/scripts-dev/build_debian_packages index ae2145d71723..e6f4bd1dcadf 100755 --- a/scripts-dev/build_debian_packages +++ b/scripts-dev/build_debian_packages @@ -24,8 +24,6 @@ DISTS = ( "debian:sid", "ubuntu:xenial", "ubuntu:bionic", - "ubuntu:cosmic", - "ubuntu:disco", "ubuntu:eoan", "ubuntu:focal", ) From 5db2a59a861f6ab79f18e9424123c5e152421306 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 20 May 2020 18:47:19 +0100 Subject: [PATCH 13/28] Update CONTRIBUTING.md (#7541) --- CONTRIBUTING.md | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3350e533dc8a..062413e92531 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,21 +16,10 @@ project on github, and then [create a pull request]( https://help.github.com/articles/using-pull-requests/) to ask us to pull your changes into our repo. -**The single biggest thing you need to know is: please base your changes on -the develop branch - *not* master.** - -We use the master branch to track the most recent release, so that folks who -blindly clone the repo and automatically check out master get something that -works. Develop is the unstable branch where all the development actually -happens: the workflow is that contributors should fork the develop branch to -make a 'feature' branch for a particular contribution, and then make a pull -request to merge this back into the matrix.org 'official' develop branch. We -use github's pull request workflow to review the contribution, and either ask -you to make any refinements needed or merge it and make them ourselves. The -changes will then land on master when we next do a release. - -Some other things you will need to know when contributing to Synapse: - +Some other points to follow: + + * Please base your changes on the `develop` branch. + * Please follow the [code style requirements](#code-style). * Please include a [changelog entry](#changelog) with each PR. From f6f92845f84301619f4c48eed9364b25db0b1445 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2020 13:20:10 +0100 Subject: [PATCH 14/28] Fix bug in persist events when dealing with non member types. (#7548) `_is_server_still_joined` will throw if it is given state updates with non-user ID state keys with local user leaves. This is actually rarely a problem since local leaves almost always get persisted by themselves. (I discovered this on a branch that was otherwise broken, so I haven't seen this in the wild) --- changelog.d/7548.bugfix | 1 + synapse/storage/persist_events.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/7548.bugfix diff --git a/changelog.d/7548.bugfix b/changelog.d/7548.bugfix new file mode 100644 index 000000000000..1233b3b31a12 --- /dev/null +++ b/changelog.d/7548.bugfix @@ -0,0 +1 @@ +Fix bug where a local user leaving a room could fail under rare circumstances. diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 41881ea20b25..12e1ffb9a2f2 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -740,8 +740,8 @@ async def _is_server_still_joined( # whose state has changed as we've already their new state above. 
users_to_ignore = [ state_key - for _, state_key in itertools.chain(delta.to_insert, delta.to_delete) - if self.is_mine_id(state_key) + for typ, state_key in itertools.chain(delta.to_insert, delta.to_delete) + if typ == EventTypes.Member and self.is_mine_id(state_key) ] if await self.main_store.is_local_host_in_room_ignoring_users( From 075375bbc97f16c5750c446534342b3a63d9be5a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 May 2020 12:56:27 +0100 Subject: [PATCH 15/28] add a comment --- synapse/federation/sender/per_destination_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 276a2b596f19..4e698981a4c8 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -80,6 +80,9 @@ def __init__( # a list of tuples of (pending pdu, order) self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + + # XXX this is never actually used: see + # https://github.com/matrix-org/synapse/issues/7549 self._pending_edus = [] # type: List[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered From d74cdc1a42e8b487d74c214b1d0ca575429d546a Mon Sep 17 00:00:00 2001 From: David Vo Date: Thu, 21 May 2020 22:47:23 +1000 Subject: [PATCH 16/28] Ensure worker config exists in systemd service (#7528) --- changelog.d/7528.doc | 1 + docs/systemd-with-workers/system/matrix-synapse-worker@.service | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7528.doc diff --git a/changelog.d/7528.doc b/changelog.d/7528.doc new file mode 100644 index 000000000000..6f2a783b50b0 --- /dev/null +++ b/changelog.d/7528.doc @@ -0,0 +1 @@ +Change the systemd worker service to check that the worker config file exists instead of silently failing. Contributed by David Vo. diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service index 70589a7a51c1..39bc5e88e862 100644 --- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service +++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service @@ -1,6 +1,6 @@ [Unit] Description=Synapse %i - +AssertPathExists=/etc/matrix-synapse/workers/%i.yaml # This service should be restarted when the synapse target is restarted. PartOf=matrix-synapse.target From 0bbbd10513008d30c17eb1d1e7ba1d091fb44ec7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 21 May 2020 14:36:46 +0100 Subject: [PATCH 17/28] Stub out GET presence requests in the frontend proxy (#7545) We don't really make any promises about returning accurate presence data when presence is disabled, so we may as well just return a static response, rather than making the master handle a request. --- changelog.d/7545.misc | 1 + synapse/app/generic_worker.py | 21 ++++----------------- 2 files changed, 5 insertions(+), 17 deletions(-) create mode 100644 changelog.d/7545.misc diff --git a/changelog.d/7545.misc b/changelog.d/7545.misc new file mode 100644 index 000000000000..177ec883e206 --- /dev/null +++ b/changelog.d/7545.misc @@ -0,0 +1 @@ +Make worker processes return a stubbed-out response to `GET /presence` requests. 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 506b70443b5d..d751c9772be2 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -26,7 +26,7 @@
 
 import synapse
 import synapse.events
-from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.api.errors import SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
     FEDERATION_PREFIX,
@@ -137,31 +137,18 @@
 
 class PresenceStatusStubServlet(RestServlet):
     """If presence is disabled this servlet can be used to stub out setting
-    presence status, while proxying the getters to the master instance.
+    presence status.
     """
 
     PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
 
     def __init__(self, hs):
         super(PresenceStatusStubServlet, self).__init__()
-        self.http_client = hs.get_simple_http_client()
         self.auth = hs.get_auth()
-        self.main_uri = hs.config.worker_main_http_uri
 
     async def on_GET(self, request, user_id):
-        # Pass through the auth headers, if any, in case the access token
-        # is there.
-        auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
-        headers = {"Authorization": auth_headers}
-
-        try:
-            result = await self.http_client.get_json(
-                self.main_uri + request.uri.decode("ascii"), headers=headers
-            )
-        except HttpResponseException as e:
-            raise e.to_synapse_error()
-
-        return 200, result
+        await self.auth.get_user_by_req(request)
+        return 200, {"presence": "offline"}
 
     async def on_PUT(self, request, user_id):
         await self.auth.get_user_by_req(request)

From d1ae1015ecb7dcb36d7cb39d5d41733e1ced2a52 Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Thu, 21 May 2020 17:41:12 +0200
Subject: [PATCH 18/28] Retry to sync out of sync device lists (#7453)

When a call to `user_device_resync` fails, we don't currently mark the
remote user's device list as out of sync, nor do we retry to sync it.

https://github.com/matrix-org/synapse/pull/6776 introduced some code
infrastructure to mark device lists as stale/out of sync.

This commit uses that code infrastructure to mark device lists as out of
sync if processing an incoming device list update makes the device handler
realise that the device list is out of sync, but we can't resync right now.

It also adds a looping call to retry all failed resyncs every 30s. This
shouldn't cause too much spam in the logs as this commit also removes the
"Failed to handle device list update for..." warning logs when catching
`NotRetryingDestination`.

Fixes #7418
---
 changelog.d/7453.bugfix                     |  1 +
 synapse/handlers/device.py                  | 80 +++++++++++++++++++--
 synapse/storage/data_stores/main/devices.py | 34 +++++----
 tests/test_federation.py                    | 63 +++++++++++++++-
 4 files changed, 158 insertions(+), 20 deletions(-)
 create mode 100644 changelog.d/7453.bugfix

diff --git a/changelog.d/7453.bugfix b/changelog.d/7453.bugfix
new file mode 100644
index 000000000000..629a3d61e235
--- /dev/null
+++ b/changelog.d/7453.bugfix
@@ -0,0 +1 @@
+Fix a bug that would cause Synapse not to resync out-of-sync device lists.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9bd941b5a0f7..29a19b457207 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,6 +29,7 @@
     SynapseError,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -535,6 +536,15 @@ def __init__(self, hs, device_handler):
             iterable=True,
         )
 
+        # Attempt to resync out of sync device lists every 30s.
+        self._resync_retry_in_progress = False
+        self.clock.looping_call(
+            run_as_background_process,
+            30 * 1000,
+            func=self._maybe_retry_device_resync,
+            desc="_maybe_retry_device_resync",
+        )
+
     @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
@@ -679,11 +689,50 @@ def _need_to_do_resync(self, user_id, updates):
         return False
 
     @defer.inlineCallbacks
-    def user_device_resync(self, user_id):
+    def _maybe_retry_device_resync(self):
+        """Retry to resync device lists that are out of sync, except if another retry is
+        in progress.
+        """
+        if self._resync_retry_in_progress:
+            return
+
+        try:
+            # Prevent another call of this function to retry resyncing device lists so
+            # we don't send too many requests.
+            self._resync_retry_in_progress = True
+            # Get all of the users that need resyncing.
+            need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+            # Iterate over the set of user IDs.
+            for user_id in need_resync:
+                # Try to resync the current user's devices list. Exception handling
+                # isn't necessary here, since user_device_resync catches all instances
+                # of "Exception" that might be raised from the federation request. This
+                # means that if an exception is raised by this function, it must be
+                # because of a database issue, which means _maybe_retry_device_resync
+                # probably won't be able to go much further anyway.
+                result = yield self.user_device_resync(
+                    user_id=user_id, mark_failed_as_stale=False,
+                )
+                # user_device_resync only returns a result if it managed to successfully
+                # resync and update the database. Updating the table of users requiring
+                # resync isn't necessary here as user_device_resync already does it
+                # (through self.store.update_remote_device_list_cache).
+                if result:
+                    logger.debug(
+                        "Successfully resynced the device list for %s" % user_id,
+                    )
        finally:
+            # Allow future calls to retry resyncing out of sync device lists.
+            self._resync_retry_in_progress = False
+
+    @defer.inlineCallbacks
+    def user_device_resync(self, user_id, mark_failed_as_stale=True):
         """Fetches all devices for a user and updates the device cache with them.
 
         Args:
             user_id (str): The user's id whose device_list will be updated.
+            mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+                if the attempt to resync failed.
Returns: Deferred[dict]: a dict with device info as under the "devices" in the result of this request: @@ -694,10 +743,23 @@ def user_device_resync(self, user_id): origin = get_domain_from_id(user_id) try: result = yield self.federation.query_user_devices(origin, user_id) - except (NotRetryingDestination, RequestSendFailed, HttpResponseException): - # TODO: Remember that we are now out of sync and try again - # later - logger.warning("Failed to handle device list update for %s", user_id) + except NotRetryingDestination: + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + + return + except (RequestSendFailed, HttpResponseException) as e: + logger.warning( + "Failed to handle device list update for %s: %s", user_id, e, + ) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync @@ -711,13 +773,17 @@ def user_device_resync(self, user_id): logger.info(e) return except Exception as e: - # TODO: Remember that we are now out of sync and try again - # later set_tag("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + return log_kv({"result": result}) stream_id = result["stream_id"] diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index fe6d6ecfe0e0..0e8378714ad8 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import List, Optional, Set, Tuple from six import iteritems @@ -649,21 +649,31 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids): return results @defer.inlineCallbacks - def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]): + def get_user_ids_requiring_device_list_resync( + self, user_ids: Optional[Collection[str]] = None, + ) -> Set[str]: """Given a list of remote users return the list of users that we - should resync the device lists for. + should resync the device lists for. If None is given instead of a list, + return every user that we should resync the device lists for. Returns: - Deferred[Set[str]] + The IDs of users whose device lists need resync. 
""" - - rows = yield self.db.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", - ) + if user_ids: + rows = yield self.db.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ) + else: + rows = yield self.db.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ) return {row["user_id"] for row in rows} diff --git a/tests/test_federation.py b/tests/test_federation.py index f297de95f1b7..13ff14863ea5 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,12 +6,13 @@ from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.retryutils import NotRetryingDestination from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver -class MessageAcceptTests(unittest.TestCase): +class MessageAcceptTests(unittest.HomeserverTestCase): def setUp(self): self.http_client = Mock() @@ -145,3 +146,63 @@ def post_json(destination, path, data, headers=None, timeout=0): # Make sure the invalid event isn't there extrem = maybeDeferred(self.store.get_latest_event_ids_in_room, self.room_id) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") + + def test_retry_device_list_resync(self): + """Tests that device lists are marked as stale if they couldn't be synced, and + that stale device lists are retried periodically. + """ + remote_user_id = "@john:test_remote" + remote_origin = "test_remote" + + # Track the number of attempts to resync the user's device list. + self.resync_attempts = 0 + + # When this function is called, increment the number of resync attempts (only if + # we're querying devices for the right user ID), then raise a + # NotRetryingDestination error to fail the resync gracefully. + def query_user_devices(destination, user_id): + if user_id == remote_user_id: + self.resync_attempts += 1 + + raise NotRetryingDestination(0, 0, destination) + + # Register the mock on the federation client. + federation_client = self.homeserver.get_federation_client() + federation_client.query_user_devices = Mock(side_effect=query_user_devices) + + # Register a mock on the store so that the incoming update doesn't fail because + # we don't share a room with the user. + store = self.homeserver.get_datastore() + store.get_rooms_for_user = Mock(return_value=["!someroom:test"]) + + # Manually inject a fake device list update. We need this update to include at + # least one prev_id so that the user's device list will need to be retried. + device_list_updater = self.homeserver.get_device_handler().device_list_updater + self.get_success( + device_list_updater.incoming_device_list_update( + origin=remote_origin, + edu_content={ + "deleted": False, + "device_display_name": "Mobile", + "device_id": "QBUAZIFURK", + "prev_id": [5], + "stream_id": 6, + "user_id": remote_user_id, + }, + ) + ) + + # Check that there was one resync attempt. + self.assertEqual(self.resync_attempts, 1) + + # Check that the resync attempt failed and caused the user's device list to be + # marked as stale. 
+ need_resync = self.get_success( + store.get_user_ids_requiring_device_list_resync() + ) + self.assertIn(remote_user_id, need_resync) + + # Check that waiting for 30 seconds caused Synapse to retry resyncing the device + # list. + self.reactor.advance(30) + self.assertEqual(self.resync_attempts, 2) From 66a564c859c035c273c8feb25c624473055f1d78 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 22 May 2020 10:11:50 +0100 Subject: [PATCH 19/28] Fix some DETECTED VIOLATIONS in the config file (#7550) consistency ftw --- changelog.d/7550.misc | 1 + docs/sample_config.yaml | 61 +++++++++++++++++++--------------- synapse/config/captcha.py | 17 ++++++---- synapse/config/emailconfig.py | 4 +-- synapse/config/key.py | 4 +-- synapse/config/metrics.py | 3 +- synapse/config/registration.py | 4 +-- synapse/config/server.py | 33 ++++++++++-------- 8 files changed, 72 insertions(+), 55 deletions(-) create mode 100644 changelog.d/7550.misc diff --git a/changelog.d/7550.misc b/changelog.d/7550.misc new file mode 100644 index 000000000000..79e119e977c5 --- /dev/null +++ b/changelog.d/7550.misc @@ -0,0 +1 @@ +Fix some indentation inconsistencies in the sample config. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 8a8415b9a290..0e1be153c718 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -322,22 +322,27 @@ listeners: # Used by phonehome stats to group together related servers. #server_context: context -# Resource-constrained homeserver Settings +# Resource-constrained homeserver settings # -# If limit_remote_rooms.enabled is True, the room complexity will be -# checked before a user joins a new remote room. If it is above -# limit_remote_rooms.complexity, it will disallow joining or -# instantly leave. +# When this is enabled, the room "complexity" will be checked before a user +# joins a new remote room. If it is above the complexity limit, the server will +# disallow joining, or will instantly leave. # -# limit_remote_rooms.complexity_error can be set to customise the text -# displayed to the user when a room above the complexity threshold has -# its join cancelled. +# Room complexity is an arbitrary measure based on factors such as the number of +# users in the room. # -# Uncomment the below lines to enable: -#limit_remote_rooms: -# enabled: true -# complexity: 1.0 -# complexity_error: "This room is too complex." +limit_remote_rooms: + # Uncomment to enable room complexity checking. + # + #enabled: true + + # the limit above which rooms cannot be joined. The default is 1.0. + # + #complexity: 0.5 + + # override the error which is returned when the room is too complex. + # + #complexity_error: "This room is too complex." # Whether to require a user to be in the room to add an alias to it. # Defaults to 'true'. @@ -942,25 +947,28 @@ url_preview_accept_language: ## Captcha ## -# See docs/CAPTCHA_SETUP for full details of configuring this. +# See docs/CAPTCHA_SETUP.md for full details of configuring this. -# This homeserver's ReCAPTCHA public key. +# This homeserver's ReCAPTCHA public key. Must be specified if +# enable_registration_captcha is enabled. # #recaptcha_public_key: "YOUR_PUBLIC_KEY" -# This homeserver's ReCAPTCHA private key. +# This homeserver's ReCAPTCHA private key. Must be specified if +# enable_registration_captcha is enabled. 
# #recaptcha_private_key: "YOUR_PRIVATE_KEY" -# Enables ReCaptcha checks when registering, preventing signup +# Uncomment to enable ReCaptcha checks when registering, preventing signup # unless a captcha is answered. Requires a valid ReCaptcha -# public/private key. +# public/private key. Defaults to 'false'. # -#enable_registration_captcha: false +#enable_registration_captcha: true # The API endpoint to use for verifying m.login.recaptcha responses. +# Defaults to "https://www.recaptcha.net/recaptcha/api/siteverify". # -#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" +#recaptcha_siteverify_api: "https://my.recaptcha.site" ## TURN ## @@ -1104,7 +1112,7 @@ account_validity: # If set, allows registration of standard or admin accounts by anyone who # has the shared secret, even if registration is otherwise disabled. # -# registration_shared_secret: +#registration_shared_secret: # Set the number of bcrypt rounds used to generate password hash. # Larger numbers increase the work factor needed to generate the hash. @@ -1237,7 +1245,8 @@ metrics_flags: #known_servers: true # Whether or not to report anonymized homeserver usage statistics. -# report_stats: true|false +# +#report_stats: true|false # The endpoint to report the anonymized homeserver usage statistics to. # Defaults to https://matrix.org/report-usage-stats/push @@ -1273,13 +1282,13 @@ metrics_flags: # the registration_shared_secret is used, if one is given; otherwise, # a secret key is derived from the signing key. # -# macaroon_secret_key: +#macaroon_secret_key: # a secret which is used to calculate HMACs for form values, to stop # falsification of values. Must be specified for the User Consent # forms to work. # -# form_secret: +#form_secret: ## Signing Keys ## @@ -1764,8 +1773,8 @@ email: # Username/password for authentication to the SMTP server. By default, no # authentication is attempted. # - # smtp_user: "exampleusername" - # smtp_pass: "examplepassword" + #smtp_user: "exampleusername" + #smtp_pass: "examplepassword" # Uncomment the following to require TLS transport security for SMTP. # By default, Synapse will connect over plain text, and will then switch to diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py index 56c87fa296cb..82f04d7966e2 100644 --- a/synapse/config/captcha.py +++ b/synapse/config/captcha.py @@ -32,23 +32,26 @@ def read_config(self, config, **kwargs): def generate_config_section(self, **kwargs): return """\ ## Captcha ## - # See docs/CAPTCHA_SETUP for full details of configuring this. + # See docs/CAPTCHA_SETUP.md for full details of configuring this. - # This homeserver's ReCAPTCHA public key. + # This homeserver's ReCAPTCHA public key. Must be specified if + # enable_registration_captcha is enabled. # #recaptcha_public_key: "YOUR_PUBLIC_KEY" - # This homeserver's ReCAPTCHA private key. + # This homeserver's ReCAPTCHA private key. Must be specified if + # enable_registration_captcha is enabled. # #recaptcha_private_key: "YOUR_PRIVATE_KEY" - # Enables ReCaptcha checks when registering, preventing signup + # Uncomment to enable ReCaptcha checks when registering, preventing signup # unless a captcha is answered. Requires a valid ReCaptcha - # public/private key. + # public/private key. Defaults to 'false'. # - #enable_registration_captcha: false + #enable_registration_captcha: true # The API endpoint to use for verifying m.login.recaptcha responses. + # Defaults to "https://www.recaptcha.net/recaptcha/api/siteverify". 
# - #recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" + #recaptcha_siteverify_api: "https://my.recaptcha.site" """ diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 76b8957ea502..ca61214454f8 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -311,8 +311,8 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs): # Username/password for authentication to the SMTP server. By default, no # authentication is attempted. # - # smtp_user: "exampleusername" - # smtp_pass: "examplepassword" + #smtp_user: "exampleusername" + #smtp_pass: "examplepassword" # Uncomment the following to require TLS transport security for SMTP. # By default, Synapse will connect over plain text, and will then switch to diff --git a/synapse/config/key.py b/synapse/config/key.py index 066e7838c3b5..b529ea5da096 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -175,8 +175,8 @@ def generate_config_section( ) form_secret = 'form_secret: "%s"' % random_string_with_symbols(50) else: - macaroon_secret_key = "# macaroon_secret_key: " - form_secret = "# form_secret: " + macaroon_secret_key = "#macaroon_secret_key: " + form_secret = "#form_secret: " return ( """\ diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 6f517a71d092..6aad0d37c0cc 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -93,10 +93,11 @@ def generate_config_section(self, report_stats=None, **kwargs): #known_servers: true # Whether or not to report anonymized homeserver usage statistics. + # """ if report_stats is None: - res += "# report_stats: true|false\n" + res += "#report_stats: true|false\n" else: res += "report_stats: %s\n" % ("true" if report_stats else "false") diff --git a/synapse/config/registration.py b/synapse/config/registration.py index e7ea3a01cb87..a9aa8c3737f7 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -148,9 +148,7 @@ def generate_config_section(self, generate_secrets=False, **kwargs): random_string_with_symbols(50), ) else: - registration_shared_secret = ( - "# registration_shared_secret: " - ) + registration_shared_secret = "#registration_shared_secret: " return ( """\ diff --git a/synapse/config/server.py b/synapse/config/server.py index ed28da3deb9b..f57eefc99c14 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -434,7 +434,7 @@ class LimitRemoteRoomsConfig(object): ) self.limit_remote_rooms = LimitRemoteRoomsConfig( - **config.get("limit_remote_rooms", {}) + **(config.get("limit_remote_rooms") or {}) ) bind_port = config.get("bind_port") @@ -895,22 +895,27 @@ def generate_config_section( # Used by phonehome stats to group together related servers. #server_context: context - # Resource-constrained homeserver Settings + # Resource-constrained homeserver settings # - # If limit_remote_rooms.enabled is True, the room complexity will be - # checked before a user joins a new remote room. If it is above - # limit_remote_rooms.complexity, it will disallow joining or - # instantly leave. + # When this is enabled, the room "complexity" will be checked before a user + # joins a new remote room. If it is above the complexity limit, the server will + # disallow joining, or will instantly leave. # - # limit_remote_rooms.complexity_error can be set to customise the text - # displayed to the user when a room above the complexity threshold has - # its join cancelled. 
+ # Room complexity is an arbitrary measure based on factors such as the number of + # users in the room. # - # Uncomment the below lines to enable: - #limit_remote_rooms: - # enabled: true - # complexity: 1.0 - # complexity_error: "This room is too complex." + limit_remote_rooms: + # Uncomment to enable room complexity checking. + # + #enabled: true + + # the limit above which rooms cannot be joined. The default is 1.0. + # + #complexity: 0.5 + + # override the error which is returned when the room is too complex. + # + #complexity_error: "This room is too complex." # Whether to require a user to be in the room to add an alias to it. # Defaults to 'true'. From d84bdfe599aa47617d59c8e6eea5071463382c1c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 22 May 2020 10:12:17 +0100 Subject: [PATCH 20/28] mypy for synapse.http.site (#7553) --- changelog.d/7553.misc | 1 + synapse/http/site.py | 9 ++++++--- tox.ini | 1 + 3 files changed, 8 insertions(+), 3 deletions(-) create mode 100644 changelog.d/7553.misc diff --git a/changelog.d/7553.misc b/changelog.d/7553.misc new file mode 100644 index 000000000000..90b9e8693a9a --- /dev/null +++ b/changelog.d/7553.misc @@ -0,0 +1 @@ +Include `synapse.http.site` in type checking. diff --git a/synapse/http/site.py b/synapse/http/site.py index 514f2f14029b..167293c46d54 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,6 +14,7 @@ import contextlib import logging import time +from typing import Optional from twisted.python.failure import Failure from twisted.web.server import Request, Site @@ -45,7 +46,7 @@ class SynapseRequest(Request): request even after the client has disconnected. Attributes: - logcontext(LoggingContext) : the log context for this request + logcontext: the log context for this request """ def __init__(self, channel, *args, **kw): @@ -53,10 +54,10 @@ def __init__(self, channel, *args, **kw): self.site = channel.site self._channel = channel # this is used by the tests self.authenticated_entity = None - self.start_time = 0 + self.start_time = 0.0 # we can't yet create the logcontext, as we don't know the method. - self.logcontext = None + self.logcontext = None # type: Optional[LoggingContext] global _next_request_seq self.request_seq = _next_request_seq @@ -182,6 +183,7 @@ def finish(self): self.finish_time = time.time() Request.finish(self) if not self._is_processing: + assert self.logcontext is not None with PreserveLoggingContext(self.logcontext): self._finished_processing() @@ -249,6 +251,7 @@ def _started_processing(self, servlet_name): def _finished_processing(self): """Log the completion of this request and update the metrics """ + assert self.logcontext is not None usage = self.logcontext.get_resource_usage() if self._processing_finished_time is None: diff --git a/tox.ini b/tox.ini index 3bb4d45e2a97..9fefcb72b510 100644 --- a/tox.ini +++ b/tox.ini @@ -193,6 +193,7 @@ commands = mypy \ synapse/handlers/saml_handler.py \ synapse/handlers/sync.py \ synapse/handlers/ui_auth \ + synapse/http/site.py \ synapse/logging/ \ synapse/metrics \ synapse/module_api \ From a0f99f81b35b0e2ff600d7c72a0d71f15bf94f4c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 22 May 2020 10:17:36 +0100 Subject: [PATCH 21/28] Fix stacktrace mangling in `patch_inline_callbacks` (#7554) `Failure()` is more cunning than `Failure(e)`. 
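
As background for the change below: constructed with no arguments, Twisted's
Failure captures the exception currently being handled from sys.exc_info()
and, where possible, re-uses a Failure object that is already propagating
through Twisted rather than building a fresh one from the bare exception
object, which is where the stack-mangling comes from. A minimal standalone
sketch of the pattern (illustrative only, not part of the patch):

    from twisted.python.failure import Failure

    def capture_current_failure():
        # Must be called from inside an `except` block. Failure() with no
        # arguments inspects sys.exc_info() and, where it can, fishes out
        # the Failure already in flight, keeping its original traceback.
        return Failure()

    try:
        raise ValueError("boom")
    except Exception:
        f = capture_current_failure()
        print(f.getTraceback())  # includes the frame that raised ValueError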
---
 changelog.d/7554.misc                  | 1 +
 synapse/util/patch_inline_callbacks.py | 9 +++++++--
 2 files changed, 8 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/7554.misc

diff --git a/changelog.d/7554.misc b/changelog.d/7554.misc
new file mode 100644
index 000000000000..7c35c46aa69d
--- /dev/null
+++ b/changelog.d/7554.misc
@@ -0,0 +1 @@
+Fix some test code to not mangle stacktraces, to make it easier to debug errors.
diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py
index fdff1957716f..2605f3c65b85 100644
--- a/synapse/util/patch_inline_callbacks.py
+++ b/synapse/util/patch_inline_callbacks.py
@@ -186,10 +186,15 @@ def check_yield_points_inner(*args, **kwargs):
                 )
                 raise Exception(err)
 
+            # the wrapped function yielded a Deferred: yield it back up to the parent
+            # inlineCallbacks().
             try:
                 result = yield d
-            except Exception as e:
-                result = Failure(e)
+            except Exception:
+                # this will fish an earlier Failure out of the stack where possible, and
+                # thus is preferable to passing in an exception to the Failure
+                # constructor, since it results in less stack-mangling.
+                result = Failure()
 
             if current_context() != expected_context:

From 8c75da916c7be82e6e6ad6a83f48aa8416d806f8 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Fri, 22 May 2020 10:17:47 +0100
Subject: [PATCH 22/28] Refresh apt cache when building dh_virtualenv docker
 image (#7555)

When we tried to build debs for 1.13.0, the build failed because docker
used a base docker image which had a stale apt cache.

Fixes: #7540
---
 changelog.d/7555.misc          | 1 +
 docker/Dockerfile-dhvirtualenv | 6 ++++--
 2 files changed, 5 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/7555.misc

diff --git a/changelog.d/7555.misc b/changelog.d/7555.misc
new file mode 100644
index 000000000000..75a317613346
--- /dev/null
+++ b/changelog.d/7555.misc
@@ -0,0 +1 @@
+Refresh apt cache when building dh_virtualenv docker image.
diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv
index 579724685c2b..bf6af74104cc 100644
--- a/docker/Dockerfile-dhvirtualenv
+++ b/docker/Dockerfile-dhvirtualenv
@@ -31,8 +31,10 @@ RUN mkdir /dh-virtualenv
 RUN wget -q -O /dh-virtualenv.tar.gz https://github.com/matrix-org/dh-virtualenv/archive/matrixorg-20200519.tar.gz
 RUN tar -xv --strip-components=1 -C /dh-virtualenv -f /dh-virtualenv.tar.gz
 
-# install its build deps
-RUN cd /dh-virtualenv \
+# install its build deps. We do another apt-cache-update here, because we might
+# be using a stale cache from docker build.
+RUN apt-get update -qq -o Acquire::Languages=none \
+    && cd /dh-virtualenv \
     && env DEBIAN_FRONTEND=noninteractive mk-build-deps -ri -t "apt-get -y --no-install-recommends"
 
 # build it

From ac481a738eac021e07e591d8de0fa5f741574103 Mon Sep 17 00:00:00 2001
From: Ivan Shapovalov
Date: Fri, 22 May 2020 13:08:41 +0300
Subject: [PATCH 23/28] synapse.metrics: implement detailed memory usage
 reporting on PyPy (#7536)

PyPy's gc.get_stats() returns an object containing detailed allocator
statistics which could be beneficial to collect as metrics.
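
For example, the raw numbers can be pulled out as follows (an illustrative
sketch that only runs under PyPy; the field names are the ones documented in
the table reproduced in the patch below):

    import gc

    # PyPy-only: CPython's gc.get_stats() is a different API entirely.
    stats = gc.get_stats(memory_pressure=False)

    # `stats` pretty-prints its fields ('8.7MB' etc.); `stats._s` holds the
    # same fields as plain integers.
    s = stats._s

    print("GC used (bytes):", s.total_gc_memory)
    print("JIT assembler used (bytes):", s.jit_backend_used)
    # total_gc_time is reported in milliseconds
    print("total time in GC (seconds):", s.total_gc_time / 1000)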
Signed-off-by: Ivan Shapovalov --- changelog.d/7536.misc | 1 + synapse/metrics/__init__.py | 79 ++++++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7536.misc diff --git a/changelog.d/7536.misc b/changelog.d/7536.misc new file mode 100644 index 000000000000..c1211167fcc3 --- /dev/null +++ b/changelog.d/7536.misc @@ -0,0 +1 @@ +Synapse now exports [detailed allocator statistics](https://doc.pypy.org/en/latest/gc_info.html#gc-get-stats) and basic GC timings as Prometheus metrics (`pypy_gc_time_seconds_total` and `pypy_memory_bytes`) when run under PyPy. Contributed by Ivan Shapovalov. diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index d2fd29acb454..9cf31f96b3fa 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -26,7 +26,12 @@ import attr from prometheus_client import Counter, Gauge, Histogram -from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily +from prometheus_client.core import ( + REGISTRY, + CounterMetricFamily, + GaugeMetricFamily, + HistogramMetricFamily, +) from twisted.internet import reactor @@ -338,6 +343,78 @@ def collect(self): if not running_on_pypy: REGISTRY.register(GCCounts()) + +# +# PyPy GC / memory metrics +# + + +class PyPyGCStats(object): + def collect(self): + + # @stats is a pretty-printer object with __str__() returning a nice table, + # plus some fields that contain data from that table. + # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). + stats = gc.get_stats(memory_pressure=False) # type: ignore + # @s contains same fields as @stats, but as actual integers. + s = stats._s # type: ignore + + # also note that field naming is completely braindead + # and only vaguely correlates with the pretty-printed table. 
+ # >>>> gc.get_stats(False) + # Total memory consumed: + # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory + # in arenas: 3.0MB # s.total_arena_memory + # rawmalloced: 1.7MB # s.total_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler used: 31.0kB # s.jit_backend_used + # ----------------------------- + # Total: 8.8MB # stats.memory_used_sum + # + # Total memory allocated: + # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory + # in arenas: 30.9MB # s.peak_arena_memory + # rawmalloced: 4.1MB # s.peak_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler allocated: 1.0MB # s.jit_backend_allocated + # ----------------------------- + # Total: 39.7MB # stats.memory_allocated_sum + # + # Total time spent in GC: 0.073 # s.total_gc_time + + pypy_gc_time = CounterMetricFamily( + "pypy_gc_time_seconds_total", "Total time spent in PyPy GC", labels=[], + ) + pypy_gc_time.add_metric([], s.total_gc_time / 1000) + yield pypy_gc_time + + pypy_mem = GaugeMetricFamily( + "pypy_memory_bytes", + "Memory tracked by PyPy allocator", + labels=["state", "class", "kind"], + ) + # memory used by JIT assembler + pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) + pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) + # memory used by GCed objects + pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) + pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) + pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) + pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) + pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) + pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) + # totals + pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) + pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) + pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory) + pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) + yield pypy_mem + + +if running_on_pypy: + REGISTRY.register(PyPyGCStats()) + + # # Twisted reactor metrics # From 547e4dd83e7d70c30ce8b40578d0750fc10373fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 11:39:20 +0100 Subject: [PATCH 24/28] Fix exception reporting due to HTTP request errors. (#7556) These are business as usual errors, rather than stuff we want to log at error. --- changelog.d/7556.misc | 1 + synapse/app/generic_worker.py | 13 +++++++++---- synapse/handlers/federation.py | 7 +++++++ synapse/http/matrixfederationclient.py | 7 +++++++ 4 files changed, 24 insertions(+), 4 deletions(-) create mode 100644 changelog.d/7556.misc diff --git a/changelog.d/7556.misc b/changelog.d/7556.misc new file mode 100644 index 000000000000..ed271f9de883 --- /dev/null +++ b/changelog.d/7556.misc @@ -0,0 +1 @@ +Stop logging some expected HTTP request errors as exceptions. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d751c9772be2..a45c876213f2 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -26,7 +26,7 @@ import synapse import synapse.events -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -202,9 +202,14 @@ async def on_POST(self, request, device_id): # is there. 
            auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
            headers = {"Authorization": auth_headers}
 
-            result = await self.http_client.post_json_get_json(
-                self.main_uri + request.uri.decode("ascii"), body, headers=headers
-            )
+            try:
+                result = await self.http_client.post_json_get_json(
+                    self.main_uri + request.uri.decode("ascii"), body, headers=headers
+                )
+            except HttpResponseException as e:
+                raise e.to_synapse_error() from e
+            except RequestSendFailed as e:
+                raise SynapseError(502, "Failed to talk to master") from e
 
             return 200, result
         else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 81d859f807bb..bb03cc9adde4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -40,6 +40,7 @@
     Codes,
     FederationDeniedError,
     FederationError,
+    HttpResponseException,
     RequestSendFailed,
     SynapseError,
 )
@@ -1036,6 +1037,12 @@ async def try_backfill(domains):
                 # TODO: We can probably do something more intelligent here.
                 return True
             except SynapseError as e:
+                logger.info("Failed to backfill from %s because %s", dom, e)
+                continue
+            except HttpResponseException as e:
+                if 400 <= e.code < 500:
+                    raise e.to_synapse_error()
+
                 logger.info("Failed to backfill from %s because %s", dom, e)
                 continue
             except CodeMessageException as e:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 44077f534912..2d47b9ea001b 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -144,6 +144,11 @@ def _handle_json_response(reactor, timeout_sec, request, response):
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
         body = yield make_deferred_yieldable(d)
+    except TimeoutError as e:
+        logger.warning(
+            "{%s} [%s] Timed out reading response", request.txn_id, request.destination,
+        )
+        raise RequestSendFailed(e, can_retry=True) from e
     except Exception as e:
         logger.warning(
             "{%s} [%s] Error reading response: %s",
@@ -424,6 +429,8 @@ def _send_request(
             )
 
             response = yield request_deferred
+        except TimeoutError as e:
+            raise RequestSendFailed(e, can_retry=True) from e
         except DNSLookupError as e:
             raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
         except Exception as e:

From 710d958c646dcdebea9d0ec254f1bc80fb0e82f5 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 22 May 2020 11:41:41 +0100
Subject: [PATCH 25/28] On upgrade room only send canonical alias once. (#7547)

Instead of doing a complicated dance of deleting and moving aliases one
by one, which sends a canonical alias update into the old room for each
one, let's do it all in one go.

This also changes the function to move *all* local alias events to the new
room, however that happens later on anyway.
---
 changelog.d/7547.misc    |   1 +
 synapse/handlers/room.py | 115 ++++++++++++++++++++-------------
 2 files changed, 61 insertions(+), 55 deletions(-)
 create mode 100644 changelog.d/7547.misc

diff --git a/changelog.d/7547.misc b/changelog.d/7547.misc
new file mode 100644
index 000000000000..2cb8f9bd5b2c
--- /dev/null
+++ b/changelog.d/7547.misc
@@ -0,0 +1 @@
+On upgrade room only send canonical alias once.
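
(For reference while reading the rewritten alias-handling below: an
m.room.canonical_alias state event carries content of roughly the following
shape. The values here are made up for illustration; the new code splits the
entries between the old and new room according to whether each alias is
local to this server.)

    # Illustrative only -- not taken from the patch.
    canonical_alias_content = {
        "alias": "#main:example.org",      # the room's primary alias, if any
        "alt_aliases": [                   # alternative aliases, if any
            "#other:example.org",
            "#main:remote.example.net",
        ],
    }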
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 73f9eeb39939..13850ba67261 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -439,73 +439,78 @@ async def _move_aliases_to_new_room( new_room_id: str, old_room_state: StateMap[str], ): - directory_handler = self.hs.get_handlers().directory_handler - - aliases = await self.store.get_aliases_for_room(old_room_id) - # check to see if we have a canonical alias. canonical_alias_event = None canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, "")) if canonical_alias_event_id: canonical_alias_event = await self.store.get_event(canonical_alias_event_id) - # first we try to remove the aliases from the old room (we suppress sending - # the room_aliases event until the end). - # - # Note that we'll only be able to remove aliases that (a) aren't owned by an AS, - # and (b) unless the user is a server admin, which the user created. - # - # This is probably correct - given we don't allow such aliases to be deleted - # normally, it would be odd to allow it in the case of doing a room upgrade - - # but it makes the upgrade less effective, and you have to wonder why a room - # admin can't remove aliases that point to that room anyway. - # (cf https://github.com/matrix-org/synapse/issues/2360) - # - removed_aliases = [] - for alias_str in aliases: - alias = RoomAlias.from_string(alias_str) - try: - await directory_handler.delete_association(requester, alias) - removed_aliases.append(alias_str) - except SynapseError as e: - logger.warning("Unable to remove alias %s from old room: %s", alias, e) - - # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest - # of this. - if not removed_aliases: + await self.store.update_aliases_for_room(old_room_id, new_room_id) + + if not canonical_alias_event: return - # we can now add any aliases we successfully removed to the new room. - for alias in removed_aliases: - try: - await directory_handler.create_association( - requester, - RoomAlias.from_string(alias), - new_room_id, - servers=(self.hs.hostname,), - check_membership=False, - ) - logger.info("Moved alias %s to new room", alias) - except SynapseError as e: - # I'm not really expecting this to happen, but it could if the spam - # checking module decides it shouldn't, or similar. - logger.error("Error adding alias %s to new room: %s", alias, e) + # If there is a canonical alias we need to update the one in the old + # room and set one in the new one. + old_canonical_alias_content = dict(canonical_alias_event.content) + new_canonical_alias_content = {} + + canonical = canonical_alias_event.content.get("alias") + if canonical and self.hs.is_mine_id(canonical): + new_canonical_alias_content["alias"] = canonical + old_canonical_alias_content.pop("alias", None) + + # We convert to a list as it will be a Tuple. 
+ old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", [])) + if old_alt_aliases: + old_canonical_alias_content["alt_aliases"] = old_alt_aliases + new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", []) + for alias in canonical_alias_event.content.get("alt_aliases", []): + try: + if self.hs.is_mine_id(alias): + new_alt_aliases.append(alias) + old_alt_aliases.remove(alias) + except Exception: + logger.info( + "Invalid alias %s in canonical alias event %s", + alias, + canonical_alias_event_id, + ) + + if not old_alt_aliases: + old_canonical_alias_content.pop("alt_aliases") # If a canonical alias event existed for the old room, fire a canonical # alias event for the new room with a copy of the information. try: - if canonical_alias_event: - await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.CanonicalAlias, - "state_key": "", - "room_id": new_room_id, - "sender": requester.user.to_string(), - "content": canonical_alias_event.content, - }, - ratelimit=False, - ) + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": old_room_id, + "sender": requester.user.to_string(), + "content": old_canonical_alias_content, + }, + ratelimit=False, + ) + except SynapseError as e: + # again I'm not really expecting this to fail, but if it does, I'd rather + # we returned the new room to the client at this point. + logger.error("Unable to send updated alias events in old room: %s", e) + + try: + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": new_canonical_alias_content, + }, + ratelimit=False, + ) except SynapseError as e: # again I'm not really expecting this to fail, but if it does, I'd rather # we returned the new room to the client at this point. From 66f2ebc22fec01b4673fabae22f2c94dfeac58e3 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 22 May 2020 07:17:30 -0400 Subject: [PATCH 26/28] Use a non-empty RelayState for user interactive auth with SAML. (#7552) --- changelog.d/7552.bugfix | 1 + synapse/rest/client/v2_alpha/auth.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7552.bugfix diff --git a/changelog.d/7552.bugfix b/changelog.d/7552.bugfix new file mode 100644 index 000000000000..60b31d6d3147 --- /dev/null +++ b/changelog.d/7552.bugfix @@ -0,0 +1 @@ +Fix "Missing RelayState parameter" error when using user interactive authentication with SAML for some SAML providers. diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 7bca1326d54f..75590ebaeb71 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -177,7 +177,10 @@ async def on_GET(self, request, stagetype): ) elif self._saml_enabled: - client_redirect_url = b"" + # Some SAML identity providers (e.g. Google) require a + # RelayState parameter on requests. It is not necessary here, so + # pass in a dummy redirect URL (which will never get used). + client_redirect_url = b"unused" sso_redirect_url = self._saml_handler.handle_redirect_request( client_redirect_url, session ) From 06a02bc1ce9ef23a6dff28dbfd30f910ae330b1d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 13:41:11 +0100 Subject: [PATCH 27/28] Convert sending mail to async/await. 
(#7557) Mainly because sometimes the email push code raises exceptions where the stack traces have gotten lost, which is hopefully fixed by this. --- changelog.d/7557.misc | 1 + synapse/handlers/identity.py | 9 ++- synapse/push/emailpusher.py | 38 +++++----- synapse/push/mailer.py | 84 ++++++++++------------ tests/rest/client/v2_alpha/test_account.py | 4 +- 5 files changed, 60 insertions(+), 76 deletions(-) create mode 100644 changelog.d/7557.misc diff --git a/changelog.d/7557.misc b/changelog.d/7557.misc new file mode 100644 index 000000000000..c850a2bc0c1a --- /dev/null +++ b/changelog.d/7557.misc @@ -0,0 +1 @@ +Convert sending mail to async/await. diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 0f0e632b626c..9ed0d23b0fab 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -290,8 +290,7 @@ def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server): return changed - @defer.inlineCallbacks - def send_threepid_validation( + async def send_threepid_validation( self, email_address, client_secret, @@ -319,7 +318,7 @@ def send_threepid_validation( """ # Check that this email/client_secret/send_attempt combo is new or # greater than what we've seen previously - session = yield self.store.get_threepid_validation_session( + session = await self.store.get_threepid_validation_session( "email", client_secret, address=email_address, validated=False ) @@ -353,7 +352,7 @@ def send_threepid_validation( # Send the mail with the link containing the token, client_secret # and session_id try: - yield send_email_func(email_address, token, client_secret, session_id) + await send_email_func(email_address, token, client_secret, session_id) except Exception: logger.exception( "Error sending threepid validation email to %s", email_address @@ -364,7 +363,7 @@ def send_threepid_validation( self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime ) - yield self.store.start_or_continue_validation_session( + await self.store.start_or_continue_validation_session( "email", email_address, session_id, diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index ba4551d61964..568c13eaea19 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -15,7 +15,6 @@ import logging -from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process @@ -132,8 +131,7 @@ def _resume_processing(self): self._is_processing = False self._start_processing() - @defer.inlineCallbacks - def _process(self): + async def _process(self): # we should never get here if we are already processing assert not self._is_processing @@ -142,7 +140,7 @@ def _process(self): if self.throttle_params is None: # this is our first loop: load up the throttle params - self.throttle_params = yield self.store.get_throttle_params_by_room( + self.throttle_params = await self.store.get_throttle_params_by_room( self.pusher_id ) @@ -151,7 +149,7 @@ def _process(self): while True: starting_max_ordering = self.max_stream_ordering try: - yield self._unsafe_process() + await self._unsafe_process() except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: @@ -159,8 +157,7 @@ def _process(self): finally: self._is_processing = False - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): """ Main logic of the push loop without the wrapper function 
that sets up logging, measures and guards against multiple instances of it @@ -168,12 +165,12 @@ def _unsafe_process(self): """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering fn = self.store.get_unread_push_actions_for_user_in_range_for_email - unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) + unprocessed = await fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None if not unprocessed: - yield self.save_last_stream_ordering_and_success(self.max_stream_ordering) + await self.save_last_stream_ordering_and_success(self.max_stream_ordering) return for push_action in unprocessed: @@ -201,15 +198,15 @@ def _unsafe_process(self): "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]), } - yield self.send_notification(unprocessed, reason) + await self.send_notification(unprocessed, reason) - yield self.save_last_stream_ordering_and_success( + await self.save_last_stream_ordering_and_success( max(ea["stream_ordering"] for ea in unprocessed) ) # we update the throttle on all the possible unprocessed push actions for ea in unprocessed: - yield self.sent_notif_update_throttle(ea["room_id"], ea) + await self.sent_notif_update_throttle(ea["room_id"], ea) break else: if soonest_due_at is None or should_notify_at < soonest_due_at: @@ -227,14 +224,13 @@ def _unsafe_process(self): self.seconds_until(soonest_due_at), self.on_timer ) - @defer.inlineCallbacks - def save_last_stream_ordering_and_success(self, last_stream_ordering): + async def save_last_stream_ordering_and_success(self, last_stream_ordering): if last_stream_ordering is None: # This happens if we haven't yet processed anything return self.last_stream_ordering = last_stream_ordering - pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success( + pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( self.app_id, self.email, self.user_id, @@ -275,13 +271,12 @@ def room_ready_to_notify_at(self, room_id): may_send_at = last_sent_ts + throttle_ms return may_send_at - @defer.inlineCallbacks - def sent_notif_update_throttle(self, room_id, notified_push_action): + async def sent_notif_update_throttle(self, room_id, notified_push_action): # We have sent a notification, so update the throttle accordingly. # If the event that triggered the notif happened more than # THROTTLE_RESET_AFTER_MS after the previous one that triggered a # notif, we release the throttle. Otherwise, the throttle is increased. 
- time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before( + time_of_previous_notifs = await self.store.get_time_of_last_push_action_before( notified_push_action["stream_ordering"] ) @@ -310,14 +305,13 @@ def sent_notif_update_throttle(self, room_id, notified_push_action): "last_sent_ts": self.clock.time_msec(), "throttle_ms": new_throttle_ms, } - yield self.store.set_throttle_params( + await self.store.set_throttle_params( self.pusher_id, room_id, self.throttle_params[room_id] ) - @defer.inlineCallbacks - def send_notification(self, push_actions, reason): + async def send_notification(self, push_actions, reason): logger.info("Sending notif email for user %r", self.user_id) - yield self.mailer.send_notification_mail( + await self.mailer.send_notification_mail( self.app_id, self.user_id, self.email, push_actions, reason ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index ab33abbeed83..d57a66a697d5 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -26,8 +26,6 @@ import bleach import jinja2 -from twisted.internet import defer - from synapse.api.constants import EventTypes from synapse.api.errors import StoreError from synapse.logging.context import make_deferred_yieldable @@ -127,8 +125,7 @@ def __init__(self, hs, app_name, template_html, template_text): logger.info("Created Mailer for app_name %s" % app_name) - @defer.inlineCallbacks - def send_password_reset_mail(self, email_address, token, client_secret, sid): + async def send_password_reset_mail(self, email_address, token, client_secret, sid): """Send an email with a password reset link to a user Args: @@ -149,14 +146,13 @@ def send_password_reset_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Password Reset" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_registration_mail(self, email_address, token, client_secret, sid): + async def send_registration_mail(self, email_address, token, client_secret, sid): """Send an email with a registration confirmation link to a user Args: @@ -177,14 +173,13 @@ def send_registration_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Register your Email Address" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_add_threepid_mail(self, email_address, token, client_secret, sid): + async def send_add_threepid_mail(self, email_address, token, client_secret, sid): """Send an email with a validation link to a user for adding a 3pid to their account Args: @@ -206,20 +201,19 @@ def send_add_threepid_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Validate Your Email" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_notification_mail( + async def send_notification_mail( self, app_id, user_id, email_address, push_actions, reason ): """Send email regarding a user's room notifications""" rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions]) - notif_events = yield self.store.get_events( + notif_events = await self.store.get_events( [pa["event_id"] for pa in push_actions] ) @@ -232,7 +226,7 @@ def send_notification_mail( state_by_room = {} try: - user_display_name = yield self.store.get_profile_displayname( + 
user_display_name = await self.store.get_profile_displayname( UserID.from_string(user_id).localpart ) if user_display_name is None: @@ -240,14 +234,13 @@ def send_notification_mail( except StoreError: user_display_name = user_id - @defer.inlineCallbacks - def _fetch_room_state(room_id): - room_state = yield self.store.get_current_state_ids(room_id) + async def _fetch_room_state(room_id): + room_state = await self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email # notifs are much less realtime than sync so we can afford to wait a bit. - yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + await concurrently_execute(_fetch_room_state, rooms_in_order, 3) # actually sort our so-called rooms_in_order list, most recent room first rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0)) @@ -255,19 +248,19 @@ def _fetch_room_state(room_id): rooms = [] for r in rooms_in_order: - roomvars = yield self.get_room_vars( + roomvars = await self.get_room_vars( r, user_id, notifs_by_room[r], notif_events, state_by_room[r] ) rooms.append(roomvars) - reason["room_name"] = yield calculate_room_name( + reason["room_name"] = await calculate_room_name( self.store, state_by_room[reason["room_id"]], user_id, fallback_to_members=True, ) - summary_text = yield self.make_summary_text( + summary_text = await self.make_summary_text( notifs_by_room, state_by_room, notif_events, user_id, reason ) @@ -282,12 +275,11 @@ def _fetch_room_state(room_id): "reason": reason, } - yield self.send_email( + await self.send_email( email_address, "[%s] %s" % (self.app_name, summary_text), template_vars ) - @defer.inlineCallbacks - def send_email(self, email_address, subject, template_vars): + async def send_email(self, email_address, subject, template_vars): """Send an email with the given information and template text""" try: from_string = self.hs.config.email_notif_from % {"app": self.app_name} @@ -317,7 +309,7 @@ def send_email(self, email_address, subject, template_vars): logger.info("Sending email to %s" % email_address) - yield make_deferred_yieldable( + await make_deferred_yieldable( self.sendmail( self.hs.config.email_smtp_host, raw_from, @@ -332,13 +324,14 @@ def send_email(self, email_address, subject, template_vars): ) ) - @defer.inlineCallbacks - def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): + async def get_room_vars( + self, room_id, user_id, notifs, notif_events, room_state_ids + ): my_member_event_id = room_state_ids[("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) is_invite = my_member_event.content["membership"] == "invite" - room_name = yield calculate_room_name(self.store, room_state_ids, user_id) + room_name = await calculate_room_name(self.store, room_state_ids, user_id) room_vars = { "title": room_name, @@ -350,7 +343,7 @@ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): if not is_invite: for n in notifs: - notifvars = yield self.get_notif_vars( + notifvars = await self.get_notif_vars( n, user_id, notif_events[n["event_id"]], room_state_ids ) @@ -377,9 +370,8 @@ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): return room_vars - @defer.inlineCallbacks - def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): - results = yield self.store.get_events_around( + async 
def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): + results = await self.store.get_events_around( notif["room_id"], notif["event_id"], before_limit=CONTEXT_BEFORE, @@ -392,25 +384,24 @@ def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): "messages": [], } - the_events = yield filter_events_for_client( + the_events = await filter_events_for_client( self.storage, user_id, results["events_before"] ) the_events.append(notif_event) for event in the_events: - messagevars = yield self.get_message_vars(notif, event, room_state_ids) + messagevars = await self.get_message_vars(notif, event, room_state_ids) if messagevars is not None: ret["messages"].append(messagevars) return ret - @defer.inlineCallbacks - def get_message_vars(self, notif, event, room_state_ids): + async def get_message_vars(self, notif, event, room_state_ids): if event.type != EventTypes.Message: return sender_state_event_id = room_state_ids[("m.room.member", event.sender)] - sender_state_event = yield self.store.get_event(sender_state_event_id) + sender_state_event = await self.store.get_event(sender_state_event_id) sender_name = name_from_member_event(sender_state_event) sender_avatar_url = sender_state_event.content.get("avatar_url") @@ -460,8 +451,7 @@ def add_image_message_vars(self, messagevars, event): return messagevars - @defer.inlineCallbacks - def make_summary_text( + async def make_summary_text( self, notifs_by_room, room_state_ids, notif_events, user_id, reason ): if len(notifs_by_room) == 1: @@ -471,17 +461,17 @@ def make_summary_text( # If the room has some kind of name, use it, but we don't # want the generated-from-names one here otherwise we'll # end up with, "new message from Bob in the Bob room" - room_name = yield calculate_room_name( + room_name = await calculate_room_name( self.store, room_state_ids[room_id], user_id, fallback_to_members=False ) my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) if my_member_event.content["membership"] == "invite": inviter_member_event_id = room_state_ids[room_id][ ("m.room.member", my_member_event.sender) ] - inviter_member_event = yield self.store.get_event( + inviter_member_event = await self.store.get_event( inviter_member_event_id ) inviter_name = name_from_member_event(inviter_member_event) @@ -506,7 +496,7 @@ def make_summary_text( state_event_id = room_state_ids[room_id][ ("m.room.member", event.sender) ] - state_event = yield self.store.get_event(state_event_id) + state_event = await self.store.get_event(state_event_id) sender_name = name_from_member_event(state_event) if sender_name is not None and room_name is not None: @@ -535,7 +525,7 @@ def make_summary_text( } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [ room_state_ids[room_id][("m.room.member", s)] for s in sender_ids @@ -567,7 +557,7 @@ def make_summary_text( } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids] ) diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py index 0d6936fd36e2..3ab611f6184d 100644 --- a/tests/rest/client/v2_alpha/test_account.py +++ b/tests/rest/client/v2_alpha/test_account.py @@ -46,7 +46,7 @@ def make_homeserver(self, reactor, clock): # Email config. 
self.email_attempts = [] - def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): + async def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): self.email_attempts.append(msg) return @@ -358,7 +358,7 @@ def make_homeserver(self, reactor, clock): # Email config. self.email_attempts = [] - def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): + async def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): self.email_attempts.append(msg) config["email"] = { From 1531b214fc57714c14046a8f66c7b5fe5ec5dcdd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2020 14:21:54 +0100 Subject: [PATCH 28/28] Add ability to wait for replication streams (#7542) The idea here is that if an instance persists an event via the replication HTTP API it can return before we receive that event over replication, which can lead to races where code assumes that persisting an event immediately updates various caches (e.g. current state of the room). Most of Synapse doesn't hit such races, so we don't do the waiting automagically, instead we do so where necessary to avoid unnecessary delays. We may decide to change our minds here if it turns out there are a lot of subtle races going on. People probably want to look at this commit by commit. --- changelog.d/7542.misc | 1 + synapse/handlers/federation.py | 33 ++++--- synapse/handlers/message.py | 36 +++++--- synapse/handlers/room.py | 65 +++++++++----- synapse/handlers/room_member.py | 65 +++++++++----- synapse/handlers/room_member_worker.py | 11 +-- synapse/replication/http/federation.py | 13 ++- synapse/replication/http/membership.py | 14 +-- synapse/replication/http/send_event.py | 4 +- synapse/replication/http/streams.py | 5 +- synapse/replication/tcp/client.py | 90 ++++++++++++++++++- synapse/rest/admin/rooms.py | 10 ++- synapse/rest/client/v1/room.py | 20 +++-- synapse/rest/client/v2_alpha/relations.py | 2 +- synapse/server.py | 5 ++ synapse/server.pyi | 5 ++ .../server_notices/server_notices_manager.py | 6 +- .../storage/data_stores/main/events_worker.py | 6 +- .../storage/data_stores/main/roommember.py | 2 + tests/federation/test_complexity.py | 8 +- tests/handlers/test_typing.py | 5 +- tests/storage/test_cleanup_extrems.py | 4 +- tests/storage/test_event_metrics.py | 2 +- tests/test_federation.py | 4 +- 24 files changed, 304 insertions(+), 112 deletions(-) create mode 100644 changelog.d/7542.misc diff --git a/changelog.d/7542.misc b/changelog.d/7542.misc new file mode 100644 index 000000000000..7dd9b4823b50 --- /dev/null +++ b/changelog.d/7542.misc @@ -0,0 +1 @@ +Add ability to wait for replication streams. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bb03cc9adde4..e354c803dbf9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,6 +126,7 @@ def __init__(self, hs): self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() + self._replication = hs.get_replication_data_handler() self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client( hs @@ -1221,7 +1222,7 @@ async def on_event_auth(self, event_id: str) -> List[EventBase]: async def do_invite_join( self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict - ) -> None: + ) -> Tuple[str, int]: """ Attempts to join the `joinee` to the room `room_id` via the servers contained in `target_hosts`. 
@@ -1304,15 +1305,23 @@ async def do_invite_join( room_id=room_id, room_version=room_version_obj, ) - await self._persist_auth_tree( + max_stream_id = await self._persist_auth_tree( origin, auth_chain, state, event, room_version_obj ) + # We wait here until this instance has seen the events come down + # replication (if we're using replication) as the below uses caches. + # + # TODO: Currently the events stream is written to from master + await self._replication.wait_for_stream_position( + "master", "events", max_stream_id + ) + # Check whether this room is the result of an upgrade of a room we already know # about. If so, migrate over user information predecessor = await self.store.get_room_predecessor(room_id) if not predecessor or not isinstance(predecessor.get("room_id"), str): - return + return event.event_id, max_stream_id old_room_id = predecessor["room_id"] logger.debug( "Found predecessor for %s during remote join: %s", room_id, old_room_id @@ -1325,6 +1334,7 @@ async def do_invite_join( ) logger.debug("Finished joining %s to %s", joinee, room_id) + return event.event_id, max_stream_id finally: room_queue = self.room_queues[room_id] del self.room_queues[room_id] @@ -1554,7 +1564,7 @@ async def on_invite_request( async def do_remotely_reject_invite( self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict - ) -> EventBase: + ) -> Tuple[EventBase, int]: origin, event, room_version = await self._make_and_verify_event( target_hosts, room_id, user_id, "leave", content=content ) @@ -1574,9 +1584,9 @@ async def do_remotely_reject_invite( await self.federation_client.send_leave(target_hosts, event) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify([(event, context)]) + stream_id = await self.persist_events_and_notify([(event, context)]) - return event + return event, stream_id async def _make_and_verify_event( self, @@ -1888,7 +1898,7 @@ async def _persist_auth_tree( state: List[EventBase], event: EventBase, room_version: RoomVersion, - ) -> None: + ) -> int: """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event separately. Notifies about the persisted events @@ -1982,7 +1992,7 @@ async def _persist_auth_tree( event, old_state=state ) - await self.persist_events_and_notify([(event, new_event_context)]) + return await self.persist_events_and_notify([(event, new_event_context)]) async def _prep_event( self, @@ -2835,7 +2845,7 @@ async def persist_events_and_notify( self, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, - ) -> None: + ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. 
@@ -2845,11 +2855,12 @@ async def persist_events_and_notify( backfilling or not """ if self.config.worker_app: - await self._send_events_to_master( + result = await self._send_events_to_master( store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, ) + return result["max_stream_id"] else: max_stream_id = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled @@ -2864,6 +2875,8 @@ async def persist_events_and_notify( for event, _ in event_and_contexts: await self._notify_persisted_event(event, max_stream_id) + return max_stream_id + async def _notify_persisted_event( self, event: EventBase, max_stream_id: int ) -> None: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8f362896a2a7..f445e2aa2a1a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Optional +from typing import Optional, Tuple from six import iteritems, itervalues, string_types @@ -42,6 +42,7 @@ ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.api.urls import ConsentURIBuilder +from synapse.events import EventBase from synapse.events.validator import EventValidator from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -630,7 +631,9 @@ def assert_accepted_privacy_policy(self, requester): msg = self._block_events_without_consent_error % {"consent_uri": consent_uri} raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri) - async def send_nonmember_event(self, requester, event, context, ratelimit=True): + async def send_nonmember_event( + self, requester, event, context, ratelimit=True + ) -> int: """ Persists and notifies local clients and federation of an event. @@ -639,6 +642,9 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True): context (Context) the context of the event. ratelimit (bool): Whether to rate limit this send. is_guest (bool): Whether the sender is a guest. + + Return: + The stream_id of the persisted event. """ if event.type == EventTypes.Member: raise SynapseError( @@ -659,7 +665,7 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True): ) return prev_state - await self.handle_new_client_event( + return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit ) @@ -688,7 +694,7 @@ def deduplicate_state_event(self, event, context): async def create_and_send_nonmember_event( self, requester, event_dict, ratelimit=True, txn_id=None - ): + ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. @@ -711,10 +717,10 @@ async def create_and_send_nonmember_event( spam_error = "Spam is not permitted here" raise SynapseError(403, spam_error, Codes.FORBIDDEN) - await self.send_nonmember_event( + stream_id = await self.send_nonmember_event( requester, event, context, ratelimit=ratelimit ) - return event + return event, stream_id @measure_func("create_new_client_event") @defer.inlineCallbacks @@ -774,7 +780,7 @@ def create_new_client_event( @measure_func("handle_new_client_event") async def handle_new_client_event( self, requester, event, context, ratelimit=True, extra_users=[] - ): + ) -> int: """Processes a new event. This includes checking auth, persisting it, notifying users, sending to remote servers, etc. 
@@ -787,6 +793,9 @@ async def handle_new_client_event(
             context (EventContext)
             ratelimit (bool)
             extra_users (list(UserID)): Any extra users to notify about event
+
+        Return:
+            The stream_id of the persisted event.
         """
 
         if event.is_state() and (event.type, event.state_key) == (
@@ -827,7 +836,7 @@ async def handle_new_client_event(
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                await self.send_event_to_master(
+                result = await self.send_event_to_master(
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -836,14 +845,17 @@ async def handle_new_client_event(
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                stream_id = result["stream_id"]
+                event.internal_metadata.stream_ordering = stream_id
                 success = True
-                return
+                return stream_id
 
-            await self.persist_and_notify_client_event(
+            stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
             success = True
+            return stream_id
         finally:
             if not success:
                 # Ensure that we actually remove the entries in the push actions
@@ -886,7 +898,7 @@ def _validate_canonical_alias(
 
     async def persist_and_notify_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Called when we have fully built the event, have already calculated the
         push actions for the event, and checked auth.
 
@@ -1076,6 +1088,8 @@ def _notify():
         # matters as sometimes presence code can take a while.
         run_in_background(self._bump_active_time, requester.user)
 
+        return event_stream_id
+
     async def _bump_active_time(self, user):
         try:
             presence = self.hs.get_presence_handler()
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 13850ba67261..2698a129cac9 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,6 +22,7 @@
 import math
 import string
 from collections import OrderedDict
+from typing import Tuple
 
 from six import iteritems, string_types
 
@@ -518,7 +519,7 @@ async def _move_aliases_to_new_room(
 
     async def create_room(
         self, requester, config, ratelimit=True, creator_join_profile=None
-    ):
+    ) -> Tuple[dict, int]:
         """ Creates a new room.
 
         Args:
@@ -535,9 +536,9 @@ async def create_room(
                 `avatar_url` and/or `displayname`.
 
         Returns:
-            Deferred[dict]:
-                a dict containing the keys `room_id` and, if an alias was
-                requested, `room_alias`.
+            First, a dict containing the keys `room_id` and, if an alias
+            was requested, `room_alias`. Secondly, the stream_id of the
+            last persisted event.
 
         Raises:
             SynapseError if the room ID couldn't be stored, or something went
             horribly wrong.
@@ -669,7 +670,7 @@ async def create_room( # override any attempt to set room versions via the creation_content creation_content["room_version"] = room_version.identifier - await self._send_events_for_new_room( + last_stream_id = await self._send_events_for_new_room( requester, room_id, preset_config=preset_config, @@ -683,7 +684,10 @@ async def create_room( if "name" in config: name = config["name"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -697,7 +701,10 @@ async def create_room( if "topic" in config: topic = config["topic"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -715,7 +722,7 @@ async def create_room( if is_direct: content["is_direct"] = is_direct - await self.room_member_handler.update_membership( + _, last_stream_id = await self.room_member_handler.update_membership( requester, UserID.from_string(invitee), room_id, @@ -729,7 +736,7 @@ async def create_room( id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] - await self.hs.get_room_member_handler().do_3pid_invite( + last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, @@ -745,7 +752,7 @@ async def create_room( if room_alias: result["room_alias"] = room_alias.to_string() - return result + return result, last_stream_id async def _send_events_for_new_room( self, @@ -758,7 +765,13 @@ async def _send_events_for_new_room( room_alias=None, power_level_content_override=None, # Doesn't apply when initial state has power level state event content creator_join_profile=None, - ): + ) -> int: + """Sends the initial events into a new room. + + Returns: + The stream_id of the last event persisted. + """ + def create(etype, content, **kwargs): e = {"type": etype, "content": content} @@ -767,12 +780,16 @@ def create(etype, content, **kwargs): return e - async def send(etype, content, **kwargs): + async def send(etype, content, **kwargs) -> int: event = create(etype, content, **kwargs) logger.debug("Sending %s in new room", etype) - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) + return last_stream_id config = RoomCreationHandler.PRESETS_DICT[preset_config] @@ -797,7 +814,9 @@ async def send(etype, content, **kwargs): # of the first events that get sent into a room. 
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) if pl_content is not None: - await send(etype=EventTypes.PowerLevels, content=pl_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=pl_content + ) else: power_level_content = { "users": {creator_id: 100}, @@ -830,33 +849,39 @@ async def send(etype, content, **kwargs): if power_level_content_override: power_level_content.update(power_level_content_override) - await send(etype=EventTypes.PowerLevels, content=power_level_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=power_level_content + ) if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.CanonicalAlias, content={"alias": room_alias.to_string()}, ) if (EventTypes.JoinRules, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]} ) if (EventTypes.RoomHistoryVisibility, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.RoomHistoryVisibility, content={"history_visibility": config["history_visibility"]}, ) if config["guest_can_join"]: if (EventTypes.GuestAccess, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.GuestAccess, content={"guest_access": "can_join"} ) for (etype, state_key), content in initial_state.items(): - await send(etype=etype, state_key=state_key, content=content) + last_sent_stream_id = await send( + etype=etype, state_key=state_key, content=content + ) + + return last_sent_stream_id async def _generate_room_id( self, creator_id: str, is_public: str, room_version: RoomVersion, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e51e1c32fed0..691b6705b261 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,7 +17,7 @@ import abc import logging -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Optional, Tuple from six.moves import http_client @@ -84,7 +84,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> Optional[dict]: + ) -> Tuple[str, int]: """Try and join a room that this server is not in Args: @@ -104,7 +104,7 @@ async def _remote_reject_invite( room_id: str, target: UserID, content: dict, - ) -> dict: + ) -> Tuple[Optional[str], int]: """Attempt to reject an invite for a room this server is not in. If we fail to do so we locally mark the invite as rejected. @@ -154,7 +154,7 @@ async def _local_membership_update( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> EventBase: + ) -> Tuple[str, int]: user_id = target.to_string() if content is None: @@ -187,9 +187,10 @@ async def _local_membership_update( ) if duplicate is not None: # Discard the new event since this membership change is a no-op. 
- return duplicate + _, stream_id = await self.store.get_event_ordering(duplicate.event_id) + return duplicate.event_id, stream_id - await self.event_creation_handler.handle_new_client_event( + stream_id = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target], ratelimit=ratelimit ) @@ -213,7 +214,7 @@ async def _local_membership_update( if prev_member_event.membership == Membership.JOIN: await self._user_left_room(target, room_id) - return event + return event.event_id, stream_id async def copy_room_tags_and_direct_to_room( self, old_room_id, new_room_id, user_id @@ -263,7 +264,7 @@ async def update_membership( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> Union[EventBase, Optional[dict]]: + ) -> Tuple[Optional[str], int]: key = (room_id,) with (await self.member_linearizer.queue(key)): @@ -294,7 +295,7 @@ async def _update_membership( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> Union[EventBase, Optional[dict]]: + ) -> Tuple[Optional[str], int]: content_specified = bool(content) if content is None: content = {} @@ -398,7 +399,13 @@ async def _update_membership( same_membership = old_membership == effective_membership_state same_sender = requester.user.to_string() == old_state.sender if same_sender and same_membership and same_content: - return old_state + _, stream_id = await self.store.get_event_ordering( + old_state.event_id + ) + return ( + old_state.event_id, + stream_id, + ) if old_membership in ["ban", "leave"] and action == "kick": raise AuthError(403, "The target user is not in the room") @@ -705,7 +712,7 @@ async def do_3pid_invite( requester: Requester, txn_id: Optional[str], id_access_token: Optional[str] = None, - ) -> None: + ) -> int: if self.config.block_non_admin_invites: is_requester_admin = await self.auth.is_server_admin(requester.user) if not is_requester_admin: @@ -737,11 +744,11 @@ async def do_3pid_invite( ) if invitee: - await self.update_membership( + _, stream_id = await self.update_membership( requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id ) else: - await self._make_and_store_3pid_invite( + stream_id = await self._make_and_store_3pid_invite( requester, id_server, medium, @@ -752,6 +759,8 @@ async def do_3pid_invite( id_access_token=id_access_token, ) + return stream_id + async def _make_and_store_3pid_invite( self, requester: Requester, @@ -762,7 +771,7 @@ async def _make_and_store_3pid_invite( user: UserID, txn_id: Optional[str], id_access_token: Optional[str] = None, - ) -> None: + ) -> int: room_state = await self.state_handler.get_current_state(room_id) inviter_display_name = "" @@ -817,7 +826,10 @@ async def _make_and_store_3pid_invite( id_access_token=id_access_token, ) - await self.event_creation_handler.create_and_send_nonmember_event( + ( + event, + stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, @@ -835,6 +847,7 @@ async def _make_and_store_3pid_invite( ratelimit=False, txn_id=txn_id, ) + return stream_id async def _is_host_in_room( self, current_state_ids: Dict[Tuple[str, str], str] @@ -916,7 +929,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> None: + ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join """ # filter ourselves out of remote_room_hosts: do_invite_join ignores it @@ -945,7 +958,7 @@ async def _remote_join( # join dance for now, since 
            # we're kinda implicitly checking
            # that we are allowed to join when we decide whether or not we
            # need to do the invite/join dance.
-            await self.federation_handler.do_invite_join(
+            event_id, stream_id = await self.federation_handler.do_invite_join(
                 remote_room_hosts, room_id, user.to_string(), content
             )
             await self._user_joined_room(user, room_id)
@@ -955,14 +968,14 @@ async def _remote_join(
         if self.hs.config.limit_remote_rooms.enabled:
             if too_complex is False:
                 # We checked, and we're under the limit.
-                return
+                return event_id, stream_id

             # Check again, but with the local state events
             too_complex = await self._is_local_room_too_complex(room_id)

             if too_complex is False:
                 # We're under the limit.
-                return
+                return event_id, stream_id

             # The room is too large. Leave.
             requester = types.create_requester(user, None, False, None)
@@ -975,6 +988,8 @@ async def _remote_join(
                 errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
             )

+        return event_id, stream_id
+
     async def _remote_reject_invite(
         self,
         requester: Requester,
@@ -982,15 +997,15 @@ async def _remote_reject_invite(
         room_id: str,
         target: UserID,
         content: dict,
-    ) -> dict:
+    ) -> Tuple[Optional[str], int]:
         """Implements RoomMemberHandler._remote_reject_invite
         """
         fed_handler = self.federation_handler
         try:
-            ret = await fed_handler.do_remotely_reject_invite(
+            event, stream_id = await fed_handler.do_remotely_reject_invite(
                 remote_room_hosts, room_id, target.to_string(), content=content,
             )
-            return ret
+            return event.event_id, stream_id
         except Exception as e:
             # if we were unable to reject the exception, just mark
             # it as rejected on our end and plough ahead.
@@ -1000,8 +1015,10 @@ async def _remote_reject_invite(
             #
             # logger.warning("Failed to reject invite: %s", e)

-            await self.store.locally_reject_invite(target.to_string(), room_id)
-            return {}
+            stream_id = await self.store.locally_reject_invite(
+                target.to_string(), room_id
+            )
+            return None, stream_id

     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 5c776cc0be0d..02e0c4103d9b 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -14,7 +14,7 @@
 # limitations under the License.
import logging -from typing import List, Optional +from typing import List, Optional, Tuple from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler @@ -43,7 +43,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> Optional[dict]: + ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join """ if len(remote_room_hosts) == 0: @@ -59,7 +59,7 @@ async def _remote_join( await self._user_joined_room(user, room_id) - return ret + return ret["event_id"], ret["stream_id"] async def _remote_reject_invite( self, @@ -68,16 +68,17 @@ async def _remote_reject_invite( room_id: str, target: UserID, content: dict, - ) -> dict: + ) -> Tuple[Optional[str], int]: """Implements RoomMemberHandler._remote_reject_invite """ - return await self._remote_reject_client( + ret = await self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, user_id=target.to_string(), content=content, ) + return ret["event_id"], ret["stream_id"] async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_joined_room diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7e23b565b9f0..c287c4e269f2 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -29,7 +29,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): """Handles events newly received from federation, including persisting and - notifying. + notifying. Returns the maximum stream ID of the persisted events. The API looks like: @@ -46,6 +46,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): "context": { .. serialized event context .. }, }], "backfilled": false + } + + 200 OK + + { + "max_stream_id": 32443, + } """ NAME = "fed_send_events" @@ -115,11 +122,11 @@ async def _handle_request(self, request): logger.info("Got %d events from federation", len(event_and_contexts)) - await self.federation_handler.persist_events_and_notify( + max_stream_id = await self.federation_handler.persist_events_and_notify( event_and_contexts, backfilled ) - return 200, {} + return 200, {"max_stream_id": max_stream_id} class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 3577611fd791..050fd345622d 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -76,11 +76,11 @@ async def _handle_request(self, request, room_id, user_id): logger.info("remote_join: %s into room: %s", user_id, room_id) - await self.federation_handler.do_invite_join( + event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user_id, event_content ) - return 200, {} + return 200, {"event_id": event_id, "stream_id": stream_id} class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): @@ -136,10 +136,10 @@ async def _handle_request(self, request, room_id, user_id): logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) try: - event = await self.federation_handler.do_remotely_reject_invite( + event, stream_id = await self.federation_handler.do_remotely_reject_invite( remote_room_hosts, room_id, user_id, event_content, ) - ret = event.get_pdu_json() + event_id = event.event_id except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. 
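The replication servlets above now answer with small JSON bodies instead of empty dicts: {"event_id": ..., "stream_id": ...} for joins and rejections, and {"max_stream_id": ...} for fed_send_events. A sketch of the consuming side of that contract (hypothetical helper; the payload values are examples):

    # Illustrative sketch only, not Synapse code.
    import json
    from typing import Optional, Tuple


    def parse_remote_join_response(body: str) -> Tuple[Optional[str], int]:
        # event_id may be null when an invite could only be rejected
        # locally, so keep it Optional; stream_id is always present.
        payload = json.loads(body)
        return payload["event_id"], payload["stream_id"]


    if __name__ == "__main__":
        event_id, stream_id = parse_remote_join_response(
            '{"event_id": "$abc:example.com", "stream_id": 32443}'
        )
        assert event_id == "$abc:example.com" and stream_id == 32443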
@@ -149,10 +149,10 @@ async def _handle_request(self, request, room_id, user_id): # logger.warning("Failed to reject invite: %s", e) - await self.store.locally_reject_invite(user_id, room_id) - ret = {} + stream_id = await self.store.locally_reject_invite(user_id, room_id) + event_id = None - return 200, ret + return 200, {"event_id": event_id, "stream_id": stream_id} class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index b74b088ff4c2..c981723c1a66 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -119,11 +119,11 @@ async def _handle_request(self, request, event_id): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - await self.event_creation_handler.persist_and_notify_client_event( + stream_id = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return 200, {} + return 200, {"stream_id": stream_id} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index b705a8e16c59..bde97eef328a 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -51,10 +51,7 @@ def __init__(self, hs): super().__init__(hs) self._instance_name = hs.get_instance_name() - - # We pull the streams from the replication handler (if we try and make - # them ourselves we end up in an import loop). - self.streams = hs.get_tcp_replication().get_streams() + self.streams = hs.get_replication_streams() @staticmethod def _serialize_payload(stream_name, from_token, upto_token): diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 28826302f5a9..508ad1b7209b 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,19 +14,23 @@ # limitations under the License. """A replication client for use by synapse workers. """ - +import heapq import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple +from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes +from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) +from synapse.util.async_helpers import timeout_deferred +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer @@ -35,6 +39,10 @@ logger = logging.getLogger(__name__) +# How long we allow callers to wait for replication updates before timing out. +_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30 + + class DirectTcpReplicationClientFactory(ReconnectingClientFactory): """Factory for building connections to the master. Will reconnect if the connection is lost. @@ -92,6 +100,16 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.pusher_pool = hs.get_pusherpool() self.notifier = hs.get_notifier() + self._reactor = hs.get_reactor() + self._clock = hs.get_clock() + self._streams = hs.get_replication_streams() + self._instance_name = hs.get_instance_name() + + # Map from stream to list of deferreds waiting for the stream to + # arrive at a particular position. 
+        # The lists are sorted by stream position.
+        self._streams_to_waiters = (
+            {}
+        )  # type: Dict[str, List[Tuple[int, Deferred[None]]]]

     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
@@ -131,8 +149,76 @@ async def on_rdata(

             await self.pusher_pool.on_new_notifications(token, token)

+        # Notify any waiting deferreds. The list is ordered by position so we
+        # just iterate through the list until we reach a position that is
+        # greater than the received row position.
+        waiting_list = self._streams_to_waiters.get(stream_name, [])
+
+        # Index of the first item with a position after the current token,
+        # i.e. we have called all deferreds before this index. If it is not
+        # overwritten by the loop below, it means either a) the list is empty,
+        # so this is a no-op, or b) all items in the list were called, so the
+        # list should be cleared. Setting it to `len(list)` works for both
+        # cases.
+        index_of_first_deferred_not_called = len(waiting_list)
+
+        for idx, (position, deferred) in enumerate(waiting_list):
+            if position <= token:
+                try:
+                    with PreserveLoggingContext():
+                        deferred.callback(None)
+                except Exception:
+                    # The deferred has been cancelled or timed out.
+                    pass
+            else:
+                # The list is sorted by position so we don't need to continue
+                # checking any further entries in the list.
+                index_of_first_deferred_not_called = idx
+                break
+
+        # Drop all entries in the waiting list that were called in the above
+        # loop. (This maintains the order, so there is no need to re-sort.)
+        waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
+
     async def on_position(self, stream_name: str, instance_name: str, token: int):
         self.store.process_replication_rows(stream_name, instance_name, token, [])

     def on_remote_server_up(self, server: str):
         """Called when get a new REMOTE_SERVER_UP command."""
+
+    async def wait_for_stream_position(
+        self, instance_name: str, stream_name: str, position: int
+    ):
+        """Wait until this instance has received updates up to and including
+        the given stream position.
+        """
+
+        if instance_name == self._instance_name:
+            # We don't get told about updates written by this process, and
+            # anyway in that case we don't need to wait.
+            return
+
+        current_position = self._streams[stream_name].current_token(self._instance_name)
+        if position <= current_position:
+            # We're already past the position
+            return
+
+        # Create a new deferred that times out after N seconds, as we don't want
+        # to wedge here forever.
+        deferred = Deferred()
+        deferred = timeout_deferred(
+            deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor
+        )
+
+        waiting_list = self._streams_to_waiters.setdefault(stream_name, [])
+
+        # We insert into the list using heapq as it is more efficient than
+        # pushing and then re-sorting each time.
+        heapq.heappush(waiting_list, (position, deferred))
+
+        # We measure here to get in-flight counts and the average waiting time.
+        with Measure(self._clock, "repl.wait_for_stream_position"):
+            logger.info("Waiting for repl stream %r to reach %s", stream_name, position)
+            await make_deferred_yieldable(deferred)
+            logger.info(
+                "Finished waiting for repl stream %r to reach %s", stream_name, position
+            )
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 7d4000198866..0a13e1ed348c 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -59,6 +59,7 @@ def __init__(self, hs):
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
         self.auth = hs.get_auth()
+        self._replication = hs.get_replication_data_handler()

     async def on_POST(self, request, room_id):
         requester = await self.auth.get_user_by_req(request)
@@ -73,7 +74,7 @@ async def on_POST(self, request, room_id):
         message = content.get("message", self.DEFAULT_MESSAGE)
         room_name = content.get("room_name", "Content Violation Notification")

-        info = await self._room_creation_handler.create_room(
+        info, stream_id = await self._room_creation_handler.create_room(
             room_creator_requester,
             config={
                 "preset": "public_chat",
@@ -94,6 +95,13 @@ async def on_POST(self, request, room_id):
         # desirable in case the first attempt at blocking the room failed below.
         await self.store.block_room(room_id, requester_user_id)

+        # We now wait for the create room to come back in via replication so
+        # that we can assume that all the joins/invites have propagated before
+        # we try and auto join below.
+        #
+        # TODO: Currently the events stream is written to from master
+        await self._replication.wait_for_stream_position("master", "events", stream_id)
+
         users = await self.state.get_current_users_in_room(room_id)
         kicked_users = []
         failed_to_kick_users = []
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 6b5830cc3f53..105e0cf4d28a 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -93,7 +93,7 @@ def on_PUT(self, request, txn_id):
     async def on_POST(self, request):
         requester = await self.auth.get_user_by_req(request)

-        info = await self._room_creation_handler.create_room(
+        info, _ = await self._room_creation_handler.create_room(
             requester, self.get_room_config(request)
         )

@@ -202,7 +202,7 @@ async def on_PUT(self, request, room_id, event_type, state_key, txn_id=None):
         if event_type == EventTypes.Member:
             membership = content.get("membership", None)
-            event = await self.room_member_handler.update_membership(
+            event_id, _ = await self.room_member_handler.update_membership(
                 requester,
                 target=UserID.from_string(state_key),
                 room_id=room_id,
@@ -210,14 +210,18 @@ async def on_PUT(self, request, room_id, event_type, state_key, txn_id=None):
                 content=content,
             )
         else:
-            event = await self.event_creation_handler.create_and_send_nonmember_event(
+            (
+                event,
+                _,
+            ) = await self.event_creation_handler.create_and_send_nonmember_event(
                 requester, event_dict, txn_id=txn_id
             )
+            event_id = event.event_id

         ret = {}  # type: dict
-        if event:
-            set_tag("event_id", event.event_id)
-            ret = {"event_id": event.event_id}
+        if event_id:
+            set_tag("event_id", event_id)
+            ret = {"event_id": event_id}
         return 200, ret

@@ -247,7 +251,7 @@ async def on_POST(self, request, room_id, event_type, txn_id=None):
         if b"ts" in request.args and requester.app_service:
             event_dict["origin_server_ts"] = parse_integer(request, "ts", 0)

-        event = await self.event_creation_handler.create_and_send_nonmember_event(
+        event, _ = await self.event_creation_handler.create_and_send_nonmember_event(
             requester, event_dict, txn_id=txn_id
         )

@@ -781,7 +785,7 @@ async def on_POST(self, request, room_id, event_id, txn_id=None):
         requester = await self.auth.get_user_by_req(request)
         content = parse_json_object_from_request(request)

-        event = await self.event_creation_handler.create_and_send_nonmember_event(
+        event, _ = await self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.Redaction,
diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py
index 63f07b63da0e..89002ffbffdd 100644
--- a/synapse/rest/client/v2_alpha/relations.py
+++ b/synapse/rest/client/v2_alpha/relations.py
@@ -111,7 +111,7 @@ async def on_PUT_or_POST(
             "sender": requester.user.to_string(),
         }

-        event = await self.event_creation_handler.create_and_send_nonmember_event(
+        event, _ = await self.event_creation_handler.create_and_send_nonmember_event(
             requester, event_dict=event_dict, txn_id=txn_id
         )

diff --git a/synapse/server.py b/synapse/server.py
index c530f1aa1ad9..ca2deb49bbe4 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -90,6 +90,7 @@
 from synapse.replication.tcp.client import ReplicationDataHandler
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.resource import ReplicationStreamer
+from synapse.replication.tcp.streams import STREAMS_MAP
 from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
@@ -210,6 +211,7 @@ def build_DEPENDENCY(self)
         "storage",
         "replication_streamer",
         "replication_data_handler",
+        "replication_streams",
     ]

     REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@@ -583,6 +585,9 @@ def build_replication_streamer(self) -> ReplicationStreamer:
     def build_replication_data_handler(self):
         return ReplicationDataHandler(self)

+    def build_replication_streams(self):
+        return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

diff --git a/synapse/server.pyi b/synapse/server.pyi
index 9e7fad7e6e53..fe8024d2d4e6 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,3 +1,5 @@
+from typing import Dict
+
 import twisted.internet

 import synapse.api.auth
@@ -28,6 +30,7 @@ import synapse.server_notices.server_notices_sender
 import synapse.state
 import synapse.storage
 from synapse.events.builder import EventBuilderFactory
+from synapse.replication.tcp.streams import Stream

 class HomeServer(object):
     @property
@@ -136,3 +139,5 @@ class HomeServer(object):
         pass
     def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool:
         pass
+    def get_replication_streams(self) -> Dict[str, Stream]:
+        pass
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index 999c621b9277..bf2454c01cd8 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -83,10 +83,10 @@ async def send_notice(
         if state_key is not None:
             event_dict["state_key"] = state_key

-        res = await self._event_creation_handler.create_and_send_nonmember_event(
+        event, _ = await self._event_creation_handler.create_and_send_nonmember_event(
             requester, event_dict, ratelimit=False
         )
-        return res
+        return event

     @cached()
     async def get_or_create_notice_room_for_user(self, user_id):
@@ -143,7 +143,7 @@ async def get_or_create_notice_room_for_user(self, user_id):
         }
        requester = create_requester(self.server_notices_mxid)

-        info = await self._room_creation_handler.create_room(
+        info, _ = await self._room_creation_handler.create_room(
             requester,
             config={
                 "preset": RoomCreationPreset.PRIVATE_CHAT,
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 9130b74eb57c..b880a71782e3 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1289,12 +1289,12 @@ def get_all_new_events_txn(txn):
     async def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
         """
-        to_1, so_1 = await self._get_event_ordering(event_id1)
-        to_2, so_2 = await self._get_event_ordering(event_id2)
+        to_1, so_1 = await self.get_event_ordering(event_id1)
+        to_2, so_2 = await self.get_event_ordering(event_id2)
         return (to_1, so_1) > (to_2, so_2)

     @cachedInlineCallbacks(max_entries=5000)
-    def _get_event_ordering(self, event_id):
+    def get_event_ordering(self, event_id):
         res = yield self.db.simple_select_one(
             table="events",
             retcols=["topological_ordering", "stream_ordering"],
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 1e9c85015274..7c5ca81ae046 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -1069,6 +1069,8 @@ def f(txn, stream_ordering):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)

+        return stream_ordering
+
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""

diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
index 94980733c4bd..0c9987be54e3 100644
--- a/tests/federation/test_complexity.py
+++ b/tests/federation/test_complexity.py
@@ -79,7 +79,9 @@ def test_join_too_large(self):

         # Mock out some things, because we don't want to test the whole join
         fed_transport.client.get_json = Mock(return_value=defer.succeed({"v1": 9999}))
-        handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
+        handler.federation_handler.do_invite_join = Mock(
+            return_value=defer.succeed(("", 1))
+        )

         d = handler._remote_join(
             None,
@@ -115,7 +117,9 @@ def test_join_too_large_once_joined(self):

         # Mock out some things, because we don't want to test the whole join
         fed_transport.client.get_json = Mock(return_value=defer.succeed(None))
-        handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
+        handler.federation_handler.do_invite_join = Mock(
+            return_value=defer.succeed(("", 1))
+        )

         # Artificially raise the complexity
         self.hs.get_datastore().get_current_state_event_counts = lambda x: defer.succeed(
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 51e2b37218ab..2fa8d4739b36 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -86,7 +86,10 @@ def make_homeserver(self, reactor, clock):
         reactor.pump((1000,))

         hs = self.setup_test_homeserver(
-            notifier=Mock(), http_client=mock_federation_client, keyring=mock_keyring
+            notifier=Mock(),
+            http_client=mock_federation_client,
+            keyring=mock_keyring,
+            replication_streams={},
         )

         hs.datastores = datastores
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index 0e04b2cf9274..43425c969a0d 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -39,7 +39,7 @@ def prepare(self, reactor, clock, homeserver):
         # Create a test user and room
         self.user = UserID("alice", "test")
         self.requester = Requester(self.user, None, False, None, None)
-        info = self.get_success(self.room_creator.create_room(self.requester, {}))
+        info, _ = self.get_success(self.room_creator.create_room(self.requester, {}))
         self.room_id = info["room_id"]

     def run_background_update(self):
@@ -261,7 +261,7 @@ def prepare(self, reactor, clock, homeserver):
         self.user = UserID.from_string(self.register_user("user1", "password"))
         self.token1 = self.login("user1", "password")
         self.requester = Requester(self.user, None, False, None, None)
-        info = self.get_success(self.room_creator.create_room(self.requester, {}))
+        info, _ = self.get_success(self.room_creator.create_room(self.requester, {}))
         self.room_id = info["room_id"]
         self.event_creator = homeserver.get_event_creation_handler()
         homeserver.config.user_consent_version = self.CONSENT_VERSION
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
index a7b7fd36d3ca..a7b85004e5d3 100644
--- a/tests/storage/test_event_metrics.py
+++ b/tests/storage/test_event_metrics.py
@@ -33,7 +33,7 @@ def test_exposed_to_prometheus(self):
         events = [(3, 2), (6, 2), (4, 6)]

         for event_count, extrems in events:
-            info = self.get_success(room_creator.create_room(requester, {}))
+            info, _ = self.get_success(room_creator.create_room(requester, {}))
             room_id = info["room_id"]

             last_event = None
diff --git a/tests/test_federation.py b/tests/test_federation.py
index 13ff14863ea5..c5099dd039a2 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -28,13 +28,13 @@ def setUp(self):
         user_id = UserID("us", "test")
         our_user = Requester(user_id, None, False, None, None)
         room_creator = self.homeserver.get_room_creation_handler()
-        room = ensureDeferred(
+        room_deferred = ensureDeferred(
             room_creator.create_room(
                 our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False
             )
         )
         self.reactor.advance(0.1)
-        self.room_id = self.successResultOf(room)["room_id"]
+        self.room_id = self.successResultOf(room_deferred)[0]["room_id"]

         self.store = self.homeserver.get_datastore()
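As a closing illustration, the waiter bookkeeping behind wait_for_stream_position reduces to a few lines. This sketch swaps Twisted Deferreds for plain callbacks and pops from the heap rather than slicing the list, so it is a simplified variant of the idea, not a copy of the patch's implementation:

    # Illustrative sketch only, not Synapse code. Waiters are keyed by the
    # stream position they need; heapq keeps them ordered so an incoming
    # token only fires the waiters at or below it.
    import heapq
    import itertools
    from typing import Callable, Dict, List, Tuple


    class StreamWaiters:
        def __init__(self) -> None:
            # The counter breaks ties between equal positions so the heap
            # never compares the (unorderable) callbacks themselves.
            self._seq = itertools.count()
            self._waiters = (
                {}
            )  # type: Dict[str, List[Tuple[int, int, Callable[[], None]]]]

        def wait_for(self, stream: str, position: int, callback: Callable[[], None]) -> None:
            heapq.heappush(
                self._waiters.setdefault(stream, []),
                (position, next(self._seq), callback),
            )

        def on_rdata(self, stream: str, token: int) -> None:
            # Fire waiters while the smallest waiting position has been reached.
            waiting = self._waiters.get(stream, [])
            while waiting and waiting[0][0] <= token:
                _, _, callback = heapq.heappop(waiting)
                callback()


    if __name__ == "__main__":
        waiters = StreamWaiters()
        fired = []  # type: List[int]
        waiters.wait_for("events", 5, lambda: fired.append(5))
        waiters.wait_for("events", 3, lambda: fired.append(3))
        waiters.on_rdata("events", 4)  # only the position-3 waiter fires
        assert fired == [3]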