This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Move to using TCP replication #2097
Merged
Merged
Changes from 6 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
52bfa60
Add basic replication client handler and factory
erikjohnston 3a1f3f8
Change slave storage to use new replication interface
erikjohnston 36c28bc
Update all the workers and master to use TCP replication
erikjohnston 6ce6bbe
Move where we ack federation
erikjohnston 3376f16
Shuffle and comment synchrotron presence
erikjohnston d160579
Remove unused worker config option
erikjohnston ac66e11
Add the appropriate amount of preserve_fn
erikjohnston d1d5362
Add comment
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,9 +31,10 @@ | |
from synapse.replication.slave.storage.registration import SlavedRegistrationStore | ||
from synapse.replication.slave.storage.transactions import TransactionStore | ||
from synapse.replication.slave.storage.devices import SlavedDeviceStore | ||
from synapse.replication.tcp.client import ReplicationClientHandler | ||
from synapse.storage.engines import create_engine | ||
from synapse.storage.presence import UserPresenceState | ||
from synapse.util.async import sleep | ||
from synapse.util.async import Linearizer | ||
from synapse.util.httpresourcetree import create_resource_tree | ||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext | ||
from synapse.util.manhole import manhole | ||
|
@@ -59,7 +60,23 @@ class FederationSenderSlaveStore( | |
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, | ||
SlavedRegistrationStore, SlavedDeviceStore, | ||
): | ||
pass | ||
def __init__(self, db_conn, hs): | ||
super(FederationSenderSlaveStore, self).__init__(db_conn, hs) | ||
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) | ||
|
||
def _get_federation_out_pos(self, db_conn): | ||
sql = ( | ||
"SELECT stream_id FROM federation_stream_position" | ||
" WHERE type = ?" | ||
) | ||
sql = self.database_engine.convert_param_style(sql) | ||
|
||
txn = db_conn.cursor() | ||
txn.execute(sql, ("federation",)) | ||
rows = txn.fetchall() | ||
txn.close() | ||
|
||
return rows[0][0] if rows else -1 | ||
|
||
|
||
class FederationSenderServer(HomeServer): | ||
|
@@ -127,26 +144,27 @@ def start_listening(self, listeners): | |
else: | ||
logger.warn("Unrecognized listener type: %s", listener["type"]) | ||
|
||
@defer.inlineCallbacks | ||
def replicate(self): | ||
http_client = self.get_simple_http_client() | ||
store = self.get_datastore() | ||
replication_url = self.config.worker_replication_url | ||
send_handler = FederationSenderHandler(self) | ||
|
||
send_handler.on_start() | ||
|
||
while True: | ||
try: | ||
args = store.stream_positions() | ||
args.update((yield send_handler.stream_positions())) | ||
args["timeout"] = 30000 | ||
result = yield http_client.get_json(replication_url, args=args) | ||
yield store.process_replication(result) | ||
yield send_handler.process_replication(result) | ||
except: | ||
logger.exception("Error replicating from %r", replication_url) | ||
yield sleep(30) | ||
self.get_tcp_replication().start_replication(self) | ||
|
||
def build_tcp_replication(self): | ||
return FederationSenderReplicationHandler(self) | ||
|
||
|
||
class FederationSenderReplicationHandler(ReplicationClientHandler): | ||
def __init__(self, hs): | ||
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) | ||
self.send_handler = FederationSenderHandler(hs, self) | ||
|
||
def on_rdata(self, stream_name, token, rows): | ||
super(FederationSenderReplicationHandler, self).on_rdata( | ||
stream_name, token, rows | ||
) | ||
self.send_handler.process_replication_rows(stream_name, token, rows) | ||
|
||
def get_streams_to_replicate(self): | ||
args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() | ||
args.update(self.send_handler.stream_positions()) | ||
return args | ||
|
||
|
||
def start(config_options): | ||
|
@@ -205,7 +223,6 @@ def run(): | |
reactor.run() | ||
|
||
def start(): | ||
ps.replicate() | ||
ps.get_datastore().start_profiling() | ||
ps.get_state_handler().start_caching() | ||
|
||
|
@@ -229,9 +246,15 @@ class FederationSenderHandler(object): | |
"""Processes the replication stream and forwards the appropriate entries | ||
to the federation sender. | ||
""" | ||
def __init__(self, hs): | ||
def __init__(self, hs, replication_client): | ||
self.store = hs.get_datastore() | ||
self.federation_sender = hs.get_federation_sender() | ||
self.replication_client = replication_client | ||
|
||
self.federation_position = self.store.federation_out_pos_startup | ||
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") | ||
|
||
self._last_ack = self.federation_position | ||
|
||
self._room_serials = {} | ||
self._room_typing = {} | ||
|
@@ -243,25 +266,13 @@ def on_start(self): | |
self.store.get_room_max_stream_ordering() | ||
) | ||
|
||
@defer.inlineCallbacks | ||
def stream_positions(self): | ||
stream_id = yield self.store.get_federation_out_pos("federation") | ||
defer.returnValue({ | ||
"federation": stream_id, | ||
|
||
# Ack stuff we've "processed", this should only be called from | ||
# one process. | ||
"federation_ack": stream_id, | ||
}) | ||
return {"federation": self.federation_position} | ||
|
||
@defer.inlineCallbacks | ||
def process_replication(self, result): | ||
def process_replication_rows(self, stream_name, token, rows): | ||
# The federation stream contains things that we want to send out, e.g. | ||
# presence, typing, etc. | ||
fed_stream = result.get("federation") | ||
if fed_stream: | ||
latest_id = int(fed_stream["position"]) | ||
|
||
if stream_name == "federation": | ||
# The federation stream contains a bunch of different types of | ||
# rows that need to be handled differently. We parse the rows, put | ||
# them into the appropriate collection and then send them off. | ||
|
@@ -272,8 +283,9 @@ def process_replication(self, result): | |
device_destinations = set() | ||
|
||
# Parse the rows in the stream | ||
for row in fed_stream["rows"]: | ||
position, typ, content_js = row | ||
for row in rows: | ||
typ = row.type | ||
content_js = row.data | ||
content = json.loads(content_js) | ||
|
||
if typ == send_queue.PRESENCE_TYPE: | ||
|
@@ -325,16 +337,27 @@ def process_replication(self, result): | |
for destination in device_destinations: | ||
self.federation_sender.send_device_messages(destination) | ||
|
||
# Record where we are in the stream. | ||
yield self.store.update_federation_out_pos( | ||
"federation", latest_id | ||
) | ||
self.update_token(token) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
# We also need to poke the federation sender when new events happen | ||
event_stream = result.get("events") | ||
if event_stream: | ||
latest_pos = event_stream["position"] | ||
self.federation_sender.notify_new_events(latest_pos) | ||
elif stream_name == "events": | ||
self.federation_sender.notify_new_events(token) | ||
|
||
@defer.inlineCallbacks | ||
def update_token(self, token): | ||
self.federation_position = token | ||
|
||
# We linearize here to ensure we don't have races updating the token | ||
with (yield self._fed_position_linearizer.queue(None)): | ||
if self._last_ack < self.federation_position: | ||
yield self.store.update_federation_out_pos( | ||
"federation", self.federation_position | ||
) | ||
|
||
# We ACK this token over replication so that the master can drop | ||
# its in-memory queues | ||
self.replication_client.send_federation_ack(self.federation_position) | ||
self._last_ack = self.federation_position | ||
|
||
|
||
if __name__ == '__main__': | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to store this rather than let the handler just call get_federation_out_pos?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we always want the position to be available, rather than having to jump through a deferred the first time round. This is the same as what we do with all the ID generators, too: we fetch them out of the database during start-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth a comment, then?