diff --git a/CHANGES.md b/CHANGES.md index 7dbe072fed11..225fced285a3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,20 +1,5 @@ -Synapse 1.13.0rc2 (2020-05-14) -============================== - -Bugfixes --------- - -- Fix a long-standing bug which could cause messages not to be sent over federation, when state events with state keys matching user IDs (such as custom user statuses) were received. ([\#7376](https://github.com/matrix-org/synapse/issues/7376)) -- Restore compatibility with non-compliant clients during the user interactive authentication process, fixing a problem introduced in v1.13.0rc1. ([\#7483](https://github.com/matrix-org/synapse/issues/7483)) - -Internal Changes ----------------- - -- Fix linting errors in new version of Flake8. ([\#7470](https://github.com/matrix-org/synapse/issues/7470)) - - -Synapse 1.13.0rc1 (2020-05-11) -============================== +Synapse 1.13.0 (2020-05-19) +=========================== This release brings some potential changes necessary for certain configurations of Synapse: @@ -34,6 +19,53 @@ configurations of Synapse: Please review [UPGRADE.rst](UPGRADE.rst) for more details on these changes and for general upgrade guidance. + +Notice of change to the default `git` branch for Synapse +-------------------------------------------------------- + +With the release of Synapse 1.13.0, the default `git` branch for Synapse has +changed to `develop`, which is the development tip. This is more consistent with +common practice and modern `git` usage. + +The `master` branch, which tracks the latest release, is still available. It is +recommended that developers and distributors who have scripts which run builds +using the default branch of Synapse should therefore consider pinning their +scripts to `master`. + + +Internal Changes +---------------- + +- Update the version of dh-virtualenv we use to build debs, and add focal to the list of target distributions. ([\#7526](https://github.com/matrix-org/synapse/issues/7526)) + + +Synapse 1.13.0rc3 (2020-05-18) +============================== + +Bugfixes +-------- + +- Hash passwords as early as possible during registration. ([\#7523](https://github.com/matrix-org/synapse/issues/7523)) + + +Synapse 1.13.0rc2 (2020-05-14) +============================== + +Bugfixes +-------- + +- Fix a long-standing bug which could cause messages not to be sent over federation, when state events with state keys matching user IDs (such as custom user statuses) were received. ([\#7376](https://github.com/matrix-org/synapse/issues/7376)) +- Restore compatibility with non-compliant clients during the user interactive authentication process, fixing a problem introduced in v1.13.0rc1. ([\#7483](https://github.com/matrix-org/synapse/issues/7483)) + +Internal Changes +---------------- + +- Fix linting errors in new version of Flake8. ([\#7470](https://github.com/matrix-org/synapse/issues/7470)) + + +Synapse 1.13.0rc1 (2020-05-11) +============================== + Features -------- diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3350e533dc8a..062413e92531 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,21 +16,10 @@ project on github, and then [create a pull request]( https://help.github.com/articles/using-pull-requests/) to ask us to pull your changes into our repo. -**The single biggest thing you need to know is: please base your changes on -the develop branch - *not* master.** - -We use the master branch to track the most recent release, so that folks who -blindly clone the repo and automatically check out master get something that -works. Develop is the unstable branch where all the development actually -happens: the workflow is that contributors should fork the develop branch to -make a 'feature' branch for a particular contribution, and then make a pull -request to merge this back into the matrix.org 'official' develop branch. We -use github's pull request workflow to review the contribution, and either ask -you to make any refinements needed or merge it and make them ourselves. The -changes will then land on master when we next do a release. - -Some other things you will need to know when contributing to Synapse: - +Some other points to follow: + + * Please base your changes on the `develop` branch. + * Please follow the [code style requirements](#code-style). * Please include a [changelog entry](#changelog) with each PR. diff --git a/changelog.d/6590.misc b/changelog.d/6590.misc new file mode 100644 index 000000000000..05a0156212f1 --- /dev/null +++ b/changelog.d/6590.misc @@ -0,0 +1 @@ +`synctl` now warns if it was unable to stop Synapse and will not attempt to start Synapse if nothing was stopped. Contributed by Romain Bouyé. diff --git a/changelog.d/7453.bugfix b/changelog.d/7453.bugfix new file mode 100644 index 000000000000..629a3d61e235 --- /dev/null +++ b/changelog.d/7453.bugfix @@ -0,0 +1 @@ +Fix a bug that would cause Synapse not to resync out-of-sync device lists. diff --git a/changelog.d/7497.bugfix b/changelog.d/7497.bugfix new file mode 100644 index 000000000000..3c7920cb1059 --- /dev/null +++ b/changelog.d/7497.bugfix @@ -0,0 +1 @@ +When sending `m.room.member` events, omit `displayname` and `avatar_url` if they aren't set instead of setting them to `null`. Contributed by Aaron Raimist. diff --git a/changelog.d/7528.doc b/changelog.d/7528.doc new file mode 100644 index 000000000000..6f2a783b50b0 --- /dev/null +++ b/changelog.d/7528.doc @@ -0,0 +1 @@ +Change the systemd worker service to check that the worker config file exists instead of silently failing. Contributed by David Vo. diff --git a/changelog.d/7533.doc b/changelog.d/7533.doc new file mode 100644 index 000000000000..e3c1df99faa6 --- /dev/null +++ b/changelog.d/7533.doc @@ -0,0 +1 @@ +Minor clarifications to the TURN docs. diff --git a/changelog.d/7536.misc b/changelog.d/7536.misc new file mode 100644 index 000000000000..c1211167fcc3 --- /dev/null +++ b/changelog.d/7536.misc @@ -0,0 +1 @@ +Synapse now exports [detailed allocator statistics](https://doc.pypy.org/en/latest/gc_info.html#gc-get-stats) and basic GC timings as Prometheus metrics (`pypy_gc_time_seconds_total` and `pypy_memory_bytes`) when run under PyPy. Contributed by Ivan Shapovalov. diff --git a/changelog.d/7538.bugfix b/changelog.d/7538.bugfix new file mode 100644 index 000000000000..4a614a9e61cc --- /dev/null +++ b/changelog.d/7538.bugfix @@ -0,0 +1 @@ + Hash passwords as early as possible during password reset. diff --git a/changelog.d/7539.misc b/changelog.d/7539.misc new file mode 100644 index 000000000000..93c030875a7e --- /dev/null +++ b/changelog.d/7539.misc @@ -0,0 +1 @@ +Remove Ubuntu Cosmic and Disco from the list of distributions which we provide `.deb`s for, due to end-of-life. diff --git a/changelog.d/7542.misc b/changelog.d/7542.misc new file mode 100644 index 000000000000..7dd9b4823b50 --- /dev/null +++ b/changelog.d/7542.misc @@ -0,0 +1 @@ +Add ability to wait for replication streams. diff --git a/changelog.d/7545.misc b/changelog.d/7545.misc new file mode 100644 index 000000000000..177ec883e206 --- /dev/null +++ b/changelog.d/7545.misc @@ -0,0 +1 @@ +Make worker processes return a stubbed-out response to `GET /presence` requests. diff --git a/changelog.d/7547.misc b/changelog.d/7547.misc new file mode 100644 index 000000000000..2cb8f9bd5b2c --- /dev/null +++ b/changelog.d/7547.misc @@ -0,0 +1 @@ +On upgrade room only send canonical alias once. diff --git a/changelog.d/7548.bugfix b/changelog.d/7548.bugfix new file mode 100644 index 000000000000..1233b3b31a12 --- /dev/null +++ b/changelog.d/7548.bugfix @@ -0,0 +1 @@ +Fix bug where a local user leaving a room could fail under rare circumstances. diff --git a/changelog.d/7550.misc b/changelog.d/7550.misc new file mode 100644 index 000000000000..79e119e977c5 --- /dev/null +++ b/changelog.d/7550.misc @@ -0,0 +1 @@ +Fix some indentation inconsistencies in the sample config. diff --git a/changelog.d/7552.bugfix b/changelog.d/7552.bugfix new file mode 100644 index 000000000000..60b31d6d3147 --- /dev/null +++ b/changelog.d/7552.bugfix @@ -0,0 +1 @@ +Fix "Missing RelayState parameter" error when using user interactive authentication with SAML for some SAML providers. diff --git a/changelog.d/7553.misc b/changelog.d/7553.misc new file mode 100644 index 000000000000..90b9e8693a9a --- /dev/null +++ b/changelog.d/7553.misc @@ -0,0 +1 @@ +Include `synapse.http.site` in type checking. diff --git a/changelog.d/7554.misc b/changelog.d/7554.misc new file mode 100644 index 000000000000..7c35c46aa69d --- /dev/null +++ b/changelog.d/7554.misc @@ -0,0 +1 @@ +Fix some test code to not mangle stacktraces, to make it easier to debug errors. diff --git a/changelog.d/7555.misc b/changelog.d/7555.misc new file mode 100644 index 000000000000..75a317613346 --- /dev/null +++ b/changelog.d/7555.misc @@ -0,0 +1 @@ +Refresh apt cache when building dh_virtualenv docker image. diff --git a/changelog.d/7556.misc b/changelog.d/7556.misc new file mode 100644 index 000000000000..ed271f9de883 --- /dev/null +++ b/changelog.d/7556.misc @@ -0,0 +1 @@ +Stop logging some expected HTTP request errors as exceptions. diff --git a/changelog.d/7557.misc b/changelog.d/7557.misc new file mode 100644 index 000000000000..c850a2bc0c1a --- /dev/null +++ b/changelog.d/7557.misc @@ -0,0 +1 @@ +Convert sending mail to async/await. diff --git a/debian/build_virtualenv b/debian/build_virtualenv index d892fd5c9d9a..4c9aabcac386 100755 --- a/debian/build_virtualenv +++ b/debian/build_virtualenv @@ -36,7 +36,6 @@ esac dh_virtualenv \ --install-suffix "matrix-synapse" \ --builtin-venv \ - --setuptools \ --python "$SNAKE" \ --upgrade-pip \ --preinstall="lxml" \ diff --git a/debian/changelog b/debian/changelog index 8641571986e9..e7842d417416 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,16 +1,18 @@ -<<<<<<< HEAD -matrix-synapse-py3 (1.12.3ubuntu1) UNRELEASED; urgency=medium +matrix-synapse-py3 (1.13.0) stable; urgency=medium + [ Patrick Cloke ] * Add information about .well-known files to Debian installation scripts. - -- Patrick Cloke Mon, 06 Apr 2020 10:10:38 -0400 -======= + [ Synapse Packaging team ] + * New synapse release 1.13.0. + + -- Synapse Packaging team Tue, 19 May 2020 09:16:56 -0400 + matrix-synapse-py3 (1.12.4) stable; urgency=medium * New synapse release 1.12.4. -- Synapse Packaging team Thu, 23 Apr 2020 10:58:14 -0400 ->>>>>>> master matrix-synapse-py3 (1.12.3) stable; urgency=medium diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv index ac9ebcfd88c2..bf6af74104cc 100644 --- a/docker/Dockerfile-dhvirtualenv +++ b/docker/Dockerfile-dhvirtualenv @@ -27,15 +27,18 @@ RUN env DEBIAN_FRONTEND=noninteractive apt-get install \ wget # fetch and unpack the package -RUN wget -q -O /dh-virtuenv-1.1.tar.gz https://github.com/spotify/dh-virtualenv/archive/1.1.tar.gz -RUN tar xvf /dh-virtuenv-1.1.tar.gz +RUN mkdir /dh-virtualenv +RUN wget -q -O /dh-virtualenv.tar.gz https://github.com/matrix-org/dh-virtualenv/archive/matrixorg-20200519.tar.gz +RUN tar -xv --strip-components=1 -C /dh-virtualenv -f /dh-virtualenv.tar.gz -# install its build deps -RUN cd dh-virtualenv-1.1/ \ - && env DEBIAN_FRONTEND=noninteractive mk-build-deps -ri -t "apt-get -yqq --no-install-recommends" +# install its build deps. We do another apt-cache-update here, because we might +# be using a stale cache from docker build. +RUN apt-get update -qq -o Acquire::Languages=none \ + && cd /dh-virtualenv \ + && env DEBIAN_FRONTEND=noninteractive mk-build-deps -ri -t "apt-get -y --no-install-recommends" # build it -RUN cd dh-virtualenv-1.1 && dpkg-buildpackage -us -uc -b +RUN cd /dh-virtualenv && dpkg-buildpackage -us -uc -b ### ### Stage 1 @@ -68,12 +71,12 @@ RUN apt-get update -qq -o Acquire::Languages=none \ sqlite3 \ libpq-dev -COPY --from=builder /dh-virtualenv_1.1-1_all.deb / +COPY --from=builder /dh-virtualenv_1.2~dev-1_all.deb / # install dhvirtualenv. Update the apt cache again first, in case we got a # cached cache from docker the first time. RUN apt-get update -qq -o Acquire::Languages=none \ - && apt-get install -yq /dh-virtualenv_1.1-1_all.deb + && apt-get install -yq /dh-virtualenv_1.2~dev-1_all.deb WORKDIR /synapse/source ENTRYPOINT ["bash","/synapse/source/docker/build_debian.sh"] diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst index 859d7f99e7c8..776e71ec0439 100644 --- a/docs/admin_api/user_admin_api.rst +++ b/docs/admin_api/user_admin_api.rst @@ -33,21 +33,22 @@ with a body of: including an ``access_token`` of a server admin. -The parameter ``displayname`` is optional and defaults to the value of -``user_id``. +Parameters: -The parameter ``threepids`` is optional and allows setting the third-party IDs -(email, msisdn) belonging to a user. +- ``password``, optional. If provided, the user's password is updated and all + devices are logged out. + +- ``displayname``, optional, defaults to the value of ``user_id``. -The parameter ``avatar_url`` is optional. Must be a [MXC -URI](https://matrix.org/docs/spec/client_server/r0.6.0#matrix-content-mxc-uris). +- ``threepids``, optional, allows setting the third-party IDs (email, msisdn) + belonging to a user. -The parameter ``admin`` is optional and defaults to ``false``. +- ``avatar_url``, optional, must be a + `MXC URI `_. -The parameter ``deactivated`` is optional and defaults to ``false``. +- ``admin``, optional, defaults to ``false``. -The parameter ``password`` is optional. If provided, the user's password is -updated and all devices are logged out. +- ``deactivated``, optional, defaults to ``false``. If the user already exists then optional parameters default to the current value. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 8a8415b9a290..0e1be153c718 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -322,22 +322,27 @@ listeners: # Used by phonehome stats to group together related servers. #server_context: context -# Resource-constrained homeserver Settings +# Resource-constrained homeserver settings # -# If limit_remote_rooms.enabled is True, the room complexity will be -# checked before a user joins a new remote room. If it is above -# limit_remote_rooms.complexity, it will disallow joining or -# instantly leave. +# When this is enabled, the room "complexity" will be checked before a user +# joins a new remote room. If it is above the complexity limit, the server will +# disallow joining, or will instantly leave. # -# limit_remote_rooms.complexity_error can be set to customise the text -# displayed to the user when a room above the complexity threshold has -# its join cancelled. +# Room complexity is an arbitrary measure based on factors such as the number of +# users in the room. # -# Uncomment the below lines to enable: -#limit_remote_rooms: -# enabled: true -# complexity: 1.0 -# complexity_error: "This room is too complex." +limit_remote_rooms: + # Uncomment to enable room complexity checking. + # + #enabled: true + + # the limit above which rooms cannot be joined. The default is 1.0. + # + #complexity: 0.5 + + # override the error which is returned when the room is too complex. + # + #complexity_error: "This room is too complex." # Whether to require a user to be in the room to add an alias to it. # Defaults to 'true'. @@ -942,25 +947,28 @@ url_preview_accept_language: ## Captcha ## -# See docs/CAPTCHA_SETUP for full details of configuring this. +# See docs/CAPTCHA_SETUP.md for full details of configuring this. -# This homeserver's ReCAPTCHA public key. +# This homeserver's ReCAPTCHA public key. Must be specified if +# enable_registration_captcha is enabled. # #recaptcha_public_key: "YOUR_PUBLIC_KEY" -# This homeserver's ReCAPTCHA private key. +# This homeserver's ReCAPTCHA private key. Must be specified if +# enable_registration_captcha is enabled. # #recaptcha_private_key: "YOUR_PRIVATE_KEY" -# Enables ReCaptcha checks when registering, preventing signup +# Uncomment to enable ReCaptcha checks when registering, preventing signup # unless a captcha is answered. Requires a valid ReCaptcha -# public/private key. +# public/private key. Defaults to 'false'. # -#enable_registration_captcha: false +#enable_registration_captcha: true # The API endpoint to use for verifying m.login.recaptcha responses. +# Defaults to "https://www.recaptcha.net/recaptcha/api/siteverify". # -#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" +#recaptcha_siteverify_api: "https://my.recaptcha.site" ## TURN ## @@ -1104,7 +1112,7 @@ account_validity: # If set, allows registration of standard or admin accounts by anyone who # has the shared secret, even if registration is otherwise disabled. # -# registration_shared_secret: +#registration_shared_secret: # Set the number of bcrypt rounds used to generate password hash. # Larger numbers increase the work factor needed to generate the hash. @@ -1237,7 +1245,8 @@ metrics_flags: #known_servers: true # Whether or not to report anonymized homeserver usage statistics. -# report_stats: true|false +# +#report_stats: true|false # The endpoint to report the anonymized homeserver usage statistics to. # Defaults to https://matrix.org/report-usage-stats/push @@ -1273,13 +1282,13 @@ metrics_flags: # the registration_shared_secret is used, if one is given; otherwise, # a secret key is derived from the signing key. # -# macaroon_secret_key: +#macaroon_secret_key: # a secret which is used to calculate HMACs for form values, to stop # falsification of values. Must be specified for the User Consent # forms to work. # -# form_secret: +#form_secret: ## Signing Keys ## @@ -1764,8 +1773,8 @@ email: # Username/password for authentication to the SMTP server. By default, no # authentication is attempted. # - # smtp_user: "exampleusername" - # smtp_pass: "examplepassword" + #smtp_user: "exampleusername" + #smtp_pass: "examplepassword" # Uncomment the following to require TLS transport security for SMTP. # By default, Synapse will connect over plain text, and will then switch to diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service index 70589a7a51c1..39bc5e88e862 100644 --- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service +++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service @@ -1,6 +1,6 @@ [Unit] Description=Synapse %i - +AssertPathExists=/etc/matrix-synapse/workers/%i.yaml # This service should be restarted when the synapse target is restarted. PartOf=matrix-synapse.target diff --git a/docs/turn-howto.md b/docs/turn-howto.md index b8a2ba3e82b1..d4a726be660d 100644 --- a/docs/turn-howto.md +++ b/docs/turn-howto.md @@ -18,7 +18,7 @@ For TURN relaying with `coturn` to work, it must be hosted on a server/endpoint Hosting TURN behind a NAT (even with appropriate port forwarding) is known to cause issues and to often not work. -## `coturn` Setup +## `coturn` setup ### Initial installation @@ -26,7 +26,13 @@ The TURN daemon `coturn` is available from a variety of sources such as native p #### Debian installation - # apt install coturn +Just install the debian package: + +```sh +apt install coturn +``` + +This will install and start a systemd service called `coturn`. #### Source installation @@ -63,38 +69,52 @@ The TURN daemon `coturn` is available from a variety of sources such as native p 1. Consider your security settings. TURN lets users request a relay which will connect to arbitrary IP addresses and ports. The following configuration is suggested as a minimum starting point: - + # VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay. no-tcp-relay - + # don't let the relay ever try to connect to private IP address ranges within your network (if any) # given the turn server is likely behind your firewall, remember to include any privileged public IPs too. denied-peer-ip=10.0.0.0-10.255.255.255 denied-peer-ip=192.168.0.0-192.168.255.255 denied-peer-ip=172.16.0.0-172.31.255.255 - + # special case the turn server itself so that client->TURN->TURN->client flows work allowed-peer-ip=10.0.0.1 - + # consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS. user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user. total-quota=1200 - Ideally coturn should refuse to relay traffic which isn't SRTP; see - +1. Also consider supporting TLS/DTLS. To do this, add the following settings + to `turnserver.conf`: + + # TLS certificates, including intermediate certs. + # For Let's Encrypt certificates, use `fullchain.pem` here. + cert=/path/to/fullchain.pem + + # TLS private key file + pkey=/path/to/privkey.pem 1. Ensure your firewall allows traffic into the TURN server on the ports - you've configured it to listen on (remember to allow both TCP and UDP TURN - traffic) + you've configured it to listen on (By default: 3478 and 5349 for the TURN(s) + traffic (remember to allow both TCP and UDP traffic), and ports 49152-65535 + for the UDP relay.) + +1. (Re)start the turn server: -1. If you've configured coturn to support TLS/DTLS, generate or import your - private key and certificate. + * If you used the Debian package (or have set up a systemd unit yourself): + ```sh + systemctl restart coturn + ``` -1. Start the turn server: + * If you installed from source: - bin/turnserver -o + ```sh + bin/turnserver -o + ``` -## synapse Setup +## Synapse setup Your home server configuration file needs the following extra keys: @@ -126,7 +146,14 @@ As an example, here is the relevant section of the config file for matrix.org: After updating the homeserver configuration, you must restart synapse: + * If you use synctl: + ```sh cd /where/you/run/synapse ./synctl restart + ``` + * If you use systemd: + ``` + systemctl restart synapse.service + ``` ..and your Home Server now supports VoIP relaying! diff --git a/scripts-dev/build_debian_packages b/scripts-dev/build_debian_packages index 84eaec6a9512..e6f4bd1dcadf 100755 --- a/scripts-dev/build_debian_packages +++ b/scripts-dev/build_debian_packages @@ -24,9 +24,8 @@ DISTS = ( "debian:sid", "ubuntu:xenial", "ubuntu:bionic", - "ubuntu:cosmic", - "ubuntu:disco", "ubuntu:eoan", + "ubuntu:focal", ) DESC = '''\ diff --git a/synapse/__init__.py b/synapse/__init__.py index 977e26a04828..0abf4911729d 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ except ImportError: pass -__version__ = "1.13.0rc2" +__version__ = "1.13.0" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 31b8c5dbf9c0..31cf106a34a5 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -26,7 +26,7 @@ import synapse import synapse.events -from synapse.api.errors import HttpResponseException, SynapseError +from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -145,31 +145,18 @@ class PresenceStatusStubServlet(RestServlet): """If presence is disabled this servlet can be used to stub out setting - presence status, while proxying the getters to the master instance. + presence status. """ PATTERNS = client_patterns("/presence/(?P[^/]*)/status") def __init__(self, hs): super(PresenceStatusStubServlet, self).__init__() - self.http_client = hs.get_simple_http_client() self.auth = hs.get_auth() - self.main_uri = hs.config.worker_main_http_uri async def on_GET(self, request, user_id): - # Pass through the auth headers, if any, in case the access token - # is there. - auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) - headers = {"Authorization": auth_headers} - - try: - result = await self.http_client.get_json( - self.main_uri + request.uri.decode("ascii"), headers=headers - ) - except HttpResponseException as e: - raise e.to_synapse_error() - - return 200, result + await self.auth.get_user_by_req(request) + return 200, {"presence": "offline"} async def on_PUT(self, request, user_id): await self.auth.get_user_by_req(request) @@ -223,9 +210,14 @@ async def on_POST(self, request, device_id): # is there. auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) headers = {"Authorization": auth_headers} - result = await self.http_client.post_json_get_json( - self.main_uri + request.uri.decode("ascii"), body, headers=headers - ) + try: + result = await self.http_client.post_json_get_json( + self.main_uri + request.uri.decode("ascii"), body, headers=headers + ) + except HttpResponseException as e: + raise e.to_synapse() from e + except RequestSendFailed as e: + raise SynapseError(502, "Failed to talk to master") from e return 200, result else: diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py index 56c87fa296cb..82f04d7966e2 100644 --- a/synapse/config/captcha.py +++ b/synapse/config/captcha.py @@ -32,23 +32,26 @@ def read_config(self, config, **kwargs): def generate_config_section(self, **kwargs): return """\ ## Captcha ## - # See docs/CAPTCHA_SETUP for full details of configuring this. + # See docs/CAPTCHA_SETUP.md for full details of configuring this. - # This homeserver's ReCAPTCHA public key. + # This homeserver's ReCAPTCHA public key. Must be specified if + # enable_registration_captcha is enabled. # #recaptcha_public_key: "YOUR_PUBLIC_KEY" - # This homeserver's ReCAPTCHA private key. + # This homeserver's ReCAPTCHA private key. Must be specified if + # enable_registration_captcha is enabled. # #recaptcha_private_key: "YOUR_PRIVATE_KEY" - # Enables ReCaptcha checks when registering, preventing signup + # Uncomment to enable ReCaptcha checks when registering, preventing signup # unless a captcha is answered. Requires a valid ReCaptcha - # public/private key. + # public/private key. Defaults to 'false'. # - #enable_registration_captcha: false + #enable_registration_captcha: true # The API endpoint to use for verifying m.login.recaptcha responses. + # Defaults to "https://www.recaptcha.net/recaptcha/api/siteverify". # - #recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" + #recaptcha_siteverify_api: "https://my.recaptcha.site" """ diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 76b8957ea502..ca61214454f8 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -311,8 +311,8 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs): # Username/password for authentication to the SMTP server. By default, no # authentication is attempted. # - # smtp_user: "exampleusername" - # smtp_pass: "examplepassword" + #smtp_user: "exampleusername" + #smtp_pass: "examplepassword" # Uncomment the following to require TLS transport security for SMTP. # By default, Synapse will connect over plain text, and will then switch to diff --git a/synapse/config/key.py b/synapse/config/key.py index 066e7838c3b5..b529ea5da096 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -175,8 +175,8 @@ def generate_config_section( ) form_secret = 'form_secret: "%s"' % random_string_with_symbols(50) else: - macaroon_secret_key = "# macaroon_secret_key: " - form_secret = "# form_secret: " + macaroon_secret_key = "#macaroon_secret_key: " + form_secret = "#form_secret: " return ( """\ diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 6f517a71d092..6aad0d37c0cc 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -93,10 +93,11 @@ def generate_config_section(self, report_stats=None, **kwargs): #known_servers: true # Whether or not to report anonymized homeserver usage statistics. + # """ if report_stats is None: - res += "# report_stats: true|false\n" + res += "#report_stats: true|false\n" else: res += "report_stats: %s\n" % ("true" if report_stats else "false") diff --git a/synapse/config/registration.py b/synapse/config/registration.py index e7ea3a01cb87..a9aa8c3737f7 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -148,9 +148,7 @@ def generate_config_section(self, generate_secrets=False, **kwargs): random_string_with_symbols(50), ) else: - registration_shared_secret = ( - "# registration_shared_secret: " - ) + registration_shared_secret = "#registration_shared_secret: " return ( """\ diff --git a/synapse/config/server.py b/synapse/config/server.py index ed28da3deb9b..f57eefc99c14 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -434,7 +434,7 @@ class LimitRemoteRoomsConfig(object): ) self.limit_remote_rooms = LimitRemoteRoomsConfig( - **config.get("limit_remote_rooms", {}) + **(config.get("limit_remote_rooms") or {}) ) bind_port = config.get("bind_port") @@ -895,22 +895,27 @@ def generate_config_section( # Used by phonehome stats to group together related servers. #server_context: context - # Resource-constrained homeserver Settings + # Resource-constrained homeserver settings # - # If limit_remote_rooms.enabled is True, the room complexity will be - # checked before a user joins a new remote room. If it is above - # limit_remote_rooms.complexity, it will disallow joining or - # instantly leave. + # When this is enabled, the room "complexity" will be checked before a user + # joins a new remote room. If it is above the complexity limit, the server will + # disallow joining, or will instantly leave. # - # limit_remote_rooms.complexity_error can be set to customise the text - # displayed to the user when a room above the complexity threshold has - # its join cancelled. + # Room complexity is an arbitrary measure based on factors such as the number of + # users in the room. # - # Uncomment the below lines to enable: - #limit_remote_rooms: - # enabled: true - # complexity: 1.0 - # complexity_error: "This room is too complex." + limit_remote_rooms: + # Uncomment to enable room complexity checking. + # + #enabled: true + + # the limit above which rooms cannot be joined. The default is 1.0. + # + #complexity: 0.5 + + # override the error which is returned when the room is too complex. + # + #complexity_error: "This room is too complex." # Whether to require a user to be in the room to add an alias to it. # Defaults to 'true'. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 276a2b596f19..4e698981a4c8 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -80,6 +80,9 @@ def __init__( # a list of tuples of (pending pdu, order) self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + + # XXX this is never actually used: see + # https://github.com/matrix-org/synapse/issues/7549 self._pending_edus = [] # type: List[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9bd941b5a0f7..29a19b457207 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,6 +29,7 @@ SynapseError, ) from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -535,6 +536,15 @@ def __init__(self, hs, device_handler): iterable=True, ) + # Attempt to resync out of sync device lists every 30s. + self._resync_retry_in_progress = False + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): @@ -679,11 +689,50 @@ def _need_to_do_resync(self, user_id, updates): return False @defer.inlineCallbacks - def user_device_resync(self, user_id): + def _maybe_retry_device_resync(self): + """Retry to resync device lists that are out of sync, except if another retry is + in progress. + """ + if self._resync_retry_in_progress: + return + + try: + # Prevent another call of this function to retry resyncing device lists so + # we don't send too many requests. + self._resync_retry_in_progress = True + # Get all of the users that need resyncing. + need_resync = yield self.store.get_user_ids_requiring_device_list_resync() + # Iterate over the set of user IDs. + for user_id in need_resync: + # Try to resync the current user's devices list. Exception handling + # isn't necessary here, since user_device_resync catches all instances + # of "Exception" that might be raised from the federation request. This + # means that if an exception is raised by this function, it must be + # because of a database issue, which means _maybe_retry_device_resync + # probably won't be able to go much further anyway. + result = yield self.user_device_resync( + user_id=user_id, mark_failed_as_stale=False, + ) + # user_device_resync only returns a result if it managed to successfully + # resync and update the database. Updating the table of users requiring + # resync isn't necessary here as user_device_resync already does it + # (through self.store.update_remote_device_list_cache). + if result: + logger.debug( + "Successfully resynced the device list for %s" % user_id, + ) + finally: + # Allow future calls to retry resyncinc out of sync device lists. + self._resync_retry_in_progress = False + + @defer.inlineCallbacks + def user_device_resync(self, user_id, mark_failed_as_stale=True): """Fetches all devices for a user and updates the device cache with them. Args: user_id (str): The user's id whose device_list will be updated. + mark_failed_as_stale (bool): Whether to mark the user's device list as stale + if the attempt to resync failed. Returns: Deferred[dict]: a dict with device info as under the "devices" in the result of this request: @@ -694,10 +743,23 @@ def user_device_resync(self, user_id): origin = get_domain_from_id(user_id) try: result = yield self.federation.query_user_devices(origin, user_id) - except (NotRetryingDestination, RequestSendFailed, HttpResponseException): - # TODO: Remember that we are now out of sync and try again - # later - logger.warning("Failed to handle device list update for %s", user_id) + except NotRetryingDestination: + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + + return + except (RequestSendFailed, HttpResponseException) as e: + logger.warning( + "Failed to handle device list update for %s: %s", user_id, e, + ) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync @@ -711,13 +773,17 @@ def user_device_resync(self, user_id): logger.info(e) return except Exception as e: - # TODO: Remember that we are now out of sync and try again - # later set_tag("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + return log_kv({"result": result}) stream_id = result["stream_id"] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1aa017a6472c..acd1e38ec9bf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -40,6 +40,7 @@ Codes, FederationDeniedError, FederationError, + HttpResponseException, RequestSendFailed, SynapseError, ) @@ -126,6 +127,7 @@ def __init__(self, hs): self.config = hs.config self.http_client = hs.get_simple_http_client() self._instance_name = hs.get_instance_name() + self._replication = hs.get_replication_data_handler() self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( @@ -1035,6 +1037,12 @@ async def try_backfill(domains): # TODO: We can probably do something more intelligent here. return True except SynapseError as e: + logger.info("Failed to backfill from %s because %s", dom, e) + continue + except HttpResponseException as e: + if 400 <= e.code < 500: + raise e.to_synapse_error() + logger.info("Failed to backfill from %s because %s", dom, e) continue except CodeMessageException as e: @@ -1213,7 +1221,7 @@ async def on_event_auth(self, event_id: str) -> List[EventBase]: async def do_invite_join( self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict - ) -> None: + ) -> Tuple[str, int]: """ Attempts to join the `joinee` to the room `room_id` via the servers contained in `target_hosts`. @@ -1300,15 +1308,23 @@ async def do_invite_join( room_id=room_id, room_version=room_version_obj, ) - await self._persist_auth_tree( + max_stream_id = await self._persist_auth_tree( origin, auth_chain, state, event, room_version_obj ) + # We wait here until this instance has seen the events come down + # replication (if we're using replication) as the below uses caches. + # + # TODO: Currently the events stream is written to from master + await self._replication.wait_for_stream_position( + "master", "events", max_stream_id + ) + # Check whether this room is the result of an upgrade of a room we already know # about. If so, migrate over user information predecessor = await self.store.get_room_predecessor(room_id) if not predecessor or not isinstance(predecessor.get("room_id"), str): - return + return event.event_id, max_stream_id old_room_id = predecessor["room_id"] logger.debug( "Found predecessor for %s during remote join: %s", room_id, old_room_id @@ -1321,6 +1337,7 @@ async def do_invite_join( ) logger.debug("Finished joining %s to %s", joinee, room_id) + return event.event_id, max_stream_id finally: room_queue = self.room_queues[room_id] del self.room_queues[room_id] @@ -1550,7 +1567,7 @@ async def on_invite_request( async def do_remotely_reject_invite( self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict - ) -> EventBase: + ) -> Tuple[EventBase, int]: origin, event, room_version = await self._make_and_verify_event( target_hosts, room_id, user_id, "leave", content=content ) @@ -1570,9 +1587,9 @@ async def do_remotely_reject_invite( await self.federation_client.send_leave(target_hosts, event) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify([(event, context)]) + stream_id = await self.persist_events_and_notify([(event, context)]) - return event + return event, stream_id async def _make_and_verify_event( self, @@ -1884,7 +1901,7 @@ async def _persist_auth_tree( state: List[EventBase], event: EventBase, room_version: RoomVersion, - ) -> None: + ) -> int: """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event separately. Notifies about the persisted events @@ -1978,7 +1995,7 @@ async def _persist_auth_tree( event, old_state=state ) - await self.persist_events_and_notify([(event, new_event_context)]) + return await self.persist_events_and_notify([(event, new_event_context)]) async def _prep_event( self, @@ -2831,7 +2848,7 @@ async def persist_events_and_notify( self, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, - ) -> None: + ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. @@ -2841,12 +2858,13 @@ async def persist_events_and_notify( backfilling or not """ if self.config.worker.writers.events != self._instance_name: - await self._send_events( + result = await self._send_events( instance_name=self.config.worker.writers.events, store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, ) + return result["max_stream_id"] else: max_stream_id = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled @@ -2861,6 +2879,8 @@ async def persist_events_and_notify( for event, _ in event_and_contexts: await self._notify_persisted_event(event, max_stream_id) + return max_stream_id + async def _notify_persisted_event( self, event: EventBase, max_stream_id: int ) -> None: diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 0f0e632b626c..9ed0d23b0fab 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -290,8 +290,7 @@ def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server): return changed - @defer.inlineCallbacks - def send_threepid_validation( + async def send_threepid_validation( self, email_address, client_secret, @@ -319,7 +318,7 @@ def send_threepid_validation( """ # Check that this email/client_secret/send_attempt combo is new or # greater than what we've seen previously - session = yield self.store.get_threepid_validation_session( + session = await self.store.get_threepid_validation_session( "email", client_secret, address=email_address, validated=False ) @@ -353,7 +352,7 @@ def send_threepid_validation( # Send the mail with the link containing the token, client_secret # and session_id try: - yield send_email_func(email_address, token, client_secret, session_id) + await send_email_func(email_address, token, client_secret, session_id) except Exception: logger.exception( "Error sending threepid validation email to %s", email_address @@ -364,7 +363,7 @@ def send_threepid_validation( self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime ) - yield self.store.start_or_continue_validation_session( + await self.store.start_or_continue_validation_session( "email", email_address, session_id, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 68b84b1410bb..ea25f0515ace 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Optional +from typing import Optional, Tuple from six import iteritems, itervalues, string_types @@ -42,6 +42,7 @@ ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.api.urls import ConsentURIBuilder +from synapse.events import EventBase from synapse.events.validator import EventValidator from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -485,9 +486,13 @@ def create_event( try: if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) + displayname = yield profile.get_displayname(target) + if displayname is not None: + content["displayname"] = displayname if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) + avatar_url = yield profile.get_avatar_url(target) + if avatar_url is not None: + content["avatar_url"] = avatar_url except Exception as e: logger.info( "Failed to get profile information for %r: %s", target, e @@ -627,7 +632,9 @@ def assert_accepted_privacy_policy(self, requester): msg = self._block_events_without_consent_error % {"consent_uri": consent_uri} raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri) - async def send_nonmember_event(self, requester, event, context, ratelimit=True): + async def send_nonmember_event( + self, requester, event, context, ratelimit=True + ) -> int: """ Persists and notifies local clients and federation of an event. @@ -636,6 +643,9 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True): context (Context) the context of the event. ratelimit (bool): Whether to rate limit this send. is_guest (bool): Whether the sender is a guest. + + Return: + The stream_id of the persisted event. """ if event.type == EventTypes.Member: raise SynapseError( @@ -656,7 +666,7 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True): ) return prev_state - await self.handle_new_client_event( + return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit ) @@ -685,7 +695,7 @@ def deduplicate_state_event(self, event, context): async def create_and_send_nonmember_event( self, requester, event_dict, ratelimit=True, txn_id=None - ): + ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. @@ -708,10 +718,10 @@ async def create_and_send_nonmember_event( spam_error = "Spam is not permitted here" raise SynapseError(403, spam_error, Codes.FORBIDDEN) - await self.send_nonmember_event( + stream_id = await self.send_nonmember_event( requester, event, context, ratelimit=ratelimit ) - return event + return event, stream_id @measure_func("create_new_client_event") @defer.inlineCallbacks @@ -771,7 +781,7 @@ def create_new_client_event( @measure_func("handle_new_client_event") async def handle_new_client_event( self, requester, event, context, ratelimit=True, extra_users=[] - ): + ) -> int: """Processes a new event. This includes checking auth, persisting it, notifying users, sending to remote servers, etc. @@ -784,6 +794,9 @@ async def handle_new_client_event( context (EventContext) ratelimit (bool) extra_users (list(UserID)): Any extra users to notify about event + + Return: + The stream_id of the persisted event. """ if event.is_state() and (event.type, event.state_key) == ( @@ -824,7 +837,7 @@ async def handle_new_client_event( try: # If we're a worker we need to hit out to the master. if self.config.worker.writers.events != self._instance_name: - await self.send_event( + result = await self.send_event( instance_name=self.config.worker.writers.events, event_id=event.event_id, store=self.store, @@ -834,14 +847,17 @@ async def handle_new_client_event( ratelimit=ratelimit, extra_users=extra_users, ) + stream_id = result["stream_id"] + event.internal_metadata.stream_ordering = stream_id success = True - return + return stream_id - await self.persist_and_notify_client_event( + stream_id = await self.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) success = True + return stream_id finally: if not success: # Ensure that we actually remove the entries in the push actions @@ -884,7 +900,7 @@ def _validate_canonical_alias( async def persist_and_notify_client_event( self, requester, event, context, ratelimit=True, extra_users=[] - ): + ) -> int: """Called when we have fully built the event, have already calculated the push actions for the event, and checked auth. @@ -1074,6 +1090,8 @@ def _notify(): # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) + return event_stream_id + async def _bump_active_time(self, user): try: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1e6bdac0add3..a6178e74a19b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -132,7 +132,7 @@ def check_username(self, localpart, guest_access_token=None, assigned_user_id=No def register_user( self, localpart=None, - password=None, + password_hash=None, guest_access_token=None, make_guest=False, admin=False, @@ -147,7 +147,7 @@ def register_user( Args: localpart: The local part of the user ID to register. If None, one will be generated. - password (unicode): The password to assign to this user so they can + password_hash (str|None): The hashed password to assign to this user so they can login again. This can be None which means they cannot login again via a password (e.g. the user is an application service user). user_type (str|None): type of user. One of the values from @@ -164,11 +164,6 @@ def register_user( yield self.check_registration_ratelimit(address) yield self.auth.check_auth_blocking(threepid=threepid) - password_hash = None - if password: - password_hash = yield defer.ensureDeferred( - self._auth_handler.hash(password) - ) if localpart is not None: yield self.check_username(localpart, guest_access_token=guest_access_token) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 73f9eeb39939..2698a129cac9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,6 +22,7 @@ import math import string from collections import OrderedDict +from typing import Tuple from six import iteritems, string_types @@ -439,73 +440,78 @@ async def _move_aliases_to_new_room( new_room_id: str, old_room_state: StateMap[str], ): - directory_handler = self.hs.get_handlers().directory_handler - - aliases = await self.store.get_aliases_for_room(old_room_id) - # check to see if we have a canonical alias. canonical_alias_event = None canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, "")) if canonical_alias_event_id: canonical_alias_event = await self.store.get_event(canonical_alias_event_id) - # first we try to remove the aliases from the old room (we suppress sending - # the room_aliases event until the end). - # - # Note that we'll only be able to remove aliases that (a) aren't owned by an AS, - # and (b) unless the user is a server admin, which the user created. - # - # This is probably correct - given we don't allow such aliases to be deleted - # normally, it would be odd to allow it in the case of doing a room upgrade - - # but it makes the upgrade less effective, and you have to wonder why a room - # admin can't remove aliases that point to that room anyway. - # (cf https://github.com/matrix-org/synapse/issues/2360) - # - removed_aliases = [] - for alias_str in aliases: - alias = RoomAlias.from_string(alias_str) - try: - await directory_handler.delete_association(requester, alias) - removed_aliases.append(alias_str) - except SynapseError as e: - logger.warning("Unable to remove alias %s from old room: %s", alias, e) - - # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest - # of this. - if not removed_aliases: + await self.store.update_aliases_for_room(old_room_id, new_room_id) + + if not canonical_alias_event: return - # we can now add any aliases we successfully removed to the new room. - for alias in removed_aliases: - try: - await directory_handler.create_association( - requester, - RoomAlias.from_string(alias), - new_room_id, - servers=(self.hs.hostname,), - check_membership=False, - ) - logger.info("Moved alias %s to new room", alias) - except SynapseError as e: - # I'm not really expecting this to happen, but it could if the spam - # checking module decides it shouldn't, or similar. - logger.error("Error adding alias %s to new room: %s", alias, e) + # If there is a canonical alias we need to update the one in the old + # room and set one in the new one. + old_canonical_alias_content = dict(canonical_alias_event.content) + new_canonical_alias_content = {} + + canonical = canonical_alias_event.content.get("alias") + if canonical and self.hs.is_mine_id(canonical): + new_canonical_alias_content["alias"] = canonical + old_canonical_alias_content.pop("alias", None) + + # We convert to a list as it will be a Tuple. + old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", [])) + if old_alt_aliases: + old_canonical_alias_content["alt_aliases"] = old_alt_aliases + new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", []) + for alias in canonical_alias_event.content.get("alt_aliases", []): + try: + if self.hs.is_mine_id(alias): + new_alt_aliases.append(alias) + old_alt_aliases.remove(alias) + except Exception: + logger.info( + "Invalid alias %s in canonical alias event %s", + alias, + canonical_alias_event_id, + ) + + if not old_alt_aliases: + old_canonical_alias_content.pop("alt_aliases") # If a canonical alias event existed for the old room, fire a canonical # alias event for the new room with a copy of the information. try: - if canonical_alias_event: - await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.CanonicalAlias, - "state_key": "", - "room_id": new_room_id, - "sender": requester.user.to_string(), - "content": canonical_alias_event.content, - }, - ratelimit=False, - ) + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": old_room_id, + "sender": requester.user.to_string(), + "content": old_canonical_alias_content, + }, + ratelimit=False, + ) + except SynapseError as e: + # again I'm not really expecting this to fail, but if it does, I'd rather + # we returned the new room to the client at this point. + logger.error("Unable to send updated alias events in old room: %s", e) + + try: + await self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": new_room_id, + "sender": requester.user.to_string(), + "content": new_canonical_alias_content, + }, + ratelimit=False, + ) except SynapseError as e: # again I'm not really expecting this to fail, but if it does, I'd rather # we returned the new room to the client at this point. @@ -513,7 +519,7 @@ async def _move_aliases_to_new_room( async def create_room( self, requester, config, ratelimit=True, creator_join_profile=None - ): + ) -> Tuple[dict, int]: """ Creates a new room. Args: @@ -530,9 +536,9 @@ async def create_room( `avatar_url` and/or `displayname`. Returns: - Deferred[dict]: - a dict containing the keys `room_id` and, if an alias was - requested, `room_alias`. + First, a dict containing the keys `room_id` and, if an alias + was, requested, `room_alias`. Secondly, the stream_id of the + last persisted event. Raises: SynapseError if the room ID couldn't be stored, or something went horribly wrong. @@ -664,7 +670,7 @@ async def create_room( # override any attempt to set room versions via the creation_content creation_content["room_version"] = room_version.identifier - await self._send_events_for_new_room( + last_stream_id = await self._send_events_for_new_room( requester, room_id, preset_config=preset_config, @@ -678,7 +684,10 @@ async def create_room( if "name" in config: name = config["name"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -692,7 +701,10 @@ async def create_room( if "topic" in config: topic = config["topic"] - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -710,7 +722,7 @@ async def create_room( if is_direct: content["is_direct"] = is_direct - await self.room_member_handler.update_membership( + _, last_stream_id = await self.room_member_handler.update_membership( requester, UserID.from_string(invitee), room_id, @@ -724,7 +736,7 @@ async def create_room( id_access_token = invite_3pid.get("id_access_token") # optional address = invite_3pid["address"] medium = invite_3pid["medium"] - await self.hs.get_room_member_handler().do_3pid_invite( + last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, @@ -740,7 +752,7 @@ async def create_room( if room_alias: result["room_alias"] = room_alias.to_string() - return result + return result, last_stream_id async def _send_events_for_new_room( self, @@ -753,7 +765,13 @@ async def _send_events_for_new_room( room_alias=None, power_level_content_override=None, # Doesn't apply when initial state has power level state event content creator_join_profile=None, - ): + ) -> int: + """Sends the initial events into a new room. + + Returns: + The stream_id of the last event persisted. + """ + def create(etype, content, **kwargs): e = {"type": etype, "content": content} @@ -762,12 +780,16 @@ def create(etype, content, **kwargs): return e - async def send(etype, content, **kwargs): + async def send(etype, content, **kwargs) -> int: event = create(etype, content, **kwargs) logger.debug("Sending %s in new room", etype) - await self.event_creation_handler.create_and_send_nonmember_event( + ( + _, + last_stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) + return last_stream_id config = RoomCreationHandler.PRESETS_DICT[preset_config] @@ -792,7 +814,9 @@ async def send(etype, content, **kwargs): # of the first events that get sent into a room. pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None) if pl_content is not None: - await send(etype=EventTypes.PowerLevels, content=pl_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=pl_content + ) else: power_level_content = { "users": {creator_id: 100}, @@ -825,33 +849,39 @@ async def send(etype, content, **kwargs): if power_level_content_override: power_level_content.update(power_level_content_override) - await send(etype=EventTypes.PowerLevels, content=power_level_content) + last_sent_stream_id = await send( + etype=EventTypes.PowerLevels, content=power_level_content + ) if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.CanonicalAlias, content={"alias": room_alias.to_string()}, ) if (EventTypes.JoinRules, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]} ) if (EventTypes.RoomHistoryVisibility, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.RoomHistoryVisibility, content={"history_visibility": config["history_visibility"]}, ) if config["guest_can_join"]: if (EventTypes.GuestAccess, "") not in initial_state: - await send( + last_sent_stream_id = await send( etype=EventTypes.GuestAccess, content={"guest_access": "can_join"} ) for (etype, state_key), content in initial_state.items(): - await send(etype=etype, state_key=state_key, content=content) + last_sent_stream_id = await send( + etype=etype, state_key=state_key, content=content + ) + + return last_sent_stream_id async def _generate_room_id( self, creator_id: str, is_public: str, room_version: RoomVersion, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7ede6c33e4f5..77d23163c6fe 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,7 +17,7 @@ import abc import logging -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Optional, Tuple from six.moves import http_client @@ -93,7 +93,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> Optional[dict]: + ) -> Tuple[str, int]: """Try and join a room that this server is not in Args: @@ -113,7 +113,7 @@ async def _remote_reject_invite( room_id: str, target: UserID, content: dict, - ) -> dict: + ) -> Tuple[Optional[str], int]: """Attempt to reject an invite for a room this server is not in. If we fail to do so we locally mark the invite as rejected. @@ -176,7 +176,7 @@ async def _local_membership_update( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> EventBase: + ) -> Tuple[str, int]: user_id = target.to_string() if content is None: @@ -209,9 +209,10 @@ async def _local_membership_update( ) if duplicate is not None: # Discard the new event since this membership change is a no-op. - return duplicate + _, stream_id = await self.store.get_event_ordering(duplicate.event_id) + return duplicate.event_id, stream_id - await self.event_creation_handler.handle_new_client_event( + stream_id = await self.event_creation_handler.handle_new_client_event( requester, event, context, extra_users=[target], ratelimit=ratelimit ) @@ -235,7 +236,7 @@ async def _local_membership_update( if prev_member_event.membership == Membership.JOIN: await self._user_left_room(target, room_id) - return event + return event.event_id, stream_id async def copy_room_tags_and_direct_to_room( self, old_room_id, new_room_id, user_id @@ -285,7 +286,7 @@ async def update_membership( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> Union[EventBase, Optional[dict]]: + ) -> Tuple[Optional[str], int]: key = (room_id,) with (await self.member_linearizer.queue(key)): @@ -316,7 +317,7 @@ async def _update_membership( ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, - ) -> Union[EventBase, Optional[dict]]: + ) -> Tuple[Optional[str], int]: content_specified = bool(content) if content is None: content = {} @@ -420,7 +421,13 @@ async def _update_membership( same_membership = old_membership == effective_membership_state same_sender = requester.user.to_string() == old_state.sender if same_sender and same_membership and same_content: - return old_state + _, stream_id = await self.store.get_event_ordering( + old_state.event_id + ) + return ( + old_state.event_id, + stream_id, + ) if old_membership in ["ban", "leave"] and action == "kick": raise AuthError(403, "The target user is not in the room") @@ -727,7 +734,7 @@ async def do_3pid_invite( requester: Requester, txn_id: Optional[str], id_access_token: Optional[str] = None, - ) -> None: + ) -> int: if self.config.block_non_admin_invites: is_requester_admin = await self.auth.is_server_admin(requester.user) if not is_requester_admin: @@ -759,11 +766,11 @@ async def do_3pid_invite( ) if invitee: - await self.update_membership( + _, stream_id = await self.update_membership( requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id ) else: - await self._make_and_store_3pid_invite( + stream_id = await self._make_and_store_3pid_invite( requester, id_server, medium, @@ -774,6 +781,8 @@ async def do_3pid_invite( id_access_token=id_access_token, ) + return stream_id + async def _make_and_store_3pid_invite( self, requester: Requester, @@ -784,7 +793,7 @@ async def _make_and_store_3pid_invite( user: UserID, txn_id: Optional[str], id_access_token: Optional[str] = None, - ) -> None: + ) -> int: room_state = await self.state_handler.get_current_state(room_id) inviter_display_name = "" @@ -839,7 +848,10 @@ async def _make_and_store_3pid_invite( id_access_token=id_access_token, ) - await self.event_creation_handler.create_and_send_nonmember_event( + ( + event, + stream_id, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, @@ -857,6 +869,7 @@ async def _make_and_store_3pid_invite( ratelimit=False, txn_id=txn_id, ) + return stream_id async def _is_host_in_room( self, current_state_ids: Dict[Tuple[str, str], str] @@ -938,7 +951,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> None: + ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join """ # filter ourselves out of remote_room_hosts: do_invite_join ignores it @@ -967,7 +980,7 @@ async def _remote_join( # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - await self.federation_handler.do_invite_join( + event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user.to_string(), content ) await self._user_joined_room(user, room_id) @@ -977,14 +990,14 @@ async def _remote_join( if self.hs.config.limit_remote_rooms.enabled: if too_complex is False: # We checked, and we're under the limit. - return + return event_id, stream_id # Check again, but with the local state events too_complex = await self._is_local_room_too_complex(room_id) if too_complex is False: # We're under the limit. - return + return event_id, stream_id # The room is too large. Leave. requester = types.create_requester(user, None, False, None) @@ -997,6 +1010,8 @@ async def _remote_join( errcode=Codes.RESOURCE_LIMIT_EXCEEDED, ) + return event_id, stream_id + async def _remote_reject_invite( self, requester: Requester, @@ -1004,15 +1019,15 @@ async def _remote_reject_invite( room_id: str, target: UserID, content: dict, - ) -> dict: + ) -> Tuple[Optional[str], int]: """Implements RoomMemberHandler._remote_reject_invite """ fed_handler = self.federation_handler try: - ret = await fed_handler.do_remotely_reject_invite( + event, stream_id = await fed_handler.do_remotely_reject_invite( remote_room_hosts, room_id, target.to_string(), content=content, ) - return ret + return event.event_id, stream_id except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. @@ -1022,8 +1037,10 @@ async def _remote_reject_invite( # logger.warning("Failed to reject invite: %s", e) - await self.locally_reject_invite(target.to_string(), room_id) - return {} + stream_id = await self.store.locally_reject_invite( + target.to_string(), room_id + ) + return None, stream_id async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_joined_room diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 5c776cc0be0d..02e0c4103d9b 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import List, Optional +from typing import List, Optional, Tuple from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler @@ -43,7 +43,7 @@ async def _remote_join( room_id: str, user: UserID, content: dict, - ) -> Optional[dict]: + ) -> Tuple[str, int]: """Implements RoomMemberHandler._remote_join """ if len(remote_room_hosts) == 0: @@ -59,7 +59,7 @@ async def _remote_join( await self._user_joined_room(user, room_id) - return ret + return ret["event_id"], ret["stream_id"] async def _remote_reject_invite( self, @@ -68,16 +68,17 @@ async def _remote_reject_invite( room_id: str, target: UserID, content: dict, - ) -> dict: + ) -> Tuple[Optional[str], int]: """Implements RoomMemberHandler._remote_reject_invite """ - return await self._remote_reject_client( + ret = await self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, user_id=target.to_string(), content=content, ) + return ret["event_id"], ret["stream_id"] async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_joined_room diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 63d8f9aa0d54..4d245b618b17 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -35,16 +35,13 @@ def __init__(self, hs): async def set_password( self, user_id: str, - new_password: str, + password_hash: str, logout_devices: bool, requester: Optional[Requester] = None, ): if not self.hs.config.password_localdb_enabled: raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) - self._password_policy_handler.validate_password(new_password) - password_hash = await self._auth_handler.hash(new_password) - try: await self.store.user_set_password_hash(user_id, password_hash) except StoreError as e: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 44077f534912..2d47b9ea001b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -144,6 +144,11 @@ def _handle_json_response(reactor, timeout_sec, request, response): d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = yield make_deferred_yieldable(d) + except TimeoutError as e: + logger.warning( + "{%s} [%s] Timed out reading response", request.txn_id, request.destination, + ) + raise RequestSendFailed(e, can_retry=True) from e except Exception as e: logger.warning( "{%s} [%s] Error reading response: %s", @@ -424,6 +429,8 @@ def _send_request( ) response = yield request_deferred + except TimeoutError as e: + raise RequestSendFailed(e, can_retry=True) from e except DNSLookupError as e: raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) except Exception as e: diff --git a/synapse/http/site.py b/synapse/http/site.py index 514f2f14029b..167293c46d54 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,6 +14,7 @@ import contextlib import logging import time +from typing import Optional from twisted.python.failure import Failure from twisted.web.server import Request, Site @@ -45,7 +46,7 @@ class SynapseRequest(Request): request even after the client has disconnected. Attributes: - logcontext(LoggingContext) : the log context for this request + logcontext: the log context for this request """ def __init__(self, channel, *args, **kw): @@ -53,10 +54,10 @@ def __init__(self, channel, *args, **kw): self.site = channel.site self._channel = channel # this is used by the tests self.authenticated_entity = None - self.start_time = 0 + self.start_time = 0.0 # we can't yet create the logcontext, as we don't know the method. - self.logcontext = None + self.logcontext = None # type: Optional[LoggingContext] global _next_request_seq self.request_seq = _next_request_seq @@ -182,6 +183,7 @@ def finish(self): self.finish_time = time.time() Request.finish(self) if not self._is_processing: + assert self.logcontext is not None with PreserveLoggingContext(self.logcontext): self._finished_processing() @@ -249,6 +251,7 @@ def _started_processing(self, servlet_name): def _finished_processing(self): """Log the completion of this request and update the metrics """ + assert self.logcontext is not None usage = self.logcontext.get_resource_usage() if self._processing_finished_time is None: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index d2fd29acb454..9cf31f96b3fa 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -26,7 +26,12 @@ import attr from prometheus_client import Counter, Gauge, Histogram -from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily +from prometheus_client.core import ( + REGISTRY, + CounterMetricFamily, + GaugeMetricFamily, + HistogramMetricFamily, +) from twisted.internet import reactor @@ -338,6 +343,78 @@ def collect(self): if not running_on_pypy: REGISTRY.register(GCCounts()) + +# +# PyPy GC / memory metrics +# + + +class PyPyGCStats(object): + def collect(self): + + # @stats is a pretty-printer object with __str__() returning a nice table, + # plus some fields that contain data from that table. + # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). + stats = gc.get_stats(memory_pressure=False) # type: ignore + # @s contains same fields as @stats, but as actual integers. + s = stats._s # type: ignore + + # also note that field naming is completely braindead + # and only vaguely correlates with the pretty-printed table. + # >>>> gc.get_stats(False) + # Total memory consumed: + # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory + # in arenas: 3.0MB # s.total_arena_memory + # rawmalloced: 1.7MB # s.total_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler used: 31.0kB # s.jit_backend_used + # ----------------------------- + # Total: 8.8MB # stats.memory_used_sum + # + # Total memory allocated: + # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory + # in arenas: 30.9MB # s.peak_arena_memory + # rawmalloced: 4.1MB # s.peak_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler allocated: 1.0MB # s.jit_backend_allocated + # ----------------------------- + # Total: 39.7MB # stats.memory_allocated_sum + # + # Total time spent in GC: 0.073 # s.total_gc_time + + pypy_gc_time = CounterMetricFamily( + "pypy_gc_time_seconds_total", "Total time spent in PyPy GC", labels=[], + ) + pypy_gc_time.add_metric([], s.total_gc_time / 1000) + yield pypy_gc_time + + pypy_mem = GaugeMetricFamily( + "pypy_memory_bytes", + "Memory tracked by PyPy allocator", + labels=["state", "class", "kind"], + ) + # memory used by JIT assembler + pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) + pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) + # memory used by GCed objects + pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) + pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) + pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) + pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) + pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) + pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) + # totals + pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) + pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) + pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory) + pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) + yield pypy_mem + + +if running_on_pypy: + REGISTRY.register(PyPyGCStats()) + + # # Twisted reactor metrics # diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index ba4551d61964..568c13eaea19 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -15,7 +15,6 @@ import logging -from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process @@ -132,8 +131,7 @@ def _resume_processing(self): self._is_processing = False self._start_processing() - @defer.inlineCallbacks - def _process(self): + async def _process(self): # we should never get here if we are already processing assert not self._is_processing @@ -142,7 +140,7 @@ def _process(self): if self.throttle_params is None: # this is our first loop: load up the throttle params - self.throttle_params = yield self.store.get_throttle_params_by_room( + self.throttle_params = await self.store.get_throttle_params_by_room( self.pusher_id ) @@ -151,7 +149,7 @@ def _process(self): while True: starting_max_ordering = self.max_stream_ordering try: - yield self._unsafe_process() + await self._unsafe_process() except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: @@ -159,8 +157,7 @@ def _process(self): finally: self._is_processing = False - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): """ Main logic of the push loop without the wrapper function that sets up logging, measures and guards against multiple instances of it @@ -168,12 +165,12 @@ def _unsafe_process(self): """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering fn = self.store.get_unread_push_actions_for_user_in_range_for_email - unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) + unprocessed = await fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None if not unprocessed: - yield self.save_last_stream_ordering_and_success(self.max_stream_ordering) + await self.save_last_stream_ordering_and_success(self.max_stream_ordering) return for push_action in unprocessed: @@ -201,15 +198,15 @@ def _unsafe_process(self): "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]), } - yield self.send_notification(unprocessed, reason) + await self.send_notification(unprocessed, reason) - yield self.save_last_stream_ordering_and_success( + await self.save_last_stream_ordering_and_success( max(ea["stream_ordering"] for ea in unprocessed) ) # we update the throttle on all the possible unprocessed push actions for ea in unprocessed: - yield self.sent_notif_update_throttle(ea["room_id"], ea) + await self.sent_notif_update_throttle(ea["room_id"], ea) break else: if soonest_due_at is None or should_notify_at < soonest_due_at: @@ -227,14 +224,13 @@ def _unsafe_process(self): self.seconds_until(soonest_due_at), self.on_timer ) - @defer.inlineCallbacks - def save_last_stream_ordering_and_success(self, last_stream_ordering): + async def save_last_stream_ordering_and_success(self, last_stream_ordering): if last_stream_ordering is None: # This happens if we haven't yet processed anything return self.last_stream_ordering = last_stream_ordering - pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success( + pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( self.app_id, self.email, self.user_id, @@ -275,13 +271,12 @@ def room_ready_to_notify_at(self, room_id): may_send_at = last_sent_ts + throttle_ms return may_send_at - @defer.inlineCallbacks - def sent_notif_update_throttle(self, room_id, notified_push_action): + async def sent_notif_update_throttle(self, room_id, notified_push_action): # We have sent a notification, so update the throttle accordingly. # If the event that triggered the notif happened more than # THROTTLE_RESET_AFTER_MS after the previous one that triggered a # notif, we release the throttle. Otherwise, the throttle is increased. - time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before( + time_of_previous_notifs = await self.store.get_time_of_last_push_action_before( notified_push_action["stream_ordering"] ) @@ -310,14 +305,13 @@ def sent_notif_update_throttle(self, room_id, notified_push_action): "last_sent_ts": self.clock.time_msec(), "throttle_ms": new_throttle_ms, } - yield self.store.set_throttle_params( + await self.store.set_throttle_params( self.pusher_id, room_id, self.throttle_params[room_id] ) - @defer.inlineCallbacks - def send_notification(self, push_actions, reason): + async def send_notification(self, push_actions, reason): logger.info("Sending notif email for user %r", self.user_id) - yield self.mailer.send_notification_mail( + await self.mailer.send_notification_mail( self.app_id, self.user_id, self.email, push_actions, reason ) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index ab33abbeed83..d57a66a697d5 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -26,8 +26,6 @@ import bleach import jinja2 -from twisted.internet import defer - from synapse.api.constants import EventTypes from synapse.api.errors import StoreError from synapse.logging.context import make_deferred_yieldable @@ -127,8 +125,7 @@ def __init__(self, hs, app_name, template_html, template_text): logger.info("Created Mailer for app_name %s" % app_name) - @defer.inlineCallbacks - def send_password_reset_mail(self, email_address, token, client_secret, sid): + async def send_password_reset_mail(self, email_address, token, client_secret, sid): """Send an email with a password reset link to a user Args: @@ -149,14 +146,13 @@ def send_password_reset_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Password Reset" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_registration_mail(self, email_address, token, client_secret, sid): + async def send_registration_mail(self, email_address, token, client_secret, sid): """Send an email with a registration confirmation link to a user Args: @@ -177,14 +173,13 @@ def send_registration_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Register your Email Address" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_add_threepid_mail(self, email_address, token, client_secret, sid): + async def send_add_threepid_mail(self, email_address, token, client_secret, sid): """Send an email with a validation link to a user for adding a 3pid to their account Args: @@ -206,20 +201,19 @@ def send_add_threepid_mail(self, email_address, token, client_secret, sid): template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Validate Your Email" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_notification_mail( + async def send_notification_mail( self, app_id, user_id, email_address, push_actions, reason ): """Send email regarding a user's room notifications""" rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions]) - notif_events = yield self.store.get_events( + notif_events = await self.store.get_events( [pa["event_id"] for pa in push_actions] ) @@ -232,7 +226,7 @@ def send_notification_mail( state_by_room = {} try: - user_display_name = yield self.store.get_profile_displayname( + user_display_name = await self.store.get_profile_displayname( UserID.from_string(user_id).localpart ) if user_display_name is None: @@ -240,14 +234,13 @@ def send_notification_mail( except StoreError: user_display_name = user_id - @defer.inlineCallbacks - def _fetch_room_state(room_id): - room_state = yield self.store.get_current_state_ids(room_id) + async def _fetch_room_state(room_id): + room_state = await self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email # notifs are much less realtime than sync so we can afford to wait a bit. - yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + await concurrently_execute(_fetch_room_state, rooms_in_order, 3) # actually sort our so-called rooms_in_order list, most recent room first rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0)) @@ -255,19 +248,19 @@ def _fetch_room_state(room_id): rooms = [] for r in rooms_in_order: - roomvars = yield self.get_room_vars( + roomvars = await self.get_room_vars( r, user_id, notifs_by_room[r], notif_events, state_by_room[r] ) rooms.append(roomvars) - reason["room_name"] = yield calculate_room_name( + reason["room_name"] = await calculate_room_name( self.store, state_by_room[reason["room_id"]], user_id, fallback_to_members=True, ) - summary_text = yield self.make_summary_text( + summary_text = await self.make_summary_text( notifs_by_room, state_by_room, notif_events, user_id, reason ) @@ -282,12 +275,11 @@ def _fetch_room_state(room_id): "reason": reason, } - yield self.send_email( + await self.send_email( email_address, "[%s] %s" % (self.app_name, summary_text), template_vars ) - @defer.inlineCallbacks - def send_email(self, email_address, subject, template_vars): + async def send_email(self, email_address, subject, template_vars): """Send an email with the given information and template text""" try: from_string = self.hs.config.email_notif_from % {"app": self.app_name} @@ -317,7 +309,7 @@ def send_email(self, email_address, subject, template_vars): logger.info("Sending email to %s" % email_address) - yield make_deferred_yieldable( + await make_deferred_yieldable( self.sendmail( self.hs.config.email_smtp_host, raw_from, @@ -332,13 +324,14 @@ def send_email(self, email_address, subject, template_vars): ) ) - @defer.inlineCallbacks - def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): + async def get_room_vars( + self, room_id, user_id, notifs, notif_events, room_state_ids + ): my_member_event_id = room_state_ids[("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) is_invite = my_member_event.content["membership"] == "invite" - room_name = yield calculate_room_name(self.store, room_state_ids, user_id) + room_name = await calculate_room_name(self.store, room_state_ids, user_id) room_vars = { "title": room_name, @@ -350,7 +343,7 @@ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): if not is_invite: for n in notifs: - notifvars = yield self.get_notif_vars( + notifvars = await self.get_notif_vars( n, user_id, notif_events[n["event_id"]], room_state_ids ) @@ -377,9 +370,8 @@ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): return room_vars - @defer.inlineCallbacks - def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): - results = yield self.store.get_events_around( + async def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): + results = await self.store.get_events_around( notif["room_id"], notif["event_id"], before_limit=CONTEXT_BEFORE, @@ -392,25 +384,24 @@ def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): "messages": [], } - the_events = yield filter_events_for_client( + the_events = await filter_events_for_client( self.storage, user_id, results["events_before"] ) the_events.append(notif_event) for event in the_events: - messagevars = yield self.get_message_vars(notif, event, room_state_ids) + messagevars = await self.get_message_vars(notif, event, room_state_ids) if messagevars is not None: ret["messages"].append(messagevars) return ret - @defer.inlineCallbacks - def get_message_vars(self, notif, event, room_state_ids): + async def get_message_vars(self, notif, event, room_state_ids): if event.type != EventTypes.Message: return sender_state_event_id = room_state_ids[("m.room.member", event.sender)] - sender_state_event = yield self.store.get_event(sender_state_event_id) + sender_state_event = await self.store.get_event(sender_state_event_id) sender_name = name_from_member_event(sender_state_event) sender_avatar_url = sender_state_event.content.get("avatar_url") @@ -460,8 +451,7 @@ def add_image_message_vars(self, messagevars, event): return messagevars - @defer.inlineCallbacks - def make_summary_text( + async def make_summary_text( self, notifs_by_room, room_state_ids, notif_events, user_id, reason ): if len(notifs_by_room) == 1: @@ -471,17 +461,17 @@ def make_summary_text( # If the room has some kind of name, use it, but we don't # want the generated-from-names one here otherwise we'll # end up with, "new message from Bob in the Bob room" - room_name = yield calculate_room_name( + room_name = await calculate_room_name( self.store, room_state_ids[room_id], user_id, fallback_to_members=False ) my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) if my_member_event.content["membership"] == "invite": inviter_member_event_id = room_state_ids[room_id][ ("m.room.member", my_member_event.sender) ] - inviter_member_event = yield self.store.get_event( + inviter_member_event = await self.store.get_event( inviter_member_event_id ) inviter_name = name_from_member_event(inviter_member_event) @@ -506,7 +496,7 @@ def make_summary_text( state_event_id = room_state_ids[room_id][ ("m.room.member", event.sender) ] - state_event = yield self.store.get_event(state_event_id) + state_event = await self.store.get_event(state_event_id) sender_name = name_from_member_event(state_event) if sender_name is not None and room_name is not None: @@ -535,7 +525,7 @@ def make_summary_text( } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [ room_state_ids[room_id][("m.room.member", s)] for s in sender_ids @@ -567,7 +557,7 @@ def make_summary_text( } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids] ) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7e23b565b9f0..c287c4e269f2 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -29,7 +29,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): """Handles events newly received from federation, including persisting and - notifying. + notifying. Returns the maximum stream ID of the persisted events. The API looks like: @@ -46,6 +46,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): "context": { .. serialized event context .. }, }], "backfilled": false + } + + 200 OK + + { + "max_stream_id": 32443, + } """ NAME = "fed_send_events" @@ -115,11 +122,11 @@ async def _handle_request(self, request): logger.info("Got %d events from federation", len(event_and_contexts)) - await self.federation_handler.persist_events_and_notify( + max_stream_id = await self.federation_handler.persist_events_and_notify( event_and_contexts, backfilled ) - return 200, {} + return 200, {"max_stream_id": max_stream_id} class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index ff80708e1e1c..489e8592a9ff 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -80,11 +80,11 @@ async def _handle_request(self, request, room_id, user_id): logger.info("remote_join: %s into room: %s", user_id, room_id) - await self.federation_handler.do_invite_join( + event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user_id, event_content ) - return 200, {} + return 200, {"event_id": event_id, "stream_id": stream_id} class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): @@ -141,10 +141,10 @@ async def _handle_request(self, request, room_id, user_id): logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) try: - event = await self.federation_handler.do_remotely_reject_invite( + event, stream_id = await self.federation_handler.do_remotely_reject_invite( remote_room_hosts, room_id, user_id, event_content, ) - ret = event.get_pdu_json() + event_id = event.event_id except Exception as e: # if we were unable to reject the exception, just mark # it as rejected on our end and plough ahead. @@ -154,10 +154,10 @@ async def _handle_request(self, request, room_id, user_id): # logger.warning("Failed to reject invite: %s", e) - await self.member_handler.locally_reject_invite(user_id, room_id) - ret = {} + stream_id = await self.store.locally_reject_invite(user_id, room_id) + event_id = None - return 200, ret + return 200, {"event_id": event_id, "stream_id": stream_id} class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index b74b088ff4c2..c981723c1a66 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -119,11 +119,11 @@ async def _handle_request(self, request, event_id): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - await self.event_creation_handler.persist_and_notify_client_event( + stream_id = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return 200, {} + return 200, {"stream_id": stream_id} def register_servlets(hs, http_server): diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index b705a8e16c59..bde97eef328a 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -51,10 +51,7 @@ def __init__(self, hs): super().__init__(hs) self._instance_name = hs.get_instance_name() - - # We pull the streams from the replication handler (if we try and make - # them ourselves we end up in an import loop). - self.streams = hs.get_tcp_replication().get_streams() + self.streams = hs.get_replication_streams() @staticmethod def _serialize_payload(stream_name, from_token, upto_token): diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 28826302f5a9..508ad1b7209b 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,19 +14,23 @@ # limitations under the License. """A replication client for use by synapse workers. """ - +import heapq import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple +from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes +from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) +from synapse.util.async_helpers import timeout_deferred +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer @@ -35,6 +39,10 @@ logger = logging.getLogger(__name__) +# How long we allow callers to wait for replication updates before timing out. +_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30 + + class DirectTcpReplicationClientFactory(ReconnectingClientFactory): """Factory for building connections to the master. Will reconnect if the connection is lost. @@ -92,6 +100,16 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.pusher_pool = hs.get_pusherpool() self.notifier = hs.get_notifier() + self._reactor = hs.get_reactor() + self._clock = hs.get_clock() + self._streams = hs.get_replication_streams() + self._instance_name = hs.get_instance_name() + + # Map from stream to list of deferreds waiting for the stream to + # arrive at a particular position. The lists are sorted by stream position. + self._streams_to_waiters = ( + {} + ) # type: Dict[str, List[Tuple[int, Deferred[None]]]] async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list @@ -131,8 +149,76 @@ async def on_rdata( await self.pusher_pool.on_new_notifications(token, token) + # Notify any waiting deferreds. The list is ordered by position so we + # just iterate through the list until we reach a position that is + # greater than the received row position. + waiting_list = self._streams_to_waiters.get(stream_name, []) + + # Index of first item with a position after the current token, i.e we + # have called all deferreds before this index. If not overwritten by + # loop below means either a) no items in list so no-op or b) all items + # in list were called and so the list should be cleared. Setting it to + # `len(list)` works for both cases. + index_of_first_deferred_not_called = len(waiting_list) + + for idx, (position, deferred) in enumerate(waiting_list): + if position <= token: + try: + with PreserveLoggingContext(): + deferred.callback(None) + except Exception: + # The deferred has been cancelled or timed out. + pass + else: + # The list is sorted by position so we don't need to continue + # checking any futher entries in the list. + index_of_first_deferred_not_called = idx + break + + # Drop all entries in the waiting list that were called in the above + # loop. (This maintains the order so no need to resort) + waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] + async def on_position(self, stream_name: str, instance_name: str, token: int): self.store.process_replication_rows(stream_name, instance_name, token, []) def on_remote_server_up(self, server: str): """Called when get a new REMOTE_SERVER_UP command.""" + + async def wait_for_stream_position( + self, instance_name: str, stream_name: str, position: int + ): + """Wait until this instance has received updates up to and including + the given stream position. + """ + + if instance_name == self._instance_name: + # We don't get told about updates written by this process, and + # anyway in that case we don't need to wait. + return + + current_position = self._streams[stream_name].current_token(self._instance_name) + if position <= current_position: + # We're already past the position + return + + # Create a new deferred that times out after N seconds, as we don't want + # to wedge here forever. + deferred = Deferred() + deferred = timeout_deferred( + deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor + ) + + waiting_list = self._streams_to_waiters.setdefault(stream_name, []) + + # We insert into the list using heapq as it is more efficient than + # pushing then resorting each time. + heapq.heappush(waiting_list, (position, deferred)) + + # We measure here to get in flight counts and average waiting time. + with Measure(self._clock, "repl.wait_for_stream_position"): + logger.info("Waiting for repl stream %r to reach %s", stream_name, position) + await make_deferred_yieldable(deferred) + logger.info( + "Finished waiting for repl stream %r to reach %s", stream_name, position + ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 7d4000198866..0a13e1ed348c 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -59,6 +59,7 @@ def __init__(self, hs): self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() self.auth = hs.get_auth() + self._replication = hs.get_replication_data_handler() async def on_POST(self, request, room_id): requester = await self.auth.get_user_by_req(request) @@ -73,7 +74,7 @@ async def on_POST(self, request, room_id): message = content.get("message", self.DEFAULT_MESSAGE) room_name = content.get("room_name", "Content Violation Notification") - info = await self._room_creation_handler.create_room( + info, stream_id = await self._room_creation_handler.create_room( room_creator_requester, config={ "preset": "public_chat", @@ -94,6 +95,13 @@ async def on_POST(self, request, room_id): # desirable in case the first attempt at blocking the room failed below. await self.store.block_room(room_id, requester_user_id) + # We now wait for the create room to come back in via replication so + # that we can assume that all the joins/invites have propogated before + # we try and auto join below. + # + # TODO: Currently the events stream is written to from master + await self._replication.wait_for_stream_position("master", "events", stream_id) + users = await self.state.get_current_users_in_room(room_id) kicked_users = [] failed_to_kick_users = [] diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 593ce011e888..e7f6928c859d 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -222,8 +222,14 @@ async def on_PUT(self, request, user_id): else: new_password = body["password"] logout_devices = True + + new_password_hash = await self.auth_handler.hash(new_password) + await self.set_password_handler.set_password( - target_user.to_string(), new_password, logout_devices, requester + target_user.to_string(), + new_password_hash, + logout_devices, + requester, ) if "deactivated" in body: @@ -243,11 +249,11 @@ async def on_PUT(self, request, user_id): else: # create user password = body.get("password") - if password is not None and ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): - raise SynapseError(400, "Invalid password") + password_hash = None + if password is not None: + if not isinstance(password, text_type) or len(password) > 512: + raise SynapseError(400, "Invalid password") + password_hash = await self.auth_handler.hash(password) admin = body.get("admin", None) user_type = body.get("user_type", None) @@ -259,7 +265,7 @@ async def on_PUT(self, request, user_id): user_id = await self.registration_handler.register_user( localpart=target_user.localpart, - password=password, + password_hash=password_hash, admin=bool(admin), default_display_name=displayname, user_type=user_type, @@ -298,7 +304,7 @@ class UserRegisterServlet(RestServlet): NONCE_TIMEOUT = 60 def __init__(self, hs): - self.handlers = hs.get_handlers() + self.auth_handler = hs.get_auth_handler() self.reactor = hs.get_reactor() self.nonces = {} self.hs = hs @@ -362,16 +368,16 @@ async def on_POST(self, request): 400, "password must be specified", errcode=Codes.BAD_JSON ) else: - if ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): + password = body["password"] + if not isinstance(password, text_type) or len(password) > 512: raise SynapseError(400, "Invalid password") - password = body["password"].encode("utf-8") - if b"\x00" in password: + password_bytes = password.encode("utf-8") + if b"\x00" in password_bytes: raise SynapseError(400, "Invalid password") + password_hash = await self.auth_handler.hash(password) + admin = body.get("admin", None) user_type = body.get("user_type", None) @@ -388,7 +394,7 @@ async def on_POST(self, request): want_mac_builder.update(b"\x00") want_mac_builder.update(username) want_mac_builder.update(b"\x00") - want_mac_builder.update(password) + want_mac_builder.update(password_bytes) want_mac_builder.update(b"\x00") want_mac_builder.update(b"admin" if admin else b"notadmin") if user_type: @@ -407,7 +413,7 @@ async def on_POST(self, request): user_id = await register.registration_handler.register_user( localpart=body["username"].lower(), - password=body["password"], + password_hash=password_hash, admin=bool(admin), user_type=user_type, ) @@ -523,6 +529,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.hs = hs self.auth = hs.get_auth() + self.auth_handler = hs.get_auth_handler() self._set_password_handler = hs.get_set_password_handler() async def on_POST(self, request, target_user_id): @@ -539,8 +546,10 @@ async def on_POST(self, request, target_user_id): new_password = params["new_password"] logout_devices = params.get("logout_devices", True) + new_password_hash = await self.auth_handler.hash(new_password) + await self._set_password_handler.set_password( - target_user_id, new_password, logout_devices, requester + target_user_id, new_password_hash, logout_devices, requester ) return 200, {} diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6b5830cc3f53..105e0cf4d28a 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -93,7 +93,7 @@ def on_PUT(self, request, txn_id): async def on_POST(self, request): requester = await self.auth.get_user_by_req(request) - info = await self._room_creation_handler.create_room( + info, _ = await self._room_creation_handler.create_room( requester, self.get_room_config(request) ) @@ -202,7 +202,7 @@ async def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): if event_type == EventTypes.Member: membership = content.get("membership", None) - event = await self.room_member_handler.update_membership( + event_id, _ = await self.room_member_handler.update_membership( requester, target=UserID.from_string(state_key), room_id=room_id, @@ -210,14 +210,18 @@ async def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): content=content, ) else: - event = await self.event_creation_handler.create_and_send_nonmember_event( + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( requester, event_dict, txn_id=txn_id ) + event_id = event.event_id ret = {} # type: dict - if event: - set_tag("event_id", event.event_id) - ret = {"event_id": event.event_id} + if event_id: + set_tag("event_id", event_id) + ret = {"event_id": event_id} return 200, ret @@ -247,7 +251,7 @@ async def on_POST(self, request, room_id, event_type, txn_id=None): if b"ts" in request.args and requester.app_service: event_dict["origin_server_ts"] = parse_integer(request, "ts", 0) - event = await self.event_creation_handler.create_and_send_nonmember_event( + event, _ = await self.event_creation_handler.create_and_send_nonmember_event( requester, event_dict, txn_id=txn_id ) @@ -781,7 +785,7 @@ async def on_POST(self, request, room_id, event_id, txn_id=None): requester = await self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) - event = await self.event_creation_handler.create_and_send_nonmember_event( + event, _ = await self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Redaction, diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 1bd023477902..d4f721b6b989 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -220,12 +220,27 @@ def __init__(self, hs): self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() self.datastore = self.hs.get_datastore() + self.password_policy_handler = hs.get_password_policy_handler() self._set_password_handler = hs.get_set_password_handler() @interactive_auth_handler async def on_POST(self, request): body = parse_json_object_from_request(request) + # we do basic sanity checks here because the auth layer will store these + # in sessions. Pull out the new password provided to us. + if "new_password" in body: + new_password = body.pop("new_password") + if not isinstance(new_password, str) or len(new_password) > 512: + raise SynapseError(400, "Invalid password") + self.password_policy_handler.validate_password(new_password) + + # If the password is valid, hash it and store it back on the body. + # This ensures that only the hashed password is handled everywhere. + if "new_password_hash" in body: + raise SynapseError(400, "Unexpected property: new_password_hash") + body["new_password_hash"] = await self.auth_handler.hash(new_password) + # there are two possibilities here. Either the user does not have an # access token, and needs to do a password reset; or they have one and # need to validate their identity. @@ -276,12 +291,12 @@ async def on_POST(self, request): logger.error("Auth succeeded but no known type! %r", result.keys()) raise SynapseError(500, "", Codes.UNKNOWN) - assert_params_in_dict(params, ["new_password"]) - new_password = params["new_password"] + assert_params_in_dict(params, ["new_password_hash"]) + new_password_hash = params["new_password_hash"] logout_devices = params.get("logout_devices", True) await self._set_password_handler.set_password( - user_id, new_password, logout_devices, requester + user_id, new_password_hash, logout_devices, requester ) return 200, {} diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 7bca1326d54f..75590ebaeb71 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -177,7 +177,10 @@ async def on_GET(self, request, stagetype): ) elif self._saml_enabled: - client_redirect_url = b"" + # Some SAML identity providers (e.g. Google) require a + # RelayState parameter on requests. It is not necessary here, so + # pass in a dummy redirect URL (which will never get used). + client_redirect_url = b"unused" sso_redirect_url = self._saml_handler.handle_redirect_request( client_redirect_url, session ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index af08cc6cce82..addd4cae1906 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -426,12 +426,16 @@ async def on_POST(self, request): # we do basic sanity checks here because the auth layer will store these # in sessions. Pull out the username/password provided to us. if "password" in body: - if ( - not isinstance(body["password"], string_types) - or len(body["password"]) > 512 - ): + password = body.pop("password") + if not isinstance(password, string_types) or len(password) > 512: raise SynapseError(400, "Invalid password") - self.password_policy_handler.validate_password(body["password"]) + self.password_policy_handler.validate_password(password) + + # If the password is valid, hash it and store it back on the body. + # This ensures that only the hashed password is handled everywhere. + if "password_hash" in body: + raise SynapseError(400, "Unexpected property: password_hash") + body["password_hash"] = await self.auth_handler.hash(password) desired_username = None if "username" in body: @@ -484,7 +488,7 @@ async def on_POST(self, request): guest_access_token = body.get("guest_access_token", None) - if "initial_device_display_name" in body and "password" not in body: + if "initial_device_display_name" in body and "password_hash" not in body: # ignore 'initial_device_display_name' if sent without # a password to work around a client bug where it sent # the 'initial_device_display_name' param alone, wiping out @@ -546,11 +550,11 @@ async def on_POST(self, request): registered = False else: # NB: This may be from the auth handler and NOT from the POST - assert_params_in_dict(params, ["password"]) + assert_params_in_dict(params, ["password_hash"]) desired_username = params.get("username", None) guest_access_token = params.get("guest_access_token", None) - new_password = params.get("password", None) + new_password_hash = params.get("password_hash", None) if desired_username is not None: desired_username = desired_username.lower() @@ -583,7 +587,7 @@ async def on_POST(self, request): registered_user_id = await self.registration_handler.register_user( localpart=desired_username, - password=new_password, + password_hash=new_password_hash, guest_access_token=guest_access_token, threepid=threepid, address=client_addr, diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py index 63f07b63da0e..89002ffbffdd 100644 --- a/synapse/rest/client/v2_alpha/relations.py +++ b/synapse/rest/client/v2_alpha/relations.py @@ -111,7 +111,7 @@ async def on_PUT_or_POST( "sender": requester.user.to_string(), } - event = await self.event_creation_handler.create_and_send_nonmember_event( + event, _ = await self.event_creation_handler.create_and_send_nonmember_event( requester, event_dict=event_dict, txn_id=txn_id ) diff --git a/synapse/server.py b/synapse/server.py index c530f1aa1ad9..ca2deb49bbe4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -90,6 +90,7 @@ from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.resource import ReplicationStreamer +from synapse.replication.tcp.streams import STREAMS_MAP from synapse.rest.media.v1.media_repository import ( MediaRepository, MediaRepositoryResource, @@ -210,6 +211,7 @@ def build_DEPENDENCY(self) "storage", "replication_streamer", "replication_data_handler", + "replication_streams", ] REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"] @@ -583,6 +585,9 @@ def build_replication_streamer(self) -> ReplicationStreamer: def build_replication_data_handler(self): return ReplicationDataHandler(self) + def build_replication_streams(self): + return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()} + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/server.pyi b/synapse/server.pyi index 9e7fad7e6e53..fe8024d2d4e6 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -1,3 +1,5 @@ +from typing import Dict + import twisted.internet import synapse.api.auth @@ -28,6 +30,7 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.replication.tcp.streams import Stream class HomeServer(object): @property @@ -136,3 +139,5 @@ class HomeServer(object): pass def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool: pass + def get_replication_streams(self) -> Dict[str, Stream]: + pass diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index 999c621b9277..bf2454c01cd8 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -83,10 +83,10 @@ async def send_notice( if state_key is not None: event_dict["state_key"] = state_key - res = await self._event_creation_handler.create_and_send_nonmember_event( + event, _ = await self._event_creation_handler.create_and_send_nonmember_event( requester, event_dict, ratelimit=False ) - return res + return event @cached() async def get_or_create_notice_room_for_user(self, user_id): @@ -143,7 +143,7 @@ async def get_or_create_notice_room_for_user(self, user_id): } requester = create_requester(self.server_notices_mxid) - info = await self._room_creation_handler.create_room( + info, _ = await self._room_creation_handler.create_room( requester, config={ "preset": RoomCreationPreset.PRIVATE_CHAT, diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0466c2327819..417ac8dc7c16 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import List, Optional, Set, Tuple from six import iteritems @@ -649,21 +649,31 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids): return results @defer.inlineCallbacks - def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]): + def get_user_ids_requiring_device_list_resync( + self, user_ids: Optional[Collection[str]] = None, + ) -> Set[str]: """Given a list of remote users return the list of users that we - should resync the device lists for. + should resync the device lists for. If None is given instead of a list, + return every user that we should resync the device lists for. Returns: - Deferred[Set[str]] + The IDs of users whose device lists need resync. """ - - rows = yield self.db.simple_select_many_batch( - table="device_lists_remote_resync", - column="user_id", - iterable=user_ids, - retcols=("user_id",), - desc="get_user_ids_requiring_device_list_resync", - ) + if user_ids: + rows = yield self.db.simple_select_many_batch( + table="device_lists_remote_resync", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync_with_iterable", + ) + else: + rows = yield self.db.simple_select_list( + table="device_lists_remote_resync", + keyvalues=None, + retcols=("user_id",), + desc="get_user_ids_requiring_device_list_resync", + ) return {row["user_id"] for row in rows} diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 3c64cf15b776..a6572571b462 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1591,7 +1591,7 @@ def _update_backward_extremeties(self, txn, events): ], ) - async def locally_reject_invite(self, user_id: str, room_id: str): + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: """Mark the invite has having been rejected even though we failed to create a leave event for it. """ @@ -1616,3 +1616,5 @@ def f(txn, stream_ordering): with self._stream_id_gen.get_next() as stream_ordering: await self.db.runInteraction("locally_reject_invite", f, stream_ordering) + + return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 6775689c61b0..213d69100a1d 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1289,12 +1289,12 @@ def get_all_new_events_txn(txn): async def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream """ - to_1, so_1 = await self._get_event_ordering(event_id1) - to_2, so_2 = await self._get_event_ordering(event_id2) + to_1, so_1 = await self.get_event_ordering(event_id1) + to_2, so_2 = await self.get_event_ordering(event_id2) return (to_1, so_1) > (to_2, so_2) @cachedInlineCallbacks(max_entries=5000) - def _get_event_ordering(self, event_id): + def get_event_ordering(self, event_id): res = yield self.db.simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index b0fdf82b6ae4..f159400a8757 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -740,8 +740,8 @@ async def _is_server_still_joined( # whose state has changed as we've already their new state above. users_to_ignore = [ state_key - for _, state_key in itertools.chain(delta.to_insert, delta.to_delete) - if self.is_mine_id(state_key) + for typ, state_key in itertools.chain(delta.to_insert, delta.to_delete) + if typ == EventTypes.Member and self.is_mine_id(state_key) ] if await self.main_store.is_local_host_in_room_ignoring_users( @@ -787,8 +787,8 @@ async def _handle_potentially_left_users(self, user_ids: Set[str]): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) - async def locally_reject_invite(self, user_id: str, room_id: str): + async def locally_reject_invite(self, user_id: str, room_id: str) -> int: """Mark the invite has having been rejected even though we failed to create a leave event for it. """ - await self.persist_events_store.locally_reject_invite(user_id, room_id) + return await self.persist_events_store.locally_reject_invite(user_id, room_id) diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index fdff1957716f..2605f3c65b85 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -186,10 +186,15 @@ def check_yield_points_inner(*args, **kwargs): ) raise Exception(err) + # the wrapped function yielded a Deferred: yield it back up to the parent + # inlineCallbacks(). try: result = yield d - except Exception as e: - result = Failure(e) + except Exception: + # this will fish an earlier Failure out of the stack where possible, and + # thus is preferable to passing in an exeception to the Failure + # constructor, since it results in less stack-mangling. + result = Failure() if current_context() != expected_context: diff --git a/synctl b/synctl index bbccd0529088..81e9bf6b5c39 100755 --- a/synctl +++ b/synctl @@ -142,12 +142,23 @@ def start_worker(app: str, configfile: str, worker_configfile: str) -> bool: return False -def stop(pidfile, app): +def stop(pidfile: str, app: str) -> bool: + """Attempts to kill a synapse worker from the pidfile. + Args: + pidfile: path to file containing worker's pid + app: name of the worker's appservice + + Returns: + True if the process stopped successfully + False if process was already stopped or an error occured + """ + if os.path.exists(pidfile): pid = int(open(pidfile).read()) try: os.kill(pid, signal.SIGTERM) write("stopped %s" % (app,), colour=GREEN) + return True except OSError as err: if err.errno == errno.ESRCH: write("%s not running" % (app,), colour=YELLOW) @@ -155,6 +166,14 @@ def stop(pidfile, app): abort("Cannot stop %s: Operation not permitted" % (app,)) else: abort("Cannot stop %s: Unknown error" % (app,)) + return False + else: + write( + "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)" + % (app, pidfile), + colour=YELLOW, + ) + return False Worker = collections.namedtuple( @@ -300,11 +319,17 @@ def main(): action = options.action if action == "stop" or action == "restart": + has_stopped = True for worker in workers: - stop(worker.pidfile, worker.app) + if not stop(worker.pidfile, worker.app): + # A worker could not be stopped. + has_stopped = False if start_stop_synapse: - stop(pidfile, "synapse.app.homeserver") + if not stop(pidfile, "synapse.app.homeserver"): + has_stopped = False + if not has_stopped: + sys.exit(1) # Wait for synapse to actually shutdown before starting it again if action == "restart": diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py index 94980733c4bd..0c9987be54e3 100644 --- a/tests/federation/test_complexity.py +++ b/tests/federation/test_complexity.py @@ -79,7 +79,9 @@ def test_join_too_large(self): # Mock out some things, because we don't want to test the whole join fed_transport.client.get_json = Mock(return_value=defer.succeed({"v1": 9999})) - handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1)) + handler.federation_handler.do_invite_join = Mock( + return_value=defer.succeed(("", 1)) + ) d = handler._remote_join( None, @@ -115,7 +117,9 @@ def test_join_too_large_once_joined(self): # Mock out some things, because we don't want to test the whole join fed_transport.client.get_json = Mock(return_value=defer.succeed(None)) - handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1)) + handler.federation_handler.do_invite_join = Mock( + return_value=defer.succeed(("", 1)) + ) # Artificially raise the complexity self.hs.get_datastore().get_current_state_event_counts = lambda x: defer.succeed( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 51e2b37218ab..2fa8d4739b36 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -86,7 +86,10 @@ def make_homeserver(self, reactor, clock): reactor.pump((1000,)) hs = self.setup_test_homeserver( - notifier=Mock(), http_client=mock_federation_client, keyring=mock_keyring + notifier=Mock(), + http_client=mock_federation_client, + keyring=mock_keyring, + replication_streams={}, ) hs.datastores = datastores diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py index 0d6936fd36e2..3ab611f6184d 100644 --- a/tests/rest/client/v2_alpha/test_account.py +++ b/tests/rest/client/v2_alpha/test_account.py @@ -46,7 +46,7 @@ def make_homeserver(self, reactor, clock): # Email config. self.email_attempts = [] - def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): + async def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): self.email_attempts.append(msg) return @@ -358,7 +358,7 @@ def make_homeserver(self, reactor, clock): # Email config. self.email_attempts = [] - def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): + async def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): self.email_attempts.append(msg) config["email"] = { diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py index 0e04b2cf9274..43425c969a0d 100644 --- a/tests/storage/test_cleanup_extrems.py +++ b/tests/storage/test_cleanup_extrems.py @@ -39,7 +39,7 @@ def prepare(self, reactor, clock, homeserver): # Create a test user and room self.user = UserID("alice", "test") self.requester = Requester(self.user, None, False, None, None) - info = self.get_success(self.room_creator.create_room(self.requester, {})) + info, _ = self.get_success(self.room_creator.create_room(self.requester, {})) self.room_id = info["room_id"] def run_background_update(self): @@ -261,7 +261,7 @@ def prepare(self, reactor, clock, homeserver): self.user = UserID.from_string(self.register_user("user1", "password")) self.token1 = self.login("user1", "password") self.requester = Requester(self.user, None, False, None, None) - info = self.get_success(self.room_creator.create_room(self.requester, {})) + info, _ = self.get_success(self.room_creator.create_room(self.requester, {})) self.room_id = info["room_id"] self.event_creator = homeserver.get_event_creation_handler() homeserver.config.user_consent_version = self.CONSENT_VERSION diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index a7b7fd36d3ca..a7b85004e5d3 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -33,7 +33,7 @@ def test_exposed_to_prometheus(self): events = [(3, 2), (6, 2), (4, 6)] for event_count, extrems in events: - info = self.get_success(room_creator.create_room(requester, {})) + info, _ = self.get_success(room_creator.create_room(requester, {})) room_id = info["room_id"] last_event = None diff --git a/tests/test_federation.py b/tests/test_federation.py index f297de95f1b7..c5099dd039a2 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,12 +6,13 @@ from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.retryutils import NotRetryingDestination from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver -class MessageAcceptTests(unittest.TestCase): +class MessageAcceptTests(unittest.HomeserverTestCase): def setUp(self): self.http_client = Mock() @@ -27,13 +28,13 @@ def setUp(self): user_id = UserID("us", "test") our_user = Requester(user_id, None, False, None, None) room_creator = self.homeserver.get_room_creation_handler() - room = ensureDeferred( + room_deferred = ensureDeferred( room_creator.create_room( our_user, room_creator.PRESETS_DICT["public_chat"], ratelimit=False ) ) self.reactor.advance(0.1) - self.room_id = self.successResultOf(room)["room_id"] + self.room_id = self.successResultOf(room_deferred)[0]["room_id"] self.store = self.homeserver.get_datastore() @@ -145,3 +146,63 @@ def post_json(destination, path, data, headers=None, timeout=0): # Make sure the invalid event isn't there extrem = maybeDeferred(self.store.get_latest_event_ids_in_room, self.room_id) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") + + def test_retry_device_list_resync(self): + """Tests that device lists are marked as stale if they couldn't be synced, and + that stale device lists are retried periodically. + """ + remote_user_id = "@john:test_remote" + remote_origin = "test_remote" + + # Track the number of attempts to resync the user's device list. + self.resync_attempts = 0 + + # When this function is called, increment the number of resync attempts (only if + # we're querying devices for the right user ID), then raise a + # NotRetryingDestination error to fail the resync gracefully. + def query_user_devices(destination, user_id): + if user_id == remote_user_id: + self.resync_attempts += 1 + + raise NotRetryingDestination(0, 0, destination) + + # Register the mock on the federation client. + federation_client = self.homeserver.get_federation_client() + federation_client.query_user_devices = Mock(side_effect=query_user_devices) + + # Register a mock on the store so that the incoming update doesn't fail because + # we don't share a room with the user. + store = self.homeserver.get_datastore() + store.get_rooms_for_user = Mock(return_value=["!someroom:test"]) + + # Manually inject a fake device list update. We need this update to include at + # least one prev_id so that the user's device list will need to be retried. + device_list_updater = self.homeserver.get_device_handler().device_list_updater + self.get_success( + device_list_updater.incoming_device_list_update( + origin=remote_origin, + edu_content={ + "deleted": False, + "device_display_name": "Mobile", + "device_id": "QBUAZIFURK", + "prev_id": [5], + "stream_id": 6, + "user_id": remote_user_id, + }, + ) + ) + + # Check that there was one resync attempt. + self.assertEqual(self.resync_attempts, 1) + + # Check that the resync attempt failed and caused the user's device list to be + # marked as stale. + need_resync = self.get_success( + store.get_user_ids_requiring_device_list_resync() + ) + self.assertIn(remote_user_id, need_resync) + + # Check that waiting for 30 seconds caused Synapse to retry resyncing the device + # list. + self.reactor.advance(30) + self.assertEqual(self.resync_attempts, 2) diff --git a/tox.ini b/tox.ini index 3bb4d45e2a97..9fefcb72b510 100644 --- a/tox.ini +++ b/tox.ini @@ -193,6 +193,7 @@ commands = mypy \ synapse/handlers/saml_handler.py \ synapse/handlers/sync.py \ synapse/handlers/ui_auth \ + synapse/http/site.py \ synapse/logging/ \ synapse/metrics \ synapse/module_api \