Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Refactor the Appservice scheduler code (#5886)
Browse files Browse the repository at this point in the history
Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
  • Loading branch information
richvdh authored Aug 20, 2019
2 parents baa3f4a + 5019945 commit 72bc285
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 49 deletions.
1 change: 1 addition & 0 deletions changelog.d/5886.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor the Appservice scheduler code.
110 changes: 65 additions & 45 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,35 +70,37 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()

def create_recoverer(service, callback):
return _Recoverer(self.clock, self.store, self.as_api, service, callback)

self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")

# check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
services = yield self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
self.txn_ctrl.add_recoverers(recoverers)

for service in services:
self.txn_ctrl.start_recoverer(service)

def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)


class _ServiceQueuer(object):
"""Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
"""Queue of events waiting to be sent to appservices.
Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
"""

def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}

# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
Expand Down Expand Up @@ -136,13 +138,29 @@ def _send_request(self, service):


class _TransactionController(object):
def __init__(self, clock, store, as_api, recoverer_fn):
"""Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we have only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""

def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []

# map from service id to recoverer instance
self.recoverers = {}

# for UTs
self.RECOVERER_CLASS = _Recoverer

@defer.inlineCallbacks
def send(self, service, events):
Expand All @@ -154,61 +172,63 @@ def send(self, service, events):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)

def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))

@defer.inlineCallbacks
def _start_recoverer(self, service):
def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")

def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))

@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None


class _Recoverer(object):
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
logger.info(
"Starting recoverer for AS ID %s which was marked as " "DOWN",
r.service.id,
)
r.recover()
return recoverers
"""Manages retries and backoff for a DOWN appservice.
We have one of these for each appservice which is currently considered DOWN.
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
service (synapse.appservice.ApplicationService): the service we are managing
callback (callable[_Recoverer]): called once the service recovers.
"""

def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
Expand Down
6 changes: 2 additions & 4 deletions tests/appservice/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ def setUp(self):
self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController(
clock=self.clock,
store=self.store,
as_api=self.as_api,
recoverer_fn=self.recoverer_fn,
clock=self.clock, store=self.store, as_api=self.as_api
)
self.txnctrl.RECOVERER_CLASS = self.recoverer_fn

def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent.
Expand Down

0 comments on commit 72bc285

Please sign in to comment.