Flesh out more stub functions.

This commit is contained in:
Kegan Dougal 2015-03-06 15:12:24 +00:00
parent 141ec04d19
commit f260cb72cd
4 changed files with 52 additions and 12 deletions

View File

@ -20,6 +20,11 @@ import re
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ApplicationServiceState(object):
DOWN = "down"
UP = "up"
class ApplicationService(object): class ApplicationService(object):
"""Defines an application service. This definition is mostly what is """Defines an application service. This definition is mostly what is
provided to the /register AS API. provided to the /register AS API.

View File

@ -49,7 +49,11 @@ This is all tied together by the AppServiceScheduler which DIs the required
components. components.
""" """
from synapse.appservice import ApplicationServiceState
from twisted.internet import defer from twisted.internet import defer
import logging
logger = logging.getLogger(__name__)
class AppServiceScheduler(object): class AppServiceScheduler(object):
@ -162,21 +166,36 @@ class _TransactionController(object):
if txn.send(self.as_api): if txn.send(self.as_api):
txn.complete(self.store) txn.complete(self.store)
else: else:
# TODO mark AS as down
self._start_recoverer(service) self._start_recoverer(service)
self.clock.call_later(1000, self.start_polling) self.clock.call_later(1000, self.start_polling)
def on_recovered(self, service): @defer.inlineCallbacks
# TODO mark AS as UP def on_recovered(self, recoverer):
pass applied_state = yield self.store.set_appservice_state(
recoverer.service,
ApplicationServiceState.UP
)
if not applied_state:
logger.error("Failed to apply appservice state UP to service %s",
recoverer.service)
def add_recoverers(self, recoverers): def add_recoverers(self, recoverers):
for r in recoverers: for r in recoverers:
self.recoverers.append(r) self.recoverers.append(r)
@defer.inlineCallbacks
def _start_recoverer(self, service): def _start_recoverer(self, service):
applied_state = yield self.store.set_appservice_state(
service,
ApplicationServiceState.DOWN
)
if applied_state:
recoverer = self.recoverer_fn(service, self.on_recovered) recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover() recoverer.recover()
else:
logger.error("Failed to apply appservice state DOWN to service %s",
service)
def _is_service_up(self, service): def _is_service_up(self, service):
pass pass
@ -193,7 +212,9 @@ class _Recoverer(object):
@staticmethod @staticmethod
@defer.inlineCallbacks @defer.inlineCallbacks
def start(clock, store, as_api, callback): def start(clock, store, as_api, callback):
services = yield store.get_failing_appservices() services = yield store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
recoverers = [ recoverers = [
_Recoverer(clock, store, as_api, s, callback) for s in services _Recoverer(clock, store, as_api, s, callback) for s in services
] ]
@ -228,7 +249,7 @@ class _Recoverer(object):
self._set_service_recovered() self._set_service_recovered()
def _set_service_recovered(self): def _set_service_recovered(self):
self.callback(self.service) self.callback(self)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_oldest_txn(self): def _get_oldest_txn(self):

View File

@ -343,15 +343,28 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
def __init__(self, hs): def __init__(self, hs):
super(ApplicationServiceTransactionStore, self).__init__(hs) super(ApplicationServiceTransactionStore, self).__init__(hs)
def get_failing_appservices(self): def get_appservices_by_state(self, state):
"""Get a list of application services which are down. """Get a list of application services based on their state.
Args:
state(ApplicationServiceState): The state to filter on.
Returns: Returns:
A Deferred which resolves to a list of ApplicationServices, which A Deferred which resolves to a list of ApplicationServices, which
may be empty. may be empty.
""" """
pass pass
def set_appservice_state(self, service, state):
"""Set the application service state.
Args:
service(ApplicationService): The service whose state to set.
state(ApplicationServiceState): The connectivity state to apply.
Returns:
A Deferred which resolves to True if the state was set successfully.
"""
pass
def complete_appservice_txn(self, txn_id, service): def complete_appservice_txn(self, txn_id, service):
"""Completes an application service transaction. """Completes an application service transaction.

View File

@ -57,7 +57,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.assertEquals(1, txn.complete.call_count) self.assertEquals(1, txn.complete.call_count)
# 2 because it needs to get None to know there are no more txns # 2 because it needs to get None to know there are no more txns
self.assertEquals(2, self.store.get_oldest_txn.call_count) self.assertEquals(2, self.store.get_oldest_txn.call_count)
self.assertEquals(1, self.callback.call_count) self.callback.assert_called_once_with(self.recoverer)
self.assertEquals(self.recoverer.service, self.service)
def test_recover_retry_txn(self): def test_recover_retry_txn(self):
txn = Mock() txn = Mock()
@ -91,7 +92,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.clock.advance_time(16000) self.clock.advance_time(16000)
self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.send.call_count) # new mock reset call count
self.assertEquals(1, txn.complete.call_count) self.assertEquals(1, txn.complete.call_count)
self.assertEquals(1, self.callback.call_count) self.callback.assert_called_once_with(self.recoverer)
class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):