Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Rename Storage and vars
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed May 30, 2022
1 parent 78211cb commit 7bd6f2c
Show file tree
Hide file tree
Showing 36 changed files with 151 additions and 99 deletions.
2 changes: 1 addition & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.handler = hs.get_federation_handler()
self.storage = hs.get_storage()
self.storage_controllers = hs.get_storage_controllers()
self._spam_checker = hs.get_spam_checker()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
Expand Down
8 changes: 5 additions & 3 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
class AdminHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.state_storage_controller = self.storage.state
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state

async def get_whois(self, user: UserID) -> JsonDict:
connections = []
Expand Down Expand Up @@ -197,7 +197,9 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->

from_key = events[-1].internal_metadata.after

events = await filter_events_for_client(self.storage, user_id, events)
events = await filter_events_for_client(
self.storage_controllers, user_id, events
)

writer.write_events(room_id, events)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self.state_storage = hs.get_storage().state
self.state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
self.server_name = hs.hostname

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def get_stream(
class EventHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.storage = hs.get_storage_controllers()

async def get_event(
self,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs

self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.storage = hs.get_storage_controllers()
self.state_storage_controller = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
Expand Down
15 changes: 9 additions & 6 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ class FederationEventHandler:

def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastores().main
self._storage = hs.get_storage()
self._state_storage_controller = self._storage.state
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state

self._state_handler = hs.get_state_handler()
self._event_creation_handler = hs.get_event_creation_handler()
Expand Down Expand Up @@ -1440,7 +1440,7 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
# we're not bothering about room state, so flag the event as an outlier.
event.internal_metadata.outlier = True

context = EventContext.for_outlier(self._storage)
context = EventContext.for_outlier(self._storage_controllers)
try:
validate_event_for_room_version(room_version_obj, event)
check_auth_rules_for_event(room_version_obj, event, auth)
Expand Down Expand Up @@ -1898,7 +1898,7 @@ async def _update_context_for_auth_events(
)

return EventContext.with_state(
storage=self._storage,
storage=self._storage_controllers,
state_group=state_group,
state_group_before_event=context.state_group_before_event,
state_delta_due_to_event=state_updates,
Expand Down Expand Up @@ -1988,11 +1988,14 @@ async def persist_events_and_notify(
)
return result["max_stream_id"]
else:
assert self._storage.persistence
assert self._storage_controllers.persistence

# Note that this returns the events that were persisted, which may not be
# the same as were passed in if some were deduplicated due to transaction IDs.
events, max_stream_token = await self._storage.persistence.persist_events(
(
events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)

Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def __init__(self, hs: "HomeServer"):
]
] = ResponseCache(hs.get_clock(), "initial_sync_cache")
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_storage_controller = self.storage.state
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state

async def snapshot_all_rooms(
self,
Expand Down Expand Up @@ -219,7 +219,7 @@ async def handle_room(event: RoomsForUser) -> None:
).addErrback(unwrapFirstError)

messages = await filter_events_for_client(
self.storage, user_id, messages
self.storage_controllers, user_id, messages
)

start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
Expand Down Expand Up @@ -372,7 +372,7 @@ async def _room_initial_sync_parted(
)

messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
self.storage_controllers, user_id, messages, is_peeking=is_peeking
)

start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token)
Expand Down Expand Up @@ -477,7 +477,7 @@ async def get_receipts() -> List[JsonDict]:
)

messages = await filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
self.storage_controllers, user_id, messages, is_peeking=is_peeking
)

start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token)
Expand Down
14 changes: 7 additions & 7 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.state_storage_controller = self.storage.state
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state
self._event_serializer = hs.get_event_client_serializer()
self._ephemeral_events_enabled = hs.config.server.enable_ephemeral_messages

Expand Down Expand Up @@ -206,7 +206,7 @@ async def get_state_events(
is_peeking = not joined

visible_events = await filter_events_for_client(
self.storage,
self.storage_controllers,
user_id,
[last_event],
filter_send_to_client=False,
Expand Down Expand Up @@ -406,7 +406,7 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self._event_auth_handler = hs.get_event_auth_handler()
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
Expand Down Expand Up @@ -1021,7 +1021,7 @@ async def create_new_client_event(
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self.storage)
context = EventContext.for_outlier(self.storage_controllers)
elif (
event.type == EventTypes.MSC2716_INSERTION
and state_event_ids
Expand Down Expand Up @@ -1434,7 +1434,7 @@ async def persist_and_notify_client_event(
"""
extra_users = extra_users or []

assert self.storage.persistence is not None
assert self.storage_controllers.persistence is not None
assert self._events_shard_config.should_handle(
self._instance_name, event.room_id
)
Expand Down Expand Up @@ -1668,7 +1668,7 @@ async def persist_and_notify_client_event(
event,
event_pos,
max_stream_token,
) = await self.storage.persistence.persist_event(
) = await self.storage_controllers.persistence.persist_event(
event, context=context, backfilled=backfilled
)

Expand Down
13 changes: 8 additions & 5 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state
self.clock = hs.get_clock()
self._server_name = hs.hostname
Expand Down Expand Up @@ -352,7 +352,7 @@ async def _purge_history(
self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
await self.storage.purge_events.purge_history(
await self.storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
Expand Down Expand Up @@ -414,7 +414,7 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
if joined:
raise SynapseError(400, "Users are still joined to this room")

await self.storage.purge_events.purge_room(room_id)
await self.storage_controllers.purge_events.purge_room(room_id)

async def get_messages(
self,
Expand Down Expand Up @@ -520,7 +520,10 @@ async def get_messages(
events = await event_filter.filter(events)

events = await filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)
self.storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)

if not events:
Expand Down Expand Up @@ -653,7 +656,7 @@ async def _shutdown_and_purge_room(
400, "Users are still joined to this room"
)

await self.storage.purge_events.purge_room(room_id)
await self.storage_controllers.purge_events.purge_room(room_id)

logger.info("complete")
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __bool__(self) -> bool:
class RelationsHandler:
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self._storage = hs.get_storage()
self._storage_controllers = hs.get_storage_controllers()
self._auth = hs.get_auth()
self._clock = hs.get_clock()
self._event_handler = hs.get_event_handler()
Expand Down Expand Up @@ -143,7 +143,10 @@ async def get_relations(
)

events = await filter_events_for_client(
self._storage, user_id, events, is_peeking=(member_event_id is None)
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)

now = self._clock.time_msec()
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state
self._relations_handler = hs.get_relations_handler()

Expand Down Expand Up @@ -1236,7 +1236,10 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]:
if use_admin_priviledge:
return events
return await filter_events_for_client(
self.storage, user.to_string(), events, is_peeking=is_peeking
self.storage_controllers,
user.to_string(),
events,
is_peeking=is_peeking,
)

event = await self.store.get_event(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self.state_storage_controller = hs.get_storage().state
self.state_storage_controller = hs.get_storage_controllers().state
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
Expand Down
12 changes: 6 additions & 6 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self._event_serializer = hs.get_event_client_serializer()
self._relations_handler = hs.get_relations_handler()
self.storage = hs.get_storage()
self.state_storage_controller = self.storage.state
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state
self.auth = hs.get_auth()

async def get_old_rooms_from_upgraded_room(self, room_id: str) -> Iterable[str]:
Expand Down Expand Up @@ -460,7 +460,7 @@ async def _search_by_rank(
filtered_events = await search_filter.filter([r["event"] for r in results])

events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
self.storage_controllers, user.to_string(), filtered_events
)

events.sort(key=lambda e: -rank_map[e.event_id])
Expand Down Expand Up @@ -559,7 +559,7 @@ async def _search_by_recent(
filtered_events = await search_filter.filter([r["event"] for r in results])

events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
self.storage_controllers, user.to_string(), filtered_events
)

room_events.extend(events)
Expand Down Expand Up @@ -644,11 +644,11 @@ async def _calculate_event_contexts(
)

events_before = await filter_events_for_client(
self.storage, user.to_string(), res.events_before
self.storage_controllers, user.to_string(), res.events_before
)

events_after = await filter_events_for_client(
self.storage, user.to_string(), res.events_after
self.storage_controllers, user.to_string(), res.events_after
)

context: JsonDict = {
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
self.state_storage_controller = self.storage.state
self.storage_controllers = hs.get_storage_controllers()
self.state_storage_controller = self.storage_controllers.state

# TODO: flush cache entries on subsequent sync request.
# Once we get the next /sync request (ie, one with the same access token
Expand Down Expand Up @@ -512,7 +512,7 @@ async def _load_filtered_recents(
current_state_ids = frozenset(current_state_ids_map.values())

recents = await filter_events_for_client(
self.storage,
self.storage_controllers,
sync_config.user.to_string(),
recents,
always_include_ids=current_state_ids,
Expand Down Expand Up @@ -580,7 +580,7 @@ async def _load_filtered_recents(
current_state_ids = frozenset(current_state_ids_map.values())

loaded_recents = await filter_events_for_client(
self.storage,
self.storage_controllers,
sync_config.user.to_string(),
loaded_recents,
always_include_ids=current_state_ids,
Expand Down
4 changes: 2 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def __init__(self, hs: "HomeServer"):
self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {}

self.hs = hs
self.storage = hs.get_storage()
self.storage_controllers = hs.get_storage_controllers()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastores().main
self.pending_new_room_events: List[_PendingRoomEventEntry] = []
Expand Down Expand Up @@ -623,7 +623,7 @@ async def check_for_updates(

if name == "room":
new_events = await filter_events_for_client(
self.storage,
self.storage_controllers,
user.to_string(),
new_events,
is_peeking=is_peeking,
Expand Down
Loading

0 comments on commit 7bd6f2c

Please sign in to comment.