Skip to content

Commit

Permalink
Convert *StreamRow classes to inner classes (matrix-org#7116)
Browse files Browse the repository at this point in the history
This just helps keep the rows closer to their streams, so that it's easier to
see what the format of each stream is.
  • Loading branch information
richvdh authored and phil-flex committed Jun 16, 2020
1 parent f8c1d38 commit 56eb76c
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 100 deletions.
1 change: 1 addition & 0 deletions changelog.d/7116.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert `*StreamRow` classes to inner classes.
2 changes: 1 addition & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ def process_replication_rows(self, stream_name, token, rows):
async def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args:
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
"""

# The federation stream contains a bunch of different types of
Expand Down
181 changes: 93 additions & 88 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,94 +28,6 @@

MAX_EVENTS_BEHIND = 500000

BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)


@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""

cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)


PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)


@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)


ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str


class Stream(object):
"""Base class for the streams.
Expand Down Expand Up @@ -234,6 +146,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""

BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)

NAME = "backfill"
ROW_TYPE = BackfillStreamRow

Expand All @@ -246,6 +170,19 @@ def __init__(self, hs):


class PresenceStream(Stream):
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)

NAME = "presence"
ROW_TYPE = PresenceStreamRow

Expand All @@ -260,6 +197,10 @@ def __init__(self, hs):


class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)

NAME = "typing"
ROW_TYPE = TypingStreamRow

Expand All @@ -273,6 +214,17 @@ def __init__(self, hs):


class ReceiptsStream(Stream):
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)

NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow

Expand All @@ -289,6 +241,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules
"""

PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str

NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow

Expand All @@ -309,6 +263,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher
"""

PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)

NAME = "pushers"
ROW_TYPE = PushersStreamRow

Expand All @@ -326,6 +285,21 @@ class CachesStream(Stream):
the cache on the workers
"""

@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""

cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)

NAME = "caches"
ROW_TYPE = CachesStreamRow

Expand All @@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed
"""

PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)

NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow

Expand All @@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
told about a device update.
"""

@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)

NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow

Expand All @@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client
"""

ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str

NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow

Expand All @@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""

TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)

NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow

Expand All @@ -407,6 +401,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed
"""

AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)

NAME = "account_data"
ROW_TYPE = AccountDataStreamRow

Expand All @@ -432,6 +430,11 @@ async def update_function(self, from_token, to_token, limit):


class GroupServerStream(Stream):
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)

NAME = "groups"
ROW_TYPE = GroupsStreamRow

Expand All @@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""

UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str

NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow

Expand Down
16 changes: 8 additions & 8 deletions synapse/replication/tcp/streams/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

from ._base import Stream

FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)


class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""

FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)

NAME = "federation"
ROW_TYPE = FederationStreamRow

Expand Down
4 changes: 2 additions & 2 deletions tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.replication.tcp.streams._base import ReceiptsStreamRow
from synapse.replication.tcp.streams._base import ReceiptsStream

from tests.replication.tcp.streams._base import BaseStreamTestCase

Expand All @@ -38,7 +38,7 @@ def test_receipt(self):
rdata_rows = self.test_handler.received_rdata_rows
self.assertEqual(1, len(rdata_rows))
self.assertEqual(rdata_rows[0][0], "receipts")
row = rdata_rows[0][2] # type: ReceiptsStreamRow
row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
Expand Down

0 comments on commit 56eb76c

Please sign in to comment.