Fix AssertionErrors after purging events (#11642)

* Fix AssertionErrors after purging events

If you purged a bunch of events from your database, and then restarted synapse
without receiving more events, then you would get a bunch of AssertionErrors on
restart.

This fixes the situation by rewinding the stream processors.

* `check-newsfragment`: ignore deleted newsfiles
This commit is contained in:
Richard van der Hoff 2022-01-04 16:36:33 +00:00 committed by GitHub
parent 878aa55293
commit b38bdae3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 6 deletions

1
changelog.d/11536.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug which could cause `AssertionError`s to be written to the log when Synapse was restarted after purging events from the database.

View File

@ -1 +0,0 @@
Improvements to log messages around handling stream ids.

1
changelog.d/11642.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug which could cause `AssertionError`s to be written to the log when Synapse was restarted after purging events from the database.

View File

@ -42,8 +42,8 @@ echo "--------------------------"
echo echo
matched=0 matched=0
for f in $(git diff --name-only FETCH_HEAD... -- changelog.d); do for f in $(git diff --diff-filter=d --name-only FETCH_HEAD... -- changelog.d); do
# check that any modified newsfiles on this branch end with a full stop. # check that any added newsfiles on this branch end with a full stop.
lastchar=$(tr -d '\n' < "$f" | tail -c 1) lastchar=$(tr -d '\n' < "$f" | tail -c 1)
if [ "$lastchar" != '.' ] && [ "$lastchar" != '!' ]; then if [ "$lastchar" != '.' ] && [ "$lastchar" != '!' ]; then
echo -e "\e[31mERROR: newsfragment $f does not end with a '.' or '!'\e[39m" >&2 echo -e "\e[31mERROR: newsfragment $f does not end with a '.' or '!'\e[39m" >&2

View File

@ -80,6 +80,17 @@ class StatsHandler:
# If self.pos is None then means we haven't fetched it from DB # If self.pos is None then means we haven't fetched it from DB
if self.pos is None: if self.pos is None:
self.pos = await self.store.get_stats_positions() self.pos = await self.store.get_stats_positions()
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
logger.warning(
"Event stream ordering appears to have gone backwards (%i -> %i): "
"rewinding stats processor",
self.pos,
room_max_stream_ordering,
)
self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date

View File

@ -152,6 +152,18 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None: if self.pos is None:
return None return None
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
logger.warning(
"Event stream ordering appears to have gone backwards (%i -> %i): "
"rewinding user directory processor",
self.pos,
room_max_stream_ordering,
)
self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date # Loop round handling deltas until we're up to date
while True: while True:
with Measure(self.clock, "user_dir_delta"): with Measure(self.clock, "user_dir_delta"):