Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert *StreamRow classes to inner classes #7116

Merged
merged 2 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7116.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert `*StreamRow` classes to inner classes.
2 changes: 1 addition & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ def process_replication_rows(self, stream_name, token, rows):
async def _on_new_receipts(self, rows):
"""
Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
new receipts to be processed
"""
for receipt in rows:
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args:
transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
"""

# The federation stream contains a bunch of different types of
Expand Down
181 changes: 93 additions & 88 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,94 +28,6 @@

MAX_EVENTS_BEHIND = 500000

BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)


@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""

cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)


PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)


@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)


ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str


class Stream(object):
"""Base class for the streams.
Expand Down Expand Up @@ -234,6 +146,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""

BackfillStreamRow = namedtuple(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to do ROW_TYPE = namedtuple(...)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per #synapse-dev: this is less nice for proper attrs classes, and ultimately we'd like to replace the namedtuples with attrs, so let's keep it like this, for consistency.

"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)

NAME = "backfill"
ROW_TYPE = BackfillStreamRow

Expand All @@ -246,6 +170,19 @@ def __init__(self, hs):


class PresenceStream(Stream):
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)

NAME = "presence"
ROW_TYPE = PresenceStreamRow

Expand All @@ -260,6 +197,10 @@ def __init__(self, hs):


class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)

NAME = "typing"
ROW_TYPE = TypingStreamRow

Expand All @@ -273,6 +214,17 @@ def __init__(self, hs):


class ReceiptsStream(Stream):
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)

NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow

Expand All @@ -289,6 +241,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules
"""

PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str

NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow

Expand All @@ -309,6 +263,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher
"""

PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)

NAME = "pushers"
ROW_TYPE = PushersStreamRow

Expand All @@ -326,6 +285,21 @@ class CachesStream(Stream):
the cache on the workers
"""

@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""

cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)

NAME = "caches"
ROW_TYPE = CachesStreamRow

Expand All @@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed
"""

PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)

NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow

Expand All @@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
told about a device update.
"""

@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)

NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow

Expand All @@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client
"""

ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str

NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow

Expand All @@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""

TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)

NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow

Expand All @@ -407,6 +401,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed
"""

AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)

NAME = "account_data"
ROW_TYPE = AccountDataStreamRow

Expand All @@ -432,6 +430,11 @@ async def update_function(self, from_token, to_token, limit):


class GroupServerStream(Stream):
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)

NAME = "groups"
ROW_TYPE = GroupsStreamRow

Expand All @@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key
"""

UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str

NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow

Expand Down
16 changes: 8 additions & 8 deletions synapse/replication/tcp/streams/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

from ._base import Stream

FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)


class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation
sending disabled.
"""

FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)

NAME = "federation"
ROW_TYPE = FederationStreamRow

Expand Down
4 changes: 2 additions & 2 deletions tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.replication.tcp.streams._base import ReceiptsStreamRow
from synapse.replication.tcp.streams._base import ReceiptsStream

from tests.replication.tcp.streams._base import BaseStreamTestCase

Expand All @@ -38,7 +38,7 @@ def test_receipt(self):
rdata_rows = self.test_handler.received_rdata_rows
self.assertEqual(1, len(rdata_rows))
self.assertEqual(rdata_rows[0][0], "receipts")
row = rdata_rows[0][2] # type: ReceiptsStreamRow
row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
Expand Down