From 7b0d846407a593ccd204f82aaa1090b8af8df84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 16:19:15 +0000 Subject: [PATCH] Atomically persit push actions when we persist the event --- synapse/events/snapshot.py | 1 + synapse/handlers/_base.py | 10 +++--- synapse/handlers/federation.py | 12 +++---- synapse/push/action_generator.py | 20 +++--------- synapse/storage/event_push_actions.py | 45 ++++++++++----------------- synapse/storage/events.py | 26 ++++++++++------ 6 files changed, 49 insertions(+), 65 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f51200d18e..8a475417a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -20,3 +20,4 @@ class EventContext(object): self.current_state = current_state self.state_group = None self.rejected = False + self.push_actions = [] diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d3f722b22e..064e8723c8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -264,13 +264,13 @@ class BaseHandler(object): "You don't have permission to redact events" ) - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, self, context.current_state + event, context, self + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) destinations = set() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b78b0502d9..da55d43541 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -236,12 +236,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, self - ) - @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index d8f8256a1f..e0da0868ec 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,8 +19,6 @@ import bulk_push_rule_evaluator import logging -from synapse.api.constants import EventTypes - logger = logging.getLogger(__name__) @@ -36,23 +34,15 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, handler, current_state): - if event.type == EventTypes.Redaction and event.redacts is not None: - yield self.store.remove_push_actions_for_event_id( - event.room_id, event.redacts - ) - + def handle_push_actions_for_event(self, event, context, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, current_state + event, handler, context.current_state ) - yield self.store.set_push_actions_for_event_and_users( - event, - [ - (uid, None, actions) for uid, actions in actions_by_user.items() - ] - ) + context.push_actions = [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0a969f50b..466f07a1c4 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,8 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_push_actions_for_event_and_users(self, event, tuples): + def _set_push_actions_for_event_and_users(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - def f(txn): - for uid, _, __ in tuples: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid) - ) - return self._simple_insert_many_txn(txn, "event_push_actions", values) - - yield self.runInteraction( - "set_actions_for_event_and_users", - f, - ) + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( @@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - @defer.inlineCallbacks - def remove_push_actions_for_event_id(self, room_id, event_id): - def f(txn): - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id,) - ) - txn.execute( - "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", - (room_id, event_id) - ) - yield self.runInteraction( - "remove_push_actions_for_event_id", - f + def _remove_push_actions_for_event_id(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c6ed54721c..7d4012c414 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items():