Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add experimental support for sharding event persister. (#8170)
Browse files Browse the repository at this point in the history
This is *not* ready for production yet. Caveats:

1. We should write some tests...
2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow.
  • Loading branch information
erikjohnston authored Sep 2, 2020
1 parent b257c78 commit 82c1ee1
Show file tree
Hide file tree
Showing 18 changed files with 206 additions and 77 deletions.
1 change: 1 addition & 0 deletions changelog.d/8170.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for sharding event persister.
21 changes: 18 additions & 3 deletions synapse/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key.
"""

# If multiple instances are not defined we always return true.
# If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True

return self.get_instance(key) == instance_name

def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.
Note: For things like federation sending the config for which instance
is sending is known only to the sender instance if there is only one.
Therefore `should_handle` should be used where possible.
"""

if not self.instances:
return "master"

if len(self.instances) == 1:
return self.instances[0]

# We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that
# index.
Expand All @@ -846,7 +861,7 @@ def should_handle(self, instance_name: str, key: str) -> bool:
dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances))
return self.instances[remainder] == instance_name
return self.instances[remainder]


__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
1 change: 1 addition & 0 deletions synapse/config/_base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
def get_instance(self, key: str) -> str: ...
37 changes: 27 additions & 10 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import attr

from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
option expecting a list of strings.
"""

if isinstance(obj, str):
return [obj]
return obj


@attr.s
class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication.
Expand All @@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams.
Attributes:
events: The instance that writes to the event and backfill streams.
events: The instance that writes to the typing stream.
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
"""

events = attr.ib(default="master", type=str)
events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str)


Expand Down Expand Up @@ -105,15 +119,18 @@ def read_config(self, config, **kwargs):
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)

# Check that the configured writer for events and typing also appears in
# Check that the configured writers for events and typing also appears in
# `instance_map`.
for stream in ("events", "typing"):
instance = getattr(self.writers, stream)
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to write %s but does not appear in `instance_map` config."
% (instance, stream)
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
Expand Down
44 changes: 30 additions & 14 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,8 @@ async def backfill(self, dest, room_id, limit, extremities):
)
)

await self._handle_new_events(dest, ev_infos, backfilled=True)
if ev_infos:
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)

# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -1216,7 +1217,7 @@ async def get_event(event_id: str):
event_infos.append(_NewEventInfo(event, None, auth))

await self._handle_new_events(
destination, event_infos,
destination, room_id, event_infos,
)

def _sanity_check_event(self, ev):
Expand Down Expand Up @@ -1363,15 +1364,15 @@ async def do_invite_join(
)

max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
origin, room_id, auth_chain, state, event, room_version_obj
)

# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.config.worker.writers.events, "events", max_stream_id
self.config.worker.events_shard_config.get_instance(room_id),
"events",
max_stream_id,
)

# Check whether this room is the result of an upgrade of a room we already know
Expand Down Expand Up @@ -1625,7 +1626,7 @@ async def on_invite_request(
)

context = await self.state_handler.compute_event_context(event)
await self.persist_events_and_notify([(event, context)])
await self.persist_events_and_notify(event.room_id, [(event, context)])

return event

Expand All @@ -1652,7 +1653,9 @@ async def do_remotely_reject_invite(
await self.federation_client.send_leave(host_list, event)

context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)])
stream_id = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)

return event, stream_id

Expand Down Expand Up @@ -1900,7 +1903,7 @@ async def _handle_new_event(
)

await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
Expand All @@ -1913,6 +1916,7 @@ async def _handle_new_event(
async def _handle_new_events(
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
Expand Down Expand Up @@ -1944,6 +1948,7 @@ async def prep(ev_info: _NewEventInfo):
)

await self.persist_events_and_notify(
room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
Expand All @@ -1954,6 +1959,7 @@ async def prep(ev_info: _NewEventInfo):
async def _persist_auth_tree(
self,
origin: str,
room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
Expand All @@ -1968,6 +1974,7 @@ async def _persist_auth_tree(
Args:
origin: Where the events came from
room_id,
auth_events
state
event
Expand Down Expand Up @@ -2042,17 +2049,20 @@ async def _persist_auth_tree(
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
]
],
)

new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)

return await self.persist_events_and_notify([(event, new_event_context)])
return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)

async def _prep_event(
self,
Expand Down Expand Up @@ -2903,21 +2913,27 @@ async def _check_key_revocation(self, public_key, url):

async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Args:
event_and_contexts:
room_id: The room ID of events being persisted.
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
"""
if self.config.worker.writers.events != self._instance_name:
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
result = await self._send_events(
instance_name=self.config.worker.writers.events,
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
Expand Down
14 changes: 8 additions & 6 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,8 @@ def __init__(self, hs: "HomeServer"):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._is_event_writer = (
self.config.worker.writers.events == hs.get_instance_name()
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()

self.room_invite_state_types = self.hs.config.room_invite_state_types

Expand Down Expand Up @@ -904,9 +903,10 @@ async def handle_new_client_event(

try:
# If we're a worker we need to hit out to the master.
if not self._is_event_writer:
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
result = await self.send_event(
instance_name=self.config.worker.writers.events,
instance_name=writer_instance,
event_id=event.event_id,
store=self.store,
requester=requester,
Expand Down Expand Up @@ -974,7 +974,9 @@ async def persist_and_notify_client_event(
This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
assert self._events_shard_config.should_handle(
self._instance_name, event.room_id
)

if ratelimit:
# We check if this is a room admin redacting an event so that we
Expand Down
14 changes: 9 additions & 5 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,9 @@ async def create_room(

# Always wait for room creation to progate before returning
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", last_stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
last_stream_id,
)

return result, last_stream_id
Expand Down Expand Up @@ -1260,10 +1262,10 @@ async def shutdown_room(
# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propogated before
# we try and auto join below.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
"events",
stream_id,
)
else:
new_room_id = None
Expand Down Expand Up @@ -1293,7 +1295,9 @@ async def shutdown_room(

# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id
self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
stream_id,
)

await self.room_member_handler.forget(target_requester.user, room_id)
Expand Down
7 changes: 0 additions & 7 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ def __init__(self, hs: "HomeServer"):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles

self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence

self._join_rate_limiter_local = Ratelimiter(
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
Expand Down
12 changes: 9 additions & 3 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
async def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
Expand All @@ -88,14 +89,19 @@ async def _serialize_payload(store, event_and_contexts, backfilled):
}
)

payload = {"events": event_payloads, "backfilled": backfilled}
payload = {
"events": event_payloads,
"backfilled": backfilled,
"room_id": room_id,
}

return payload

async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]

event_payloads = content["events"]
Expand All @@ -120,7 +126,7 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
room_id, event_and_contexts, backfilled
)

return 200, {"max_stream_id": max_stream_id}
Expand Down
Loading

0 comments on commit 82c1ee1

Please sign in to comment.