Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaces all usages of StreamIdGenerator with MultiWriterIdGenerator #17229

Merged
merged 9 commits into from
May 30, 2024
24 changes: 14 additions & 10 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
from synapse.storage.util.id_generators import IdGenerator, StreamIdGenerator
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
from synapse.types import JsonDict
from synapse.util import json_encoder, unwrapFirstError
Expand Down Expand Up @@ -126,7 +126,7 @@ class PushRulesWorkerStore(
`get_max_push_rules_stream_id` which can be called in the initializer.
"""

_push_rules_stream_id_gen: StreamIdGenerator
_push_rules_stream_id_gen: MultiWriterIdGenerator

def __init__(
self,
Expand All @@ -140,14 +140,17 @@ def __init__(
hs.get_instance_name() in hs.config.worker.writers.push_rules
)

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._push_rules_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"push_rules_stream",
"stream_id",
is_writer=self._is_push_writer,
self._push_rules_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="push_rules_stream",
instance_name=self._instance_name,
tables=[
("push_rules_stream", "instance_name", "stream_id"),
],
sequence_name="push_rules_stream_sequence",
writers=hs.config.worker.writers.push_rules,
)

push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
Expand Down Expand Up @@ -880,6 +883,7 @@ def _insert_push_rules_update_txn(
raise Exception("Not a push writer")

values = {
"instance_name": self._instance_name,
"stream_id": stream_id,
"event_stream_ordering": event_stream_ordering,
"user_id": user_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ ALTER TABLE device_lists_changes_in_room ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_remote_pending ADD COLUMN instance_name TEXT;
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

ALTER TABLE e2e_cross_signing_keys ADD COLUMN instance_name TEXT;

ALTER TABLE push_rules_stream ADD COLUMN instance_name TEXT;
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ CREATE SEQUENCE IF NOT EXISTS e2e_cross_signing_keys_sequence;
SELECT setval('e2e_cross_signing_keys_sequence', (
SELECT COALESCE(MAX(stream_id), 1) FROM e2e_cross_signing_keys
));


CREATE SEQUENCE IF NOT EXISTS push_rules_stream_sequence;

SELECT setval('push_rules_stream_sequence', (
SELECT COALESCE(MAX(stream_id), 1) FROM push_rules_stream
));