Merge pull request #2892 from matrix-org/erikj/batch_inserts_push_actions
Batch inserts into event_push_actions_staging
This commit is contained in:
commit
e5b4a208ce
|
@ -144,6 +144,7 @@ class BulkPushRuleEvaluator(object):
|
||||||
Deferred
|
Deferred
|
||||||
"""
|
"""
|
||||||
rules_by_user = yield self._get_rules_for_event(event, context)
|
rules_by_user = yield self._get_rules_for_event(event, context)
|
||||||
|
actions_by_user = {}
|
||||||
|
|
||||||
room_members = yield self.store.get_joined_users_from_context(
|
room_members = yield self.store.get_joined_users_from_context(
|
||||||
event, context
|
event, context
|
||||||
|
@ -189,14 +190,17 @@ class BulkPushRuleEvaluator(object):
|
||||||
if matches:
|
if matches:
|
||||||
actions = [x for x in rule['actions'] if x != 'dont_notify']
|
actions = [x for x in rule['actions'] if x != 'dont_notify']
|
||||||
if actions and 'notify' in actions:
|
if actions and 'notify' in actions:
|
||||||
# Push rules say we should notify the user of this event,
|
# Push rules say we should notify the user of this event
|
||||||
# so we mark it in the DB in the staging area. (This
|
actions_by_user[uid] = actions
|
||||||
# will then get handled when we persist the event)
|
|
||||||
yield self.store.add_push_actions_to_staging(
|
|
||||||
event.event_id, uid, actions,
|
|
||||||
)
|
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Mark in the DB staging area the push actions for users who should be
|
||||||
|
# notified for this event. (This will then get handled when we persist
|
||||||
|
# the event)
|
||||||
|
yield self.store.add_push_actions_to_staging(
|
||||||
|
event.event_id, actions_by_user,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _condition_checker(evaluator, conditions, uid, display_name, cache):
|
def _condition_checker(evaluator, conditions, uid, display_name, cache):
|
||||||
for cond in conditions:
|
for cond in conditions:
|
||||||
|
|
|
@ -775,32 +775,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
(rotate_to_stream_ordering,)
|
(rotate_to_stream_ordering,)
|
||||||
)
|
)
|
||||||
|
|
||||||
def add_push_actions_to_staging(self, event_id, user_id, actions):
|
def add_push_actions_to_staging(self, event_id, user_id_actions):
|
||||||
"""Add the push actions for the user and event to the push
|
"""Add the push actions for the event to the push action staging area.
|
||||||
action staging area.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_id (str)
|
event_id (str)
|
||||||
user_id (str)
|
user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
|
||||||
actions (list[dict|str]): An action can either be a string or
|
user_id to list of push actions, where an action can either be
|
||||||
dict.
|
a string or dict.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred
|
Deferred
|
||||||
"""
|
"""
|
||||||
|
|
||||||
is_highlight = 1 if _action_has_highlight(actions) else 0
|
if not user_id_actions:
|
||||||
|
return
|
||||||
|
|
||||||
return self._simple_insert(
|
# This is a helper function for generating the necessary tuple that
|
||||||
table="event_push_actions_staging",
|
# can be used to inert into the `event_push_actions_staging` table.
|
||||||
values={
|
def _gen_entry(user_id, actions):
|
||||||
"event_id": event_id,
|
is_highlight = 1 if _action_has_highlight(actions) else 0
|
||||||
"user_id": user_id,
|
return (
|
||||||
"actions": _serialize_action(actions, is_highlight),
|
event_id, # event_id column
|
||||||
"notif": 1,
|
user_id, # user_id column
|
||||||
"highlight": is_highlight,
|
_serialize_action(actions, is_highlight), # actions column
|
||||||
},
|
1, # notif column
|
||||||
desc="add_push_actions_to_staging",
|
is_highlight, # highlight column
|
||||||
|
)
|
||||||
|
|
||||||
|
def _add_push_actions_to_staging_txn(txn):
|
||||||
|
# We don't use _simple_insert_many here to avoid the overhead
|
||||||
|
# of generating lists of dicts.
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO event_push_actions_staging
|
||||||
|
(event_id, user_id, actions, notif, highlight)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.executemany(sql, (
|
||||||
|
_gen_entry(user_id, actions)
|
||||||
|
for user_id, actions in user_id_actions.iteritems()
|
||||||
|
))
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
def remove_push_actions_from_staging(self, event_id):
|
def remove_push_actions_from_staging(self, event_id):
|
||||||
|
|
|
@ -230,10 +230,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
|
||||||
state_handler = self.hs.get_state_handler()
|
state_handler = self.hs.get_state_handler()
|
||||||
context = yield state_handler.compute_event_context(event)
|
context = yield state_handler.compute_event_context(event)
|
||||||
|
|
||||||
for user_id, actions in push_actions:
|
yield self.master_store.add_push_actions_to_staging(
|
||||||
yield self.master_store.add_push_actions_to_staging(
|
event.event_id, {
|
||||||
event.event_id, user_id, actions,
|
user_id: actions
|
||||||
)
|
for user_id, actions in push_actions
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
ordering = None
|
ordering = None
|
||||||
if backfill:
|
if backfill:
|
||||||
|
|
|
@ -71,7 +71,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
|
||||||
event.depth = stream
|
event.depth = stream
|
||||||
|
|
||||||
yield self.store.add_push_actions_to_staging(
|
yield self.store.add_push_actions_to_staging(
|
||||||
event.event_id, user_id, action,
|
event.event_id, {user_id: action},
|
||||||
)
|
)
|
||||||
yield self.store.runInteraction(
|
yield self.store.runInteraction(
|
||||||
"", self.store._set_push_actions_for_event_and_users_txn,
|
"", self.store._set_push_actions_for_event_and_users_txn,
|
||||||
|
|
Loading…
Reference in New Issue