This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace HTTP replication with TCP replication #2069

Closed
wants to merge 11 commits into from
46 changes: 19 additions & 27 deletions synapse/app/appservice.py
@@ -26,8 +26,8 @@
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -36,7 +36,7 @@

from synapse import events

from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
@@ -120,30 +120,23 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
appservice_handler = self.get_application_service_handler()

@defer.inlineCallbacks
def replicate(results):
stream = results.get("events")
if stream:
max_stream_id = stream["position"]
yield appservice_handler.notify_interested_services(max_stream_id)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
replicate(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ASReplicationHandler(self)


class ASReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()

def on_rdata(self, stream_name, token, rows):
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)

if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
self.appservice_handler.notify_interested_services(max_stream_id)


def start(config_options):
@@ -199,7 +192,6 @@ def run():
reactor.run()

def start():
ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()

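Each worker in this PR gets the same treatment: the HTTP long-poll loop in replicate() is dropped, the worker returns a ReplicationClientHandler (or a subclass of it) from build_tcp_replication(), and per-stream reactions move into on_rdata(). A minimal sketch of that worker-side wiring, using only the calls that appear in the hunks above (the handler name and the "events" reaction are illustrative, not part of the PR):

from synapse.replication.tcp.client import ReplicationClientHandler


class ExampleWorkerReplicationHandler(ReplicationClientHandler):
    """Illustrative handler following the worker pattern in this PR."""

    def __init__(self, hs):
        # The base class takes care of feeding rows into the slaved stores.
        super(ExampleWorkerReplicationHandler, self).__init__(hs.get_datastore())
        self.hs = hs

    def on_rdata(self, stream_name, token, rows):
        # Always let the base class advance the slaved stores first.
        super(ExampleWorkerReplicationHandler, self).on_rdata(stream_name, token, rows)
        # Then react to whichever streams this worker cares about.
        if stream_name == "events":
            pass  # e.g. poke the relevant handler with the new stream position

The worker's HomeServer subclass then returns this handler from build_tcp_replication() and opens the connection with self.get_tcp_replication().start_replication(self), as in the appservice hunk above.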
22 changes: 5 additions & 17 deletions synapse/app/client_reader.py
@@ -30,11 +30,11 @@
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -45,7 +45,7 @@
from synapse import events


from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
@@ -145,21 +145,10 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
self.get_tcp_replication().start_replication(self)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
@@ -209,7 +198,6 @@ def run():
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
ss.replicate()

reactor.callWhenRunning(start)

22 changes: 5 additions & 17 deletions synapse/app/federation_reader.py
@@ -27,9 +27,9 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -42,7 +42,7 @@
from synapse import events


from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
@@ -134,21 +134,10 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
self.get_tcp_replication().start_replication(self)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
Expand Down Expand Up @@ -198,7 +187,6 @@ def run():
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
ss.replicate()

reactor.callWhenRunning(start)

110 changes: 62 additions & 48 deletions synapse/app/federation_sender.py
@@ -31,9 +31,10 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
@@ -59,7 +60,23 @@ class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore,
):
pass
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)

def _get_federation_out_pos(self, db_conn):
sql = (
"SELECT stream_id FROM federation_stream_position"
" WHERE type = ?"
)
sql = self.database_engine.convert_param_style(sql)

txn = db_conn.cursor()
txn.execute(sql, ("federation",))
rows = txn.fetchall()
txn.close()

return rows[0][0] if rows else -1


class FederationSenderServer(HomeServer):
@@ -127,26 +144,29 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
send_handler = FederationSenderHandler(self)

send_handler.on_start()

while True:
try:
args = store.stream_positions()
args.update((yield send_handler.stream_positions()))
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
yield send_handler.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return FederationSenderReplicationHandler(self)


class FederationSenderReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs)

def on_rdata(self, stream_name, token, rows):
super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
if stream_name == "federation":
self.send_federation_ack(token)

def get_streams_to_replicate(self):
args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
args.update(self.send_handler.stream_positions())
return args


def start(config_options):
@@ -205,7 +225,6 @@ def run():
reactor.run()

def start():
ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()

@@ -233,6 +252,9 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()

self.federation_position = self.store.federation_out_pos_startup
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")

self._room_serials = {}
self._room_typing = {}

@@ -243,25 +265,13 @@ def on_start(self):
self.store.get_room_max_stream_ordering()
)

@defer.inlineCallbacks
def stream_positions(self):
stream_id = yield self.store.get_federation_out_pos("federation")
defer.returnValue({
"federation": stream_id,

# Ack stuff we've "processed", this should only be called from
# one process.
"federation_ack": stream_id,
})
return {"federation": self.federation_position}

@defer.inlineCallbacks
def process_replication(self, result):
def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
fed_stream = result.get("federation")
if fed_stream:
latest_id = int(fed_stream["position"])

if stream_name == "federation":
# The federation stream contains a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
Expand All @@ -272,8 +282,9 @@ def process_replication(self, result):
device_destinations = set()

# Parse the rows in the stream
for row in fed_stream["rows"]:
position, typ, content_js = row
for row in rows:
typ = row.type
content_js = row.data
content = json.loads(content_js)

if typ == send_queue.PRESENCE_TYPE:
Expand Down Expand Up @@ -325,16 +336,19 @@ def process_replication(self, result):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)

# Record where we are in the stream.
yield self.store.update_federation_out_pos(
"federation", latest_id
)
self.update_token(token)

# We also need to poke the federation sender when new events happen
event_stream = result.get("events")
if event_stream:
latest_pos = event_stream["position"]
self.federation_sender.notify_new_events(latest_pos)
elif stream_name == "events":
self.federation_sender.notify_new_events(token)

@defer.inlineCallbacks
def update_token(self, token):
self.federation_position = token
with (yield self._fed_position_linearizer.queue(None)):
yield self.store.update_federation_out_pos(
"federation", self.federation_position
)


if __name__ == '__main__':
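The update_token() change above is the piece that keeps the federation position durable without blocking the replication callback: process_replication_rows() stays synchronous, while the actual database write is serialised through the Linearizer so overlapping calls queue rather than race. A stripped-down sketch of that pattern, using only the APIs imported in this file (the class name is illustrative):

from twisted.internet import defer

from synapse.util.async import Linearizer


class PositionWriter(object):
    """Illustrative: persist a monotonically advancing stream position."""

    def __init__(self, store):
        self.store = store
        self.position = None
        self._linearizer = Linearizer(name="position_writer")

    @defer.inlineCallbacks
    def update_token(self, token):
        # Record the newest token immediately, so callers never wait...
        self.position = token
        # ...and persist it under the linearizer, so concurrent updates
        # are written one at a time and always with the latest value.
        with (yield self._linearizer.queue(None)):
            yield self.store.update_federation_out_pos("federation", self.position)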
11 changes: 11 additions & 0 deletions synapse/app/homeserver.py
@@ -56,6 +56,7 @@
from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer

from synapse.util.rlimit import change_resource_limit
Expand Down Expand Up @@ -222,6 +223,16 @@ def start_listening(self):
),
interface=address
)
elif listener["type"] == "replication":
Member: would have been awesome to have this in commit 390511f, to make that easier to review, ftr...

Member Author: Woops! Sorry, will reshuffle if we split this up more

bind_addresses = listener["bind_addresses"]
for address in bind_addresses:
factory = ReplicationStreamProtocolFactory(self)
server_listener = reactor.listenTCP(
listener["port"], factory, interface=address
)
reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening,
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
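For reference, the new branch only fires for listeners configured with the "replication" type; the keys the code above reads are "type", "bind_addresses" and "port". A hypothetical listener entry, expressed as the dict start_listening() would receive (the port value is just an example, not taken from this PR):

listener = {
    "type": "replication",            # selects the new TCP replication branch
    "bind_addresses": ["127.0.0.1"],  # one ReplicationStreamProtocolFactory is bound per address
    "port": 9092,                     # example port; not specified in this diff
}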
