Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add EventStreamPosition type #8388

Merged
merged 3 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8388.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `EventStreamPosition` type.
16 changes: 10 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
from synapse.types import (
JsonDict,
MutableStateMap,
PersistedEventPosition,
RoomStreamToken,
StateMap,
UserID,
get_domain_from_id,
Expand Down Expand Up @@ -2956,7 +2958,7 @@ async def persist_events_and_notify(
)
return result["max_stream_id"]
else:
max_stream_id = await self.storage.persistence.persist_events(
max_stream_token = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)

Expand All @@ -2967,12 +2969,12 @@ async def persist_events_and_notify(

if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
await self._notify_persisted_event(event, max_stream_id)
await self._notify_persisted_event(event, max_stream_token)

return max_stream_id
return max_stream_token.stream

async def _notify_persisted_event(
self, event: EventBase, max_stream_id: int
self, event: EventBase, max_stream_token: RoomStreamToken
) -> None:
"""Checks to see if notifier/pushers should be notified about the
event or not.
Expand All @@ -2998,9 +3000,11 @@ async def _notify_persisted_event(
elif event.internal_metadata.is_outlier():
return

event_stream_id = event.internal_metadata.stream_ordering
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
event, event_pos, max_stream_token, extra_users=extra_users
)

async def _clean_room_for_join(self, room_id: str) -> None:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ def is_inviter_member_event(e):
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")

event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
event_pos, max_stream_token = await self.storage.persistence.persist_event(
event, context=context
)

Expand All @@ -1149,7 +1149,7 @@ def is_inviter_member_event(e):
def _notify():
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
Expand All @@ -1161,7 +1161,7 @@ def _notify():
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)

return event_stream_id
return event_pos.stream

async def _bump_active_time(self, user: UserID) -> None:
try:
Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ async def generate_sync_result(
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_stream_id
user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
Expand Down Expand Up @@ -1916,7 +1916,7 @@ async def _generate_room_entry(
raise Exception("Unrecognized rtype: %r", room_builder.rtype)

async def get_rooms_for_user_at(
self, user_id: str, stream_ordering: int
self, user_id: str, room_key: RoomStreamToken
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.

Expand All @@ -1942,15 +1942,15 @@ async def get_rooms_for_user_at(
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
for room_id, membership_stream_ordering in joined_rooms:
if membership_stream_ordering <= stream_ordering:
for room_id, event_pos in joined_rooms:
if not event_pos.persisted_after(room_key):
joined_room_ids.add(room_id)
continue

logger.info("User joined room after current token: %s", room_id)

extrems = await self.store.get_forward_extremeties_for_room(
room_id, stream_ordering
room_id, event_pos.stream
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
Expand Down
55 changes: 29 additions & 26 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
from synapse.types import (
Collection,
PersistedEventPosition,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -187,7 +193,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
) # type: List[Tuple[int, EventBase, Collection[UserID]]]
) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]

# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
Expand Down Expand Up @@ -246,8 +252,8 @@ def add_replication_callback(self, cb: Callable[[], None]):
def on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
max_room_stream_id: int,
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
""" Used by handlers to inform the notifier something has happened
Expand All @@ -261,16 +267,16 @@ def on_new_room_event(
until all previous events have been persisted before notifying
the client streams.
"""
self.pending_new_room_events.append((room_stream_id, event, extra_users))
self._notify_pending_new_room_events(max_room_stream_id)
self.pending_new_room_events.append((event_pos, event, extra_users))
self._notify_pending_new_room_events(max_room_stream_token)

self.notify_replication()

def _notify_pending_new_room_events(self, max_room_stream_id: int):
def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
Args:
max_room_stream_id: The highest stream_id below which all
max_room_stream_token: The highest stream_id below which all
events have been persisted.
"""
pending = self.pending_new_room_events
Expand All @@ -279,11 +285,9 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]

for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append(
(room_stream_id, event, extra_users)
)
for event_pos, event, extra_users in pending:
if event_pos.persisted_after(max_room_stream_token):
self.pending_new_room_events.append((event_pos, event, extra_users))
else:
if (
event.type == EventTypes.Member
Expand All @@ -296,39 +300,38 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):

if users or rooms:
self.on_new_event(
"room_key",
RoomStreamToken(None, max_room_stream_id),
users=users,
rooms=rooms,
"room_key", max_room_stream_token, users=users, rooms=rooms,
)
self._on_updated_room_token(max_room_stream_id)
self._on_updated_room_token(max_room_stream_token)

def _on_updated_room_token(self, max_room_stream_id: int):
def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
"""Poke services that might care that the room position has been
updated.
"""

# poke any interested application service.
run_as_background_process(
"_notify_app_services", self._notify_app_services, max_room_stream_id
"_notify_app_services", self._notify_app_services, max_room_stream_token
)

run_as_background_process(
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
)

if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_id)
self.federation_sender.notify_new_events(max_room_stream_token.stream)

async def _notify_app_services(self, max_room_stream_id: int):
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(max_room_stream_id)
await self.appservice_handler.notify_interested_services(
max_room_stream_token.stream
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_id: int):
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_id)
await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
except Exception:
logger.exception("Error pusher pool of event")

