Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event.internal_metadata.instance_name #17300

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17300.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.
3 changes: 3 additions & 0 deletions rust/src/events/internal_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
#[pyo3(get, set)]
instance_name: Option<String>,

/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
Expand Down Expand Up @@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
outlier: false,
})
}
Expand Down
2 changes: 2 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier

# Mark the event as redacted
Expand All @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier

return new_event
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,7 @@ async def _persist_events(
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance

return event

Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ async def _persist_events_and_state_updates(
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name

await self.db_pool.runInteraction(
"persist_events",
Expand Down
16 changes: 10 additions & 6 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class _EventRow:

event_id: str
stream_ordering: int
instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
Expand Down Expand Up @@ -1354,6 +1355,7 @@ async def _fetch_event_ids_and_get_outstanding_redactions(
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier

# Consistency check: if the content of the event has been modified in the
Expand Down Expand Up @@ -1439,6 +1441,7 @@ def _fetch_event_rows(
SELECT
e.event_id,
e.stream_ordering,
e.instance_name,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
ej.internal_metadata,
ej.json,
ej.format_version,
Expand All @@ -1462,13 +1465,14 @@ def _fetch_event_rows(
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
internal_metadata=row[2],
json=row[3],
format_version=row[4],
room_version_id=row[5],
rejected_reason=row[6],
instance_name=row[2],
internal_metadata=row[3],
json=row[4],
format_version=row[5],
room_version_id=row[6],
rejected_reason=row[7],
redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3
outlier=bool(row[8]), # This is an int in SQLite3
)

# check for redactions
Expand Down
2 changes: 2 additions & 0 deletions synapse/synapse_rust/events.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class EventInternalMetadata:

stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""

outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that
Expand Down
3 changes: 3 additions & 0 deletions tests/events/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,16 @@ def test_unsigned_is_copied(self) -> None:
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")

cloned = clone_event(original)
cloned.unsigned["b"] = 3

self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")


Expand Down
10 changes: 7 additions & 3 deletions tests/replication/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def test_invites(self) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None

self.replicate()
Expand All @@ -155,7 +156,7 @@ def test_invites(self) -> None:
"invite",
event.event_id,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
Expand Down Expand Up @@ -232,11 +233,12 @@ def test_get_rooms_for_user_with_stream_ordering(self) -> None:
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()

expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
Expand Down Expand Up @@ -288,6 +290,7 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None

event_source = RoomEventSource(self.hs)
Expand Down Expand Up @@ -329,7 +332,8 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,
Expand Down
1 change: 1 addition & 0 deletions tests/storage/test_event_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def persist(

for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1

def _persist(txn: LoggingTransaction) -> None:
Expand Down
Loading