Atomically persit push actions when we persist the event
This commit is contained in:
parent
f28cc45183
commit
7b0d846407
|
@ -20,3 +20,4 @@ class EventContext(object):
|
||||||
self.current_state = current_state
|
self.current_state = current_state
|
||||||
self.state_group = None
|
self.state_group = None
|
||||||
self.rejected = False
|
self.rejected = False
|
||||||
|
self.push_actions = []
|
||||||
|
|
|
@ -264,13 +264,13 @@ class BaseHandler(object):
|
||||||
"You don't have permission to redact events"
|
"You don't have permission to redact events"
|
||||||
)
|
)
|
||||||
|
|
||||||
(event_stream_id, max_stream_id) = yield self.store.persist_event(
|
|
||||||
event, context=context
|
|
||||||
)
|
|
||||||
|
|
||||||
action_generator = ActionGenerator(self.hs)
|
action_generator = ActionGenerator(self.hs)
|
||||||
yield action_generator.handle_push_actions_for_event(
|
yield action_generator.handle_push_actions_for_event(
|
||||||
event, self, context.current_state
|
event, context, self
|
||||||
|
)
|
||||||
|
|
||||||
|
(event_stream_id, max_stream_id) = yield self.store.persist_event(
|
||||||
|
event, context=context
|
||||||
)
|
)
|
||||||
|
|
||||||
destinations = set()
|
destinations = set()
|
||||||
|
|
|
@ -236,12 +236,6 @@ class FederationHandler(BaseHandler):
|
||||||
user = UserID.from_string(event.state_key)
|
user = UserID.from_string(event.state_key)
|
||||||
yield user_joined_room(self.distributor, user, event.room_id)
|
yield user_joined_room(self.distributor, user, event.room_id)
|
||||||
|
|
||||||
if not backfilled and not event.internal_metadata.is_outlier():
|
|
||||||
action_generator = ActionGenerator(self.hs)
|
|
||||||
yield action_generator.handle_push_actions_for_event(
|
|
||||||
event, self
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _filter_events_for_server(self, server_name, room_id, events):
|
def _filter_events_for_server(self, server_name, room_id, events):
|
||||||
event_to_state = yield self.store.get_state_for_events(
|
event_to_state = yield self.store.get_state_for_events(
|
||||||
|
@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler):
|
||||||
auth_events=auth_events,
|
auth_events=auth_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not backfilled and not event.internal_metadata.is_outlier():
|
||||||
|
action_generator = ActionGenerator(self.hs)
|
||||||
|
yield action_generator.handle_push_actions_for_event(
|
||||||
|
event, context, self
|
||||||
|
)
|
||||||
|
|
||||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||||
event,
|
event,
|
||||||
context=context,
|
context=context,
|
||||||
|
|
|
@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,23 +34,15 @@ class ActionGenerator:
|
||||||
# tag (ie. we just need all the users).
|
# tag (ie. we just need all the users).
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def handle_push_actions_for_event(self, event, handler, current_state):
|
def handle_push_actions_for_event(self, event, context, handler):
|
||||||
if event.type == EventTypes.Redaction and event.redacts is not None:
|
|
||||||
yield self.store.remove_push_actions_for_event_id(
|
|
||||||
event.room_id, event.redacts
|
|
||||||
)
|
|
||||||
|
|
||||||
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
|
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
|
||||||
event.room_id, self.hs, self.store
|
event.room_id, self.hs, self.store
|
||||||
)
|
)
|
||||||
|
|
||||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||||
event, handler, current_state
|
event, handler, context.current_state
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.store.set_push_actions_for_event_and_users(
|
context.push_actions = [
|
||||||
event,
|
|
||||||
[
|
|
||||||
(uid, None, actions) for uid, actions in actions_by_user.items()
|
(uid, None, actions) for uid, actions in actions_by_user.items()
|
||||||
]
|
]
|
||||||
)
|
|
||||||
|
|
|
@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsStore(SQLBaseStore):
|
class EventPushActionsStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
def _set_push_actions_for_event_and_users(self, txn, event, tuples):
|
||||||
def set_push_actions_for_event_and_users(self, event, tuples):
|
|
||||||
"""
|
"""
|
||||||
:param event: the event set actions for
|
:param event: the event set actions for
|
||||||
:param tuples: list of tuples of (user_id, profile_tag, actions)
|
:param tuples: list of tuples of (user_id, profile_tag, actions)
|
||||||
|
@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
'highlight': 1 if _action_has_highlight(actions) else 0,
|
'highlight': 1 if _action_has_highlight(actions) else 0,
|
||||||
})
|
})
|
||||||
|
|
||||||
def f(txn):
|
|
||||||
for uid, _, __ in tuples:
|
for uid, _, __ in tuples:
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
||||||
(event.room_id, uid)
|
(event.room_id, uid)
|
||||||
)
|
)
|
||||||
return self._simple_insert_many_txn(txn, "event_push_actions", values)
|
self._simple_insert_many_txn(txn, "event_push_actions", values)
|
||||||
|
|
||||||
yield self.runInteraction(
|
|
||||||
"set_actions_for_event_and_users",
|
|
||||||
f,
|
|
||||||
)
|
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
|
@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
|
||||||
def get_unread_event_push_actions_by_room_for_user(
|
def get_unread_event_push_actions_by_room_for_user(
|
||||||
|
@ -107,9 +100,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
def _remove_push_actions_for_event_id(self, txn, room_id, event_id):
|
||||||
def remove_push_actions_for_event_id(self, room_id, event_id):
|
|
||||||
def f(txn):
|
|
||||||
# Sad that we have to blow away the cache for the whole room here
|
# Sad that we have to blow away the cache for the whole room here
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
||||||
|
@ -119,10 +110,6 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
|
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
|
||||||
(room_id, event_id)
|
(room_id, event_id)
|
||||||
)
|
)
|
||||||
yield self.runInteraction(
|
|
||||||
"remove_push_actions_for_event_id",
|
|
||||||
f
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _action_has_highlight(actions):
|
def _action_has_highlight(actions):
|
||||||
|
|
|
@ -205,25 +205,31 @@ class EventsStore(SQLBaseStore):
|
||||||
@log_function
|
@log_function
|
||||||
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
|
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
|
||||||
is_new_state=True):
|
is_new_state=True):
|
||||||
|
depth_updates = {}
|
||||||
|
for event, context in events_and_contexts:
|
||||||
# Remove the any existing cache entries for the event_ids
|
# Remove the any existing cache entries for the event_ids
|
||||||
for event, _ in events_and_contexts:
|
|
||||||
txn.call_after(self._invalidate_get_event_cache, event.event_id)
|
txn.call_after(self._invalidate_get_event_cache, event.event_id)
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self._events_stream_cache.entity_has_changed,
|
self._events_stream_cache.entity_has_changed,
|
||||||
event.room_id, event.internal_metadata.stream_ordering,
|
event.room_id, event.internal_metadata.stream_ordering,
|
||||||
)
|
)
|
||||||
|
|
||||||
depth_updates = {}
|
if not event.internal_metadata.is_outlier():
|
||||||
for event, _ in events_and_contexts:
|
|
||||||
if event.internal_metadata.is_outlier():
|
|
||||||
continue
|
|
||||||
depth_updates[event.room_id] = max(
|
depth_updates[event.room_id] = max(
|
||||||
event.depth, depth_updates.get(event.room_id, event.depth)
|
event.depth, depth_updates.get(event.room_id, event.depth)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if context.push_actions:
|
||||||
|
self._set_push_actions_for_event_and_users(
|
||||||
|
txn, event, context.push_actions
|
||||||
|
)
|
||||||
|
|
||||||
|
if event.type == EventTypes.Redaction and event.redacts is not None:
|
||||||
|
self._remove_push_actions_for_event_id(
|
||||||
|
txn, event.room_id, event.redacts
|
||||||
|
)
|
||||||
|
|
||||||
for room_id, depth in depth_updates.items():
|
for room_id, depth in depth_updates.items():
|
||||||
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue