Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use symbolic names for replication stream names (#7768)
Browse files Browse the repository at this point in the history
This makes it much easier to find where streams are referenced.
  • Loading branch information
richvdh authored Jul 1, 2020
1 parent a6eae69 commit f01e2ca
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/7768.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use symbolic names for replication stream names.
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
from synapse.storage.data_stores.main.tags import TagsWorkerStore
from synapse.storage.database import Database
Expand All @@ -39,12 +40,12 @@ def get_max_account_data_stream_id(self):
return self._account_data_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "tag_account_data":
+ if stream_name == TagAccountDataStream.NAME:
self._account_data_id_gen.advance(token)
for row in rows:
self.get_tags_for_user.invalidate((row.user_id,))
self._account_data_stream_cache.entity_has_changed(row.user_id, token)
- elif stream_name == "account_data":
+ elif stream_name == AccountDataStream.NAME:
self._account_data_id_gen.advance(token)
for row in rows:
if not row.room_id:
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.expiringcache import ExpiringCache
Expand Down Expand Up @@ -44,7 +45,7 @@ def __init__(self, database: Database, db_conn, hs):
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "to_device":
+ if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(token)
for row in rows:
if row.entity.startswith("@"):
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import GroupServerStream
from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache
Expand All @@ -38,7 +39,7 @@ def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "groups":
+ if stream_name == GroupServerStream.NAME:
self._group_updates_id_gen.advance(token)
for row in rows:
self._group_updates_stream_cache.entity_has_changed(row.user_id, token)
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import PresenceStream
from synapse.storage import DataStore
from synapse.storage.data_stores.main.presence import PresenceStore
from synapse.storage.database import Database
Expand Down Expand Up @@ -42,7 +43,7 @@ def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "presence":
+ if stream_name == PresenceStream.NAME:
self._presence_id_gen.advance(token)
for row in rows:
self.presence_stream_cache.entity_has_changed(row.user_id, token)
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore

from .events import SlavedEventStore
Expand All @@ -30,7 +31,7 @@ def get_max_push_rules_stream_id(self):
return self._push_rules_stream_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "push_rules":
+ if stream_name == PushRulesStream.NAME:
self._push_rules_stream_id_gen.advance(token)
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import PushersStream
from synapse.storage.data_stores.main.pusher import PusherWorkerStore
from synapse.storage.database import Database

Expand All @@ -32,6 +33,6 @@ def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "pushers":
+ if stream_name == PushersStream.NAME:
self._pushers_id_gen.advance(token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
11 changes: 2 additions & 9 deletions synapse/replication/slave/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage.data_stores.main.receipts import ReceiptsWorkerStore
from synapse.storage.database import Database

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker

- # So, um, we want to borrow a load of functions intended for reading from
- # a DataStore, but we don't want to take functions that either write to the
- # DataStore or are cached and don't have cache invalidation logic.
- #
- # Rather than write duplicate versions of those functions, or lift them to
- # a common base class, we going to grab the underlying __func__ object from
- # the method descriptor on the DataStore and chuck them into our class.


class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
def __init__(self, database: Database, db_conn, hs):
Expand All @@ -52,7 +45,7 @@ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_room.invalidate((room_id, receipt_type))

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "receipts":
+ if stream_name == ReceiptsStream.NAME:
self._receipts_id_gen.advance(token)
for row in rows:
self.invalidate_caches_for_receipt(
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/slave/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.replication.tcp.streams import PublicRoomsStream
from synapse.storage.data_stores.main.room import RoomWorkerStore
from synapse.storage.database import Database

Expand All @@ -31,7 +32,7 @@ def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "public_rooms":
+ if stream_name == PublicRoomsStream.NAME:
self._public_room_id_gen.advance(token)

return super().process_replication_rows(stream_name, instance_name, token, rows)
8 changes: 5 additions & 3 deletions synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from typing import Any, Iterable, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
Expand Down Expand Up @@ -71,10 +73,10 @@ def get_all_updated_caches_txn(txn):
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "events":
+ if stream_name == EventsStream.NAME:
for row in rows:
self._process_event_stream_row(token, row)
- elif stream_name == "backfill":
+ elif stream_name == BackfillStream.NAME:
for row in rows:
self._invalidate_caches_for_event(
-token,
Expand All @@ -86,7 +88,7 @@ def process_replication_rows(self, stream_name, instance_name, token, rows):
row.relates_to,
backfilled=True,
)
- elif stream_name == "caches":
+ elif stream_name == CachesStream.NAME:
if self._cache_id_gen:
self._cache_id_gen.advance(instance_name, token)

Expand Down
6 changes: 4 additions & 2 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.util.id_generators import StreamIdGenerator
Expand Down Expand Up @@ -113,9 +115,9 @@ def __init__(self, database: Database, db_conn, hs):
self._event_fetch_ongoing = 0

def process_replication_rows(self, stream_name, instance_name, token, rows):
- if stream_name == "events":
+ if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(token)
- elif stream_name == "backfill":
+ elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(-token)

super().process_replication_rows(stream_name, instance_name, token, rows)
Expand Down

0 comments on commit f01e2ca

Please sign in to comment.