Expand Down
12 changes: 9 additions & 3 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.types import UserID
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -151,8 +151,14 @@ async def on_rdata(
extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)

max_token = RoomStreamToken(
None, self.store.get_room_max_stream_ordering()
)
event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users
)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
Expand Down
14 changes: 9 additions & 5 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set

Expand All @@ -37,7 +36,7 @@
ProfileInfo,
RoomsForUser,
)
from synapse.types import Collection, get_domain_from_id
from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
Expand Down Expand Up @@ -387,7 +386,7 @@ def _get_rooms_for_user_with_stream_ordering_txn(
# for rooms the server is participating in.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT room_id, e.stream_ordering
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
Expand All @@ -397,7 +396,7 @@ def _get_rooms_for_user_with_stream_ordering_txn(
"""
else:
sql = """
SELECT room_id, e.stream_ordering
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
Expand All @@ -408,7 +407,12 @@ def _get_rooms_for_user_with_stream_ordering_txn(
"""

txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(GetRoomsForUserWithStreamOrdering(*row) for row in txn)
return frozenset(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
for room_id, instance, stream_id in txn
)

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
Expand Down
14 changes: 9 additions & 5 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.types import Collection, StateMap
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -190,6 +190,7 @@ def __init__(self, hs, stores: Databases):
self.persist_events_store = stores.persist_events

self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
Expand All @@ -198,7 +199,7 @@ async def persist_events(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
) -> RoomStreamToken:
"""
Write events to the database
Args:
Expand Down Expand Up @@ -228,11 +229,11 @@ async def persist_events(
defer.gatherResults(deferreds, consumeErrors=True)
)

return self.main_store.get_current_events_token()
return RoomStreamToken(None, self.main_store.get_current_events_token())

async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[int, int]:
) -> Tuple[PersistedEventPosition, RoomStreamToken]:
"""
Returns:
The stream ordering of `event`, and the stream ordering of the
Expand All @@ -247,7 +248,10 @@ async def persist_event(
await make_deferred_yieldable(deferred)

max_persisted_id = self.main_store.get_current_events_token()
return (event.internal_metadata.stream_ordering, max_persisted_id)
event_stream_id = event.internal_metadata.stream_ordering

pos = PersistedEventPosition(self._instance_name, event_stream_id)
return pos, RoomStreamToken(None, max_persisted_id)

def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)

GetRoomsForUserWithStreamOrdering = namedtuple(
"_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
"_GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
)


Expand Down
15 changes: 15 additions & 0 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,21 @@ def copy_and_replace(self, key, new_value) -> "StreamToken":
StreamToken.START = StreamToken.from_string("s0_0")


@attr.s(slots=True, frozen=True)
class PersistedEventPosition:
"""Position of a newly persisted event with instance that persisted it.

This can be used to test whether the event is persisted before or after a
RoomStreamToken.
"""

instance_name = attr.ib(type=str)
stream = attr.ib(type=int)

def persisted_after(self, token: RoomStreamToken) -> bool:
return token.stream < self.stream


class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
):
Expand Down
Loading