Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix wait_for_stream_position for multiple waiters. (#8196)
Browse files Browse the repository at this point in the history
This fixes a bug where having multiple callers waiting on the same
stream and position will cause it to try and compare two deferreds,
which fails (due to the sorted list having an entry of `Tuple[int,
Deferred]`).
  • Loading branch information
erikjohnston authored Aug 28, 2020
1 parent d58fda9 commit 3b4556c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/8196.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
4 changes: 3 additions & 1 deletion synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
"attrs>=19.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
Expand Down
6 changes: 2 additions & 4 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.
"""A replication client for use by synapse workers.
"""
import heapq
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple

Expand Down Expand Up @@ -219,9 +218,8 @@ async def wait_for_stream_position(

waiting_list = self._streams_to_waiters.setdefault(stream_name, [])

# We insert into the list using heapq as it is more efficient than
# pushing then resorting each time.
heapq.heappush(waiting_list, (position, deferred))
waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
Expand Down

0 comments on commit 3b4556c

Please sign in to comment.