Fix logic for dropping old events in fed queue (#11806)

Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
Co-authored-by: Richard van der Hoff <richard@matrix.org>
This commit is contained in:
Andrew Morgan 2022-01-24 12:20:01 +00:00 committed by Andrew Morgan
parent fd05a3ed03
commit 8ff465d206
3 changed files with 27 additions and 7 deletions

1
changelog.d/11806.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room.

View File

@ -1432,7 +1432,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
if room_version.event_format == EventFormatVersions.V1: if room_version.event_format == EventFormatVersions.V1:
for prev_event_tuple in prev_events: for prev_event_tuple in prev_events:
if not isinstance(prev_event_tuple, list) or len(prev_events) != 2: if (
not isinstance(prev_event_tuple, list)
or len(prev_event_tuple) != 2
):
logger.info("Invalid prev_events for %s", event_id) logger.info("Invalid prev_events for %s", event_id)
break break

View File

@ -12,10 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Tuple, Union
import attr import attr
from parameterized import parameterized from parameterized import parameterized
from synapse.api.room_versions import RoomVersions from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
)
from synapse.events import _EventInternalMetadata from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder from synapse.util import json_encoder
@ -506,11 +512,21 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
) )
self.assertSetEqual(difference, set()) self.assertSetEqual(difference, set())
def test_prune_inbound_federation_queue(self): @parameterized.expand(
"Test that pruning of inbound federation queues work" [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
)
def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
"""Test that pruning of inbound federation queues work"""
room_id = "some_room_id" room_id = "some_room_id"
def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
"""Account for differences in prev_events format across room versions"""
if room_version.event_format == EventFormatVersions.V1:
return prev_event_id, {}
return prev_event_id
# Insert a bunch of events that all reference the previous one. # Insert a bunch of events that all reference the previous one.
self.get_success( self.get_success(
self.store.db_pool.simple_insert_many( self.store.db_pool.simple_insert_many(
@ -522,7 +538,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
"received_ts": 0, "received_ts": 0,
"event_id": f"$fake_event_id_{i + 1}", "event_id": f"$fake_event_id_{i + 1}",
"event_json": json_encoder.encode( "event_json": json_encoder.encode(
{"prev_events": [f"$fake_event_id_{i}"]} {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
), ),
"internal_metadata": "{}", "internal_metadata": "{}",
} }
@ -535,12 +551,12 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
# Calling prune once should return True, i.e. a prune happen. The second # Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't. # time it shouldn't.
pruned = self.get_success( pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) self.store.prune_staged_events_in_room(room_id, room_version)
) )
self.assertTrue(pruned) self.assertTrue(pruned)
pruned = self.get_success( pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6) self.store.prune_staged_events_in_room(room_id, room_version)
) )
self.assertFalse(pruned) self.assertFalse(pruned)