This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Resolve and share state_groups for all historical events in batch (MSC2716) #10975

Merged
Changes from 13 commits

Commits (26)
64448b3
Fix event context for outliers in important MSC2716 spot
MadLittleMods Sep 29, 2021
f3174cd
Add changelog
MadLittleMods Sep 29, 2021
6713a2a
Update changelog.d/10938.bugfix
MadLittleMods Sep 29, 2021
97fa9a2
Update synapse/handlers/message.py
MadLittleMods Sep 29, 2021
fa4f20d
Only generate context when we need to (it's not free to throw away)
MadLittleMods Sep 29, 2021
4fea37e
Add proper state and state_groups so historical events return state f…
MadLittleMods Sep 29, 2021
8fb4d6f
Force /context to return state for the given historical event
MadLittleMods Sep 30, 2021
96d9d11
Set full state for each historical event like others
MadLittleMods Sep 30, 2021
b20fd16
Fix boolean logic for testing whether msc2716 is enabled
MadLittleMods Sep 30, 2021
0362887
Merge branch 'develop' into madlittlemods/msc2716-resolve-state-for-a…
MadLittleMods Oct 1, 2021
cafb1dc
Remove debug logs
MadLittleMods Oct 1, 2021
487754f
Restore back to what was before
MadLittleMods Oct 1, 2021
43f1328
Resolve and share state_groups for all historical events in batch
MadLittleMods Oct 2, 2021
d494673
Add sql comment
MadLittleMods Oct 2, 2021
6005c46
Remove unused code and fix lint
MadLittleMods Oct 2, 2021
1227154
Add changelog
MadLittleMods Oct 2, 2021
10c91ee
Fix upsert many being weird with combining key and value columns whic…
MadLittleMods Oct 2, 2021
d0d6699
Add findings when testing with Element
MadLittleMods Oct 2, 2021
aa2e56e
Connect the state to the insertion event which is inherited by the re…
MadLittleMods Oct 5, 2021
3b085ab
Remove debug logging
MadLittleMods Oct 5, 2021
b975bd2
Merge branch 'develop' into madlittlemods/msc2716-resolve-state-for-a…
MadLittleMods Oct 5, 2021
c5ea94c
Merge branch 'develop' into madlittlemods/msc2716-resolve-state-for-a…
MadLittleMods Oct 8, 2021
dc34f0f
Label fake event ID's as fake
MadLittleMods Oct 8, 2021
77ffb69
Merge branch 'develop' into madlittlemods/msc2716-resolve-state-for-a…
MadLittleMods Oct 9, 2021
1d1830d
Fix typo
MadLittleMods Oct 9, 2021
14d6672
Merge branch 'develop' into madlittlemods/msc2716-resolve-state-for-a…
MadLittleMods Oct 13, 2021
5 changes: 2 additions & 3 deletions synapse/handlers/federation_event.py

@@ -1010,9 +1010,8 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None
         room_version = await self._store.get_room_version(marker_event.room_id)
         create_event = await self._store.get_create_event_for_room(marker_event.room_id)
         room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
-        if (
-            not room_version.msc2716_historical
-            or not self._config.experimental.msc2716_enabled
+        if not room_version.msc2716_historical and (
+            not self._config.experimental.msc2716_enabled
             or marker_event.sender != room_creator
         ):
             return
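The reworked guard is easiest to read as the negation of the condition under which a marker event should be handled: process it when the room version supports MSC2716 natively, or when the experimental flag is enabled and the sender is the room creator. A minimal sketch of that predicate, using stand-in names rather than the handler's real signature:

    # Illustration only: the function name and flat parameters are stand-ins,
    # not Synapse API.
    def should_handle_msc2716_event(room_version, msc2716_enabled, sender, room_creator):
        return room_version.msc2716_historical or (
            msc2716_enabled and sender == room_creator
        )

    # The early return in the diff is the negation of this predicate (De Morgan):
    #     not historical and (not enabled or sender != creator)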
66 changes: 42 additions & 24 deletions synapse/handlers/message.py

@@ -610,36 +610,19 @@ async def create_event(
 
         builder.internal_metadata.historical = historical
 
-        # Strip down the auth_event_ids to only what we need to auth the event.
-        # For example, we don't need extra m.room.member that don't match event.sender
-        if auth_event_ids is not None:
-            # If auth events are provided, prev events must be also.
-            assert prev_event_ids is not None
-
-            temp_event = await builder.build(
-                prev_event_ids=prev_event_ids,
-                auth_event_ids=auth_event_ids,
-                depth=depth,
-            )
-            auth_events = await self.store.get_events_as_list(auth_event_ids)
-            # Create a StateMap[str]
-            auth_event_state_map = {
-                (e.type, e.state_key): e.event_id for e in auth_events
-            }
-            # Actually strip down and use the necessary auth events
-            auth_event_ids = self._event_auth_handler.compute_auth_events(
-                event=temp_event,
-                current_state_ids=auth_event_state_map,
-                for_verification=False,
-            )
-
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             depth=depth,
         )
+        logger.info(
+            "create_new_client_event %s event=%s state_group=%s",
+            event.type,
+            event.event_id,
+            context._state_group,
+        )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
         # this behaviour isn't spec'd yet, meaning we should be able to deactivate this

@@ -939,6 +922,32 @@ async def create_new_client_event(
             Tuple of created event, context
         """
 
+        # Strip down the auth_event_ids to only what we need to auth the event.
+        # For example, we don't need extra m.room.member that don't match event.sender
+        if auth_event_ids is not None:
+            # If auth events are provided, prev events must be also.
+            assert prev_event_ids is not None
+
+            # Copy the full auth state before it is stripped down
+            full_state_ids_at_event = auth_event_ids.copy()
+
+            temp_event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
+            auth_events = await self.store.get_events_as_list(auth_event_ids)
+            # Create a StateMap[str]
+            auth_event_state_map = {
+                (e.type, e.state_key): e.event_id for e in auth_events
+            }
+            # Actually strip down and use the necessary auth events
+            auth_event_ids = self._event_auth_handler.compute_auth_events(
+                event=temp_event,
+                current_state_ids=auth_event_state_map,
+                for_verification=False,
+            )
+

MadLittleMods (Contributor, Author) commented on the block above:
This is moved down here so we can get a copy of the full_state_ids_at_event to put in the insertion events below.

         if prev_event_ids is not None:
             assert (
                 len(prev_event_ids) <= 10

@@ -969,7 +978,16 @@
             event.internal_metadata.outlier = True
             context = EventContext.for_outlier()
         else:
-            context = await self.state.compute_event_context(event)
+            old_state = None
+            # Define the state for historical messages so that the state_groups get
+            # set up properly when we `compute_event_context`.
+            if builder.internal_metadata.is_historical() and full_state_ids_at_event:
+                old_state = await self.store.get_events_as_list(full_state_ids_at_event)
+
+            context = await self.state.compute_event_context(
+                event,
+                # old_state=old_state
+            )
 
         if requester:
             context.app_service = requester.app_service
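Taken together, the hunks above mean that for a historical event the full state at that point in the batch (full_state_ids_at_event) is loaded as real events and fed into state computation, instead of only the stripped-down auth events. A minimal sketch of that flow, assuming compute_event_context accepts an old_state argument as the commented-out parameter above suggests; build_historical_context is a hypothetical wrapper, not Synapse API:

    # Sketch only: build_historical_context is a made-up helper for illustration.
    async def build_historical_context(self, builder, event, full_state_ids_at_event):
        old_state = None
        if builder.internal_metadata.is_historical() and full_state_ids_at_event:
            # Load the full state snapshot at this point in the batch, not just the
            # auth events needed to authorise this one event.
            old_state = await self.store.get_events_as_list(full_state_ids_at_event)

        # Pinning old_state means every event in the batch resolves to the same
        # state before the event, so the whole batch can share one state_group.
        return await self.state.compute_event_context(event, old_state=old_state)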
52 changes: 34 additions & 18 deletions synapse/rest/client/room_batch.py

@@ -41,6 +41,10 @@
 logger = logging.getLogger(__name__)
 
 
+def generate_fake_prev_event_id():
+    return "$" + random_string(43)
+
+
 class RoomBatchSendEventRestServlet(RestServlet):
     """
     API endpoint which can insert a batch of events historically back in time

@@ -216,6 +220,10 @@ async def on_POST(
             prev_state_ids = list(prev_state_map.values())
             auth_event_ids = prev_state_ids
 
+        # Make the state events float off on their own
+
+        prev_event_id_for_state_chain = generate_fake_prev_event_id()
+
         state_event_ids_at_start = []
         for state_event in body["state_events_at_start"]:
             assert_params_in_dict(

@@ -240,9 +248,6 @@ async def on_POST(
             # Mark all events as historical
             event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
 
-            # Make the state events float off on their own
-            fake_prev_event_id = "$" + random_string(43)
-
             # TODO: This is pretty much the same as some other code to handle inserting state in this file
             if event_dict["type"] == EventTypes.Member:
                 membership = event_dict["content"].get("membership", None)

@@ -254,8 +259,8 @@ async def on_POST(
                     room_id=room_id,
                     action=membership,
                     content=event_dict["content"],
-                    outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
+                    # outlier=True,
+                    prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.

@@ -274,7 +279,7 @@ async def on_POST(
                     ),
                     event_dict,
                     outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
+                    prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.

@@ -284,6 +289,8 @@ async def on_POST(
 
             state_event_ids_at_start.append(event_id)
             auth_event_ids.append(event_id)
+            # Connect all the state in a floating chain
+            prev_event_id_for_state_chain = event_id
 
         events_to_create = body["events"]
 

@@ -298,11 +305,6 @@ async def on_POST(
         batch_id_to_connect_to = batch_id_from_query
         base_insertion_event = None
         if batch_id_from_query:
-            # All but the first base insertion event should point at a fake
-            # event, which causes the HS to ask for the state at the start of
-            # the batch later.
-            prev_event_ids = [fake_prev_event_id]
-
             # Verify the batch_id_from_query corresponds to an actual insertion event
             # and have the batch connected.
             corresponding_insertion_event_id = (

@@ -325,14 +327,12 @@ async def on_POST(
         # an insertion event), in which case we just create a new insertion event
         # that can then get pointed to by a "marker" event later.
         else:
-            prev_event_ids = prev_event_ids_from_query
-
             base_insertion_event_dict = self._create_insertion_event_dict(
                 sender=requester.user.to_string(),
                 room_id=room_id,
                 origin_server_ts=last_event_in_batch["origin_server_ts"],
             )
-            base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
+            base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()
 
             (
                 base_insertion_event,

MadLittleMods (Contributor, Author) commented on Oct 2, 2021, on the removed prev_event_ids line:
I think this was actually a bug. We only want the base insertion event to be tied to prev_event_ids_from_query. Whereas previously, this attached both the base and the normal insertion event for the first batch (when no ?batch_id was specified).

MadLittleMods (Contributor, Author) commented on Oct 5, 2021:
(notice the extra prev_event for the insertion event in chunk0)
[Mermaid live editor diagrams comparing "before (expected)" with "before (actual, because of this bug)"]

@@ -383,11 +383,20 @@ async def on_POST(
         # Prepend the insertion event to the start of the batch (oldest-in-time)
         events_to_create = [insertion_event] + events_to_create
 
+        # Also connect the historical event chain to the floating state chain,
+        # which causes the HS to ask for the state at the start of
+        # the batch later.
+        prev_event_ids = [prev_event_id_for_state_chain]
+
         event_ids = []
         events_to_persist = []
         for ev in events_to_create:
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
+            assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
+                event.sender,
+            )
+
             event_dict = {
                 "type": ev["type"],
                 "origin_server_ts": ev["origin_server_ts"],

@@ -410,17 +419,24 @@ async def on_POST(
                 historical=True,
                 depth=inherited_depth,
             )
+
+            # Normally this is done when persisting the event but we have to
+            # pre-emptively do it here because we create all the events first,
+            # then persist them in another pass below. And we want to share
+            # state_groups across the whole batch so this lookup needs to work
+            # for the next event in the batch in this loop.
+            await self.store.store_state_group_id_for_event_id(
+                event_id=event.event_id,
+                state_group_id=context._state_group,
+            )
 
             logger.debug(
                 "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
                 event,
                 prev_event_ids,
                 auth_event_ids,
             )
 
-            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
-                event.sender,
-            )
-
             events_to_persist.append((event, context))
             event_id = event.event_id
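The servlet comments above describe a "floating" chain: the state events at the start of the batch hang off a fake prev event and are chained to one another, and the historical batch then points at the end of that chain. A minimal sketch of the shape this produces, where persist_state_event is an assumed stand-in for the membership/state handling in the loop above rather than a real servlet method:

    # Sketch of the floating state chain (persist_state_event is a stand-in helper).
    prev_event_id_for_state_chain = generate_fake_prev_event_id()
    for state_event in body["state_events_at_start"]:
        event_id = persist_state_event(
            state_event, prev_event_ids=[prev_event_id_for_state_chain]
        )
        # Connect all the state in a floating chain: each state event points at
        # the previous one instead of at the room's current DAG.
        prev_event_id_for_state_chain = event_id

    # The historical batch itself then uses the last state event as its prev event,
    # which causes the homeserver to ask for the floating state at the start of
    # the batch when it backfills these events.
    prev_event_ids = [prev_event_id_for_state_chain]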
12 changes: 12 additions & 0 deletions synapse/state/__init__.py

@@ -303,6 +303,15 @@ async def compute_event_context(
             entry = await self.resolve_state_groups_for_events(
                 event.room_id, event.prev_event_ids()
             )
+            logger.info(
+                "compute_event_context %s event=%s (event.prev_event_ids=%s) entry.state_group=%s entry.prev_group=%s entry.delta_ids=%s",
+                event.type,
+                event.event_id,
+                event.prev_event_ids(),
+                entry.state_group,
+                entry.prev_group,
+                entry.delta_ids,
+            )
 
             state_ids_before_event = entry.state
             state_group_before_event = entry.state_group

@@ -400,6 +409,9 @@ async def resolve_state_groups_for_events(
         state_groups_ids = await self.state_store.get_state_groups_ids(
             room_id, event_ids
         )
+        logger.info(
+            "resolve_state_groups_for_events state_groups_ids=%s", state_groups_ids
+        )
 
         if len(state_groups_ids) == 0:
             return _StateCacheEntry(state={}, state_group=None)
23 changes: 13 additions & 10 deletions synapse/storage/databases/main/events.py

@@ -1763,9 +1763,8 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
             retcol="creator",
             allow_none=True,
         )
-        if (
-            not room_version.msc2716_historical
-            or not self.hs.config.experimental.msc2716_enabled
+        if not room_version.msc2716_historical and (
+            not self.hs.config.experimental.msc2716_enabled
             or event.sender != room_creator
         ):
             return

@@ -1825,9 +1824,8 @@ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
             retcol="creator",
             allow_none=True,
         )
-        if (
-            not room_version.msc2716_historical
-            or not self.hs.config.experimental.msc2716_enabled
+        if not room_version.msc2716_historical and (
+            not self.hs.config.experimental.msc2716_enabled
             or event.sender != room_creator
         ):
             return

@@ -2071,13 +2069,18 @@ def _store_event_state_mappings_txn(
 
             state_groups[event.event_id] = context.state_group
 
-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_to_state_groups",
-            values=[
-                {"state_group": state_group_id, "event_id": event_id}
+            key_names=("event_id",),
+            key_values=(
+                (event_id,) for event_id, state_group_id in state_groups.items()
+            ),
+            value_names=("state_group", "event_id"),
+            value_values=(
+                (state_group_id, event_id)
                 for event_id, state_group_id in state_groups.items()
-            ],
+            ),
         )
 
         for event_id, state_group_id in state_groups.items():

MadLittleMods (Contributor, Author) commented on Oct 2, 2021, on the simple_upsert_many_txn change:
Changed this to an upsert to avoid colliding with a previous entry we put in via store_state_group_id_for_event_id earlier before we persisted the events.
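The comment above ties this storage change back to the servlet: store_state_group_id_for_event_id writes a row into event_to_state_groups while the batch is still being built, so the later bulk write during persistence has to tolerate an existing row. A rough sketch of that ordering, using only calls that appear in the diffs in this PR:

    # 1. While creating each historical event (room_batch.py), record its state_group
    #    up front so the next event in the batch can resolve its state.
    await self.store.store_state_group_id_for_event_id(
        event_id=event.event_id,
        state_group_id=context._state_group,
    )

    # 2. When the batch is later persisted, _store_event_state_mappings_txn writes the
    #    same event_id -> state_group mapping again. A plain insert would hit the
    #    unique constraint on event_id, hence simple_upsert_many_txn keyed on event_id.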
13 changes: 13 additions & 0 deletions synapse/storage/databases/main/room_batch.py

@@ -36,3 +36,16 @@ async def get_insertion_event_by_batch_id(
             retcol="event_id",
             allow_none=True,
         )
+
+    async def store_state_group_id_for_event_id(
+        self, event_id: str, state_group_id: int
+    ) -> Optional[str]:
+        {
+            await self.db_pool.simple_upsert(
+                table="event_to_state_groups",
+                keyvalues={"event_id": event_id},
+                values={"state_group": state_group_id, "event_id": event_id},
+                # Unique constraint on event_id so we don't have to lock
+                lock=False,
+            )
+        }

A contributor commented:
Not sure what's going on here. The braces to enclose the await are legit syntax (it'll build a set, like {"hello"}), but as written the function is going to return None.

A contributor commented:
See #11310
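Following up on the review comments above, a minimal sketch of what the helper presumably intends once the stray set-literal braces are removed (an editorial illustration only, not necessarily the fix that landed in #11310):

    async def store_state_group_id_for_event_id(
        self, event_id: str, state_group_id: int
    ) -> None:
        # Record which state_group this event resolves to; the upsert tolerates the
        # row being written again later when the batch is persisted.
        await self.db_pool.simple_upsert(
            table="event_to_state_groups",
            keyvalues={"event_id": event_id},
            values={"state_group": state_group_id, "event_id": event_id},
            # Unique constraint on event_id so we don't have to lock
            lock=False,
        )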
6 changes: 5 additions & 1 deletion synapse/storage/schema/__init__.py

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 64  # remember to update the list below when updating
+SCHEMA_VERSION = 65  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the

@@ -41,6 +41,10 @@
 
 Changes in SCHEMA_VERSION = 64:
     - MSC2716: Rename related tables and columns from "chunks" to "batches".
+
+Changes in SCHEMA_VERSION = 65:
+    - MSC2716: Remove unique event_id constraint from insertion_event_edges
+      because an insertion event can have multiple edges.
 """
@@ -0,0 +1,17 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

DROP INDEX insertion_event_edges_event_id;
CREATE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);