Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix wait_for_stream_position for multiple waiters. #8196

Merged
merged 4 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8196.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a bugfix as this can't currently happen, it only starts being a problem with sharded event persister.

I also plan to squash all these changelogs together anyway.

4 changes: 3 additions & 1 deletion synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
"attrs>=19.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.
"""A replication client for use by synapse workers.
"""
import heapq
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple

Expand Down Expand Up @@ -221,7 +220,8 @@ async def wait_for_stream_position(

# We insert into the list using heapq as it is more efficient than
# pushing then resorting each time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this comment should be updated now? 🤨

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woops

heapq.heappush(waiting_list, (position, deferred))
waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
Expand Down