Add single instance & logging stuff

Copy the stuff over from http pusher that prevents multiple instances of process running at once and sets up logging and measure blocks.
This commit is contained in:
David Baker 2016-04-19 14:52:58 +01:00
parent 07d765209d
commit e2a01455af
1 changed files with 39 additions and 8 deletions

View File

@ -19,6 +19,7 @@ import logging
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,6 +57,8 @@ class EmailPusher(object):
# See httppusher # See httppusher
self.max_stream_ordering = None self.max_stream_ordering = None
self.processing = False
@defer.inlineCallbacks @defer.inlineCallbacks
def on_started(self): def on_started(self):
self.throttle_params = yield self.store.get_throttle_params_by_room( self.throttle_params = yield self.store.get_throttle_params_by_room(
@ -63,20 +66,48 @@ class EmailPusher(object):
) )
yield self._process() yield self._process()
def on_stop(self):
if self.timed_call:
self.timed_call.cancel()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
with Measure(self.clock, "push.on_new_notifications"):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
yield self._process() yield self._process()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_timer(self): def on_timer(self):
self.timed_call = None self.timed_call = None
with Measure(self.clock, "push.on_timer"):
yield self._process() yield self._process()
@defer.inlineCallbacks @defer.inlineCallbacks
def _process(self): def _process(self):
if self.processing:
return
with LoggingContext("emailpush._process"):
with Measure(self.clock, "emailpush._process"):
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
def _unsafe_process(self):
"""
Main logic of the push loop without the wrapper function that sets
up logging, measures and guards against multiple instances of it
being run.
"""
last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user( last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user(
self.user_id self.user_id
) )