Use upserts for updating `event_push_summary` (#13153)

This commit is contained in:
Erik Johnston 2022-07-05 13:51:04 +01:00 committed by GitHub
parent 347165bc06
commit 578a5e24a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 40 deletions

1
changelog.d/13153.misc Normal file
View File

@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.

View File

@ -1013,8 +1013,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
sql = """
SELECT user_id, room_id,
coalesce(old.%s, 0) + upd.cnt,
upd.stream_ordering,
old.user_id
upd.stream_ordering
FROM (
SELECT user_id, room_id, count(*) as cnt,
max(stream_ordering) as stream_ordering
@ -1042,7 +1041,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
old_user_id=row[4],
notif_count=0,
)
@ -1063,57 +1061,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=0,
stream_ordering=row[3],
old_user_id=row[4],
notif_count=row[2],
)
logger.info("Rotating notifications, handling %d rows", len(summaries))
# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self.db_pool.simple_insert_many_txn(
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
keys=(
"user_id",
"room_id",
"notif_count",
"unread_count",
"stream_ordering",
),
values=[
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
user_id,
room_id,
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
)
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is None
for summary in summaries.values()
],
)
txn.execute_batch(
"""
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
(
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
user_id,
room_id,
)
for ((user_id, room_id), summary) in summaries.items()
if summary.old_user_id is not None
),
)
txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,),
@ -1293,5 +1261,4 @@ class _EventPushSummary:
unread_count: int
stream_ordering: int
old_user_id: str
notif_count: int