Merge pull request #2894 from matrix-org/erikj/handle_unpersisted_events_push

Ensure all push actions are deleted from staging
This commit is contained in:
Erik Johnston 2018-02-26 14:28:35 +00:00 committed by GitHub
commit 73fe866847
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 36 deletions

View File

@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._rotate_notifs, 30 * 60 * 1000 self._rotate_notifs, 30 * 60 * 1000
) )
def _set_push_actions_for_event_and_users_txn(self, txn, event): def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
""" all_events_and_contexts):
"""Handles moving push actions from staging table to main
event_push_actions table for all events in `events_and_contexts`.
Also ensures that all events in `all_events_and_contexts` are removed
from the push action staging area.
Args: Args:
event: the event set actions for events_and_contexts (list[(EventBase, EventContext)]): events
tuples: list of tuples of (user_id, actions) we are persisting
all_events_and_contexts (list[(EventBase, EventContext)]): all
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
""" """
sql = """ sql = """
@ -424,34 +434,41 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE event_id = ? WHERE event_id = ?
""" """
txn.execute(sql, ( if events_and_contexts:
event.room_id, event.internal_metadata.stream_ordering, txn.executemany(sql, (
event.depth, event.event_id, (
)) event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
)
for event, _ in events_and_contexts
))
user_ids = self._simple_select_onecol_txn( for event, _ in events_and_contexts:
txn, user_ids = self._simple_select_onecol_txn(
table="event_push_actions_staging", txn,
keyvalues={ table="event_push_actions_staging",
"event_id": event.event_id, keyvalues={
}, "event_id": event.event_id,
retcol="user_id", },
) retcol="user_id",
self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)
for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
) )
for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
)
# Now we delete the staging area for *all* events that were being
# persisted.
txn.executemany(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
(
(event.event_id,)
for event, _ in all_events_and_contexts
)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50, def get_push_actions_for_user(self, user_id, before=None, limit=50,
only_highlight=False): only_highlight=False):

View File

@ -627,6 +627,8 @@ class EventsStore(EventsWorkerStore):
list of the event ids which are the forward extremities. list of the event ids which are the forward extremities.
""" """
all_events_and_contexts = events_and_contexts
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
@ -689,6 +691,7 @@ class EventsStore(EventsWorkerStore):
self._update_metadata_tables_txn( self._update_metadata_tables_txn(
txn, txn,
events_and_contexts=events_and_contexts, events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled, backfilled=backfilled,
) )
@ -1086,26 +1089,33 @@ class EventsStore(EventsWorkerStore):
ec for ec in events_and_contexts if ec[0] not in to_remove ec for ec in events_and_contexts if ec[0] not in to_remove
] ]
def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): def _update_metadata_tables_txn(self, txn, events_and_contexts,
all_events_and_contexts, backfilled):
"""Update all the miscellaneous tables for new events """Update all the miscellaneous tables for new events
Args: Args:
txn (twisted.enterprise.adbapi.Connection): db connection txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting we are persisting
all_events_and_contexts (list[(EventBase, EventContext)]): all
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
backfilled (bool): True if the events were backfilled backfilled (bool): True if the events were backfilled
""" """
# Insert all the push actions into the event_push_actions table.
self._set_push_actions_for_event_and_users_txn(
txn,
events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
)
if not events_and_contexts: if not events_and_contexts:
# nothing to do here # nothing to do here
return return
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
self._set_push_actions_for_event_and_users_txn(
txn, event,
)
if event.type == EventTypes.Redaction and event.redacts is not None: if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the # Remove the entries in the event_push_actions table for the
# redacted event. # redacted event.

View File

@ -75,7 +75,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
) )
yield self.store.runInteraction( yield self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn, "", self.store._set_push_actions_for_event_and_users_txn,
event, [(event, None)], [(event, None)],
) )
def _rotate(stream): def _rotate(stream):