Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move to using TCP replication #2097

Merged
merged 8 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions synapse/app/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

from synapse import events

from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
Expand Down Expand Up @@ -120,30 +120,25 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
appservice_handler = self.get_application_service_handler()

@defer.inlineCallbacks
def replicate(results):
stream = results.get("events")
if stream:
max_stream_id = stream["position"]
yield appservice_handler.notify_interested_services(max_stream_id)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
replicate(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ASReplicationHandler(self)


class ASReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()

def on_rdata(self, stream_name, token, rows):
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)

if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
preserve_fn(
self.appservice_handler.notify_interested_services
)(max_stream_id)


def start(config_options):
Expand Down Expand Up @@ -199,7 +194,6 @@ def run():
reactor.run()

def start():
ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()

Expand Down
22 changes: 5 additions & 17 deletions synapse/app/client_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
Expand All @@ -45,7 +45,7 @@
from synapse import events


from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
Expand Down Expand Up @@ -145,21 +145,10 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
self.get_tcp_replication().start_replication(self)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
Expand Down Expand Up @@ -209,7 +198,6 @@ def run():
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
ss.replicate()

reactor.callWhenRunning(start)

Expand Down
22 changes: 5 additions & 17 deletions synapse/app/federation_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
Expand All @@ -42,7 +42,7 @@
from synapse import events


from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import Resource

from daemonize import Daemonize
Expand Down Expand Up @@ -134,21 +134,10 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
self.get_tcp_replication().start_replication(self)

while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(5)
def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
Expand Down Expand Up @@ -198,7 +187,6 @@ def run():
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
ss.replicate()

reactor.callWhenRunning(start)

Expand Down
128 changes: 78 additions & 50 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand All @@ -59,7 +60,28 @@ class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore,
):
pass
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)

# We pull out the current federation stream position now so that we
# always have a known value for the federation position in memory so
# that we don't have to bounce via a deferred once when we start the
# replication streams.
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)

def _get_federation_out_pos(self, db_conn):
sql = (
"SELECT stream_id FROM federation_stream_position"
" WHERE type = ?"
)
sql = self.database_engine.convert_param_style(sql)

txn = db_conn.cursor()
txn.execute(sql, ("federation",))
rows = txn.fetchall()
txn.close()

return rows[0][0] if rows else -1


class FederationSenderServer(HomeServer):
Expand Down Expand Up @@ -127,26 +149,27 @@ def start_listening(self, listeners):
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
replication_url = self.config.worker_replication_url
send_handler = FederationSenderHandler(self)

send_handler.on_start()

while True:
try:
args = store.stream_positions()
args.update((yield send_handler.stream_positions()))
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
yield store.process_replication(result)
yield send_handler.process_replication(result)
except:
logger.exception("Error replicating from %r", replication_url)
yield sleep(30)
self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return FederationSenderReplicationHandler(self)


class FederationSenderReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)

def on_rdata(self, stream_name, token, rows):
super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)

def get_streams_to_replicate(self):
args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
args.update(self.send_handler.stream_positions())
return args


def start(config_options):
Expand Down Expand Up @@ -205,7 +228,6 @@ def run():
reactor.run()

def start():
ps.replicate()
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()

Expand All @@ -229,9 +251,15 @@ class FederationSenderHandler(object):
"""Processes the replication stream and forwards the appropriate entries
to the federation sender.
"""
def __init__(self, hs):
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client

self.federation_position = self.store.federation_out_pos_startup
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")

self._last_ack = self.federation_position

self._room_serials = {}
self._room_typing = {}
Expand All @@ -243,25 +271,13 @@ def on_start(self):
self.store.get_room_max_stream_ordering()
)

@defer.inlineCallbacks
def stream_positions(self):
stream_id = yield self.store.get_federation_out_pos("federation")
defer.returnValue({
"federation": stream_id,

# Ack stuff we've "processed", this should only be called from
# one process.
"federation_ack": stream_id,
})
return {"federation": self.federation_position}

@defer.inlineCallbacks
def process_replication(self, result):
def process_replication_rows(self, stream_name, token, rows):
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
fed_stream = result.get("federation")
if fed_stream:
latest_id = int(fed_stream["position"])

if stream_name == "federation":
# The federation stream containis a bunch of different types of
# rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off.
Expand All @@ -272,8 +288,9 @@ def process_replication(self, result):
device_destinations = set()

# Parse the rows in the stream
for row in fed_stream["rows"]:
position, typ, content_js = row
for row in rows:
typ = row.type
content_js = row.data
content = json.loads(content_js)

if typ == send_queue.PRESENCE_TYPE:
Expand Down Expand Up @@ -325,16 +342,27 @@ def process_replication(self, result):
for destination in device_destinations:
self.federation_sender.send_device_messages(destination)

# Record where we are in the stream.
yield self.store.update_federation_out_pos(
"federation", latest_id
)
preserve_fn(self.update_token)(token)

# We also need to poke the federation sender when new events happen
event_stream = result.get("events")
if event_stream:
latest_pos = event_stream["position"]
self.federation_sender.notify_new_events(latest_pos)
elif stream_name == "events":
self.federation_sender.notify_new_events(token)

@defer.inlineCallbacks
def update_token(self, token):
self.federation_position = token

# We linearize here to ensure we don't have races updating the token
with (yield self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position:
yield self.store.update_federation_out_pos(
"federation", self.federation_position
)

# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(self.federation_position)
self._last_ack = self.federation_position


if __name__ == '__main__':
Expand Down
Loading