-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Limit the number of events that can be created on a given room concurrently #1620
Conversation
max_count(int): The maximum number of concurrent access | ||
""" | ||
self.max_count = max_count | ||
self.key_to_defer = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing
# the second element is a list of deferreds for the things blocked from executing.
def queue(self, key): | ||
entry = self.key_to_defer.setdefault(key, [0, []]) | ||
|
||
if entry[0] >= self.max_count: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
def _ctx_manager(): | ||
try: | ||
yield | ||
finally: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# We've finished executing so check if there are any things blocked waiting to execute and start one of them
try: | ||
entry[1].pop(0).callback(None) | ||
except IndexError: | ||
if entry[0] == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# If nothing else is executing for this key then remove it from the map
if builder.type == EventTypes.Member: | ||
membership = builder.content.get("membership", None) | ||
target = UserID.from_string(builder.state_key) | ||
with (yield self.limiter.queue(builder.room_id)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Restrict the number of events that can happen concurrently in a room.
# Otherwise synapse may try to add many event simultaneously, forking the room state and making state resolution more expensive
# See (link to github issue) for more information.
@@ -50,6 +50,8 @@ def __init__(self, hs): | |||
|
|||
self.pagination_lock = ReadWriteLock() | |||
|
|||
self.limiter = Limiter(max_count=5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on the choice of 5 here.
It'd be totes awesome to have some metrics on the total number of blocked and executing things for each limiter. |
LGTM apart from the missing comments |
No description provided.