Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
MORE HACKS
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 19, 2023
1 parent 91faaf6 commit 50776af
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/streams/partial_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ def __init__(self, hs: "HomeServer"):
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_events_token),
store.get_un_partial_stated_events_token,
store.get_un_partial_stated_events_from_stream,
)
16 changes: 11 additions & 5 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,12 @@ def get_chain_id_txn(txn: Cursor) -> int:
db_conn, "un_partial_stated_event_stream", "stream_id"
)

def get_un_partial_stated_events_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
# writers because workers that don't write often will hold all
# readers up.
return self._un_partial_stated_events_stream_id_gen.get_current_token()
def get_un_partial_stated_events_token(self, instance_name: str) -> int:
return (
self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
instance_name
)
)

async def get_un_partial_stated_events_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
Expand Down Expand Up @@ -408,6 +409,11 @@ def process_replication_position(
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
logger.info(
"Advancing %s token to %s", UnPartialStatedEventStream.NAME, token
)
self._un_partial_stated_events_stream_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)

async def have_censored_event(self, event_id: str) -> bool:
Expand Down

0 comments on commit 50776af

Please sign in to comment.