diff --git a/changelog.d/5480.misc b/changelog.d/5480.misc
new file mode 100644
index 0000000000..3001bcc1fe
--- /dev/null
+++ b/changelog.d/5480.misc
@@ -0,0 +1 @@
+Add an EXPERIMENTAL config option to try and periodically clean up extremities by sending dummy events.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 7d56e2d141..6e5b46e6c3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -317,6 +317,12 @@ class ServerConfig(Config):
 
         _check_resource_config(self.listeners)
 
+        # An experimental option to try and periodically clean up extremities
+        # by sending dummy events.
+        self.cleanup_extremities_with_dummy_events = config.get(
+            "cleanup_extremities_with_dummy_events", False,
+        )
+
     def has_tls_listener(self):
         return any(l["tls"] for l in self.listeners)
 
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 1edd19cc13..7154bcbea6 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -92,6 +92,18 @@ class _EventInternalMetadata(object):
         """
         return getattr(self, "soft_failed", False)
 
+    def should_proactively_send(self):
+        """Whether the event, if ours, should be sent to other clients and
+        servers.
+
+        This is used for sending dummy events internally. Servers and clients
+        can still explicitly fetch the event.
+
+        Returns:
+            bool
+        """
+        return getattr(self, "proactively_send", True)
+
 
 def _event_dict_property(key):
     # We want to be able to use hasattr with the event dict properties.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4f0f939102..4224b29ecf 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -168,6 +168,9 @@ class FederationSender(object):
             if not is_mine and send_on_behalf_of is None:
                 return
 
+            if not event.internal_metadata.should_proactively_send():
+                return
+
             try:
                 # Get the state from before the event.
                 # We need to make sure that this is the state from before
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 11650dc80c..7728ea230d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,9 +34,10 @@ from synapse.api.errors import (
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.validator import EventValidator
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.state import StateFilter
-from synapse.types import RoomAlias, UserID
+from synapse.types import RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
@@ -261,6 +262,18 @@ class EventCreationHandler(object):
         if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
+        if (
+            not self.config.worker_app
+            and self.config.cleanup_extremities_with_dummy_events
+        ):
+            self.clock.looping_call(
+                lambda: run_as_background_process(
+                    "send_dummy_events_to_fill_extremities",
+                    self._send_dummy_events_to_fill_extremities
+                ),
+                5 * 60 * 1000,
+            )
+
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
                      prev_events_and_hashes=None, require_consent=True):
@@ -874,3 +887,63 @@ class EventCreationHandler(object):
             yield presence.bump_presence_active_time(user)
         except Exception:
             logger.exception("Error bumping presence active time")
+
+    @defer.inlineCallbacks
+    def _send_dummy_events_to_fill_extremities(self):
+        """Background task to send dummy events into rooms that have a large
+        number of extremities
+        """
+
+        room_ids = yield self.store.get_rooms_with_many_extremities(
+            min_count=10, limit=5,
+        )
+
+        for room_id in room_ids:
+            # For each room we need to find a joined member we can use to send
+            # the dummy event with.
+
+            prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+                room_id,
+            )
+
+            latest_event_ids = (
+                event_id for (event_id, _, _) in prev_events_and_hashes
+            )
+
+            members = yield self.state.get_current_users_in_room(
+                room_id, latest_event_ids=latest_event_ids,
+            )
+
+            user_id = None
+            for member in members:
+                if self.hs.is_mine_id(member):
+                    user_id = member
+                    break
+
+            if not user_id:
+                # We don't have a joined user.
+                # TODO: We should do something here to stop the room from
+                # appearing next time.
+                continue
+
+            requester = create_requester(user_id)
+
+            event, context = yield self.create_event(
+                requester,
+                {
+                    "type": "org.matrix.dummy_event",
+                    "content": {},
+                    "room_id": room_id,
+                    "sender": user_id,
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+
+            event.internal_metadata.proactively_send = False
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=False,
+            )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..e8d16edbc8 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 room_id,
             )
 
+    def get_rooms_with_many_extremities(self, min_count, limit):
+        """Get the top rooms with at least N extremities.
+
+        Args:
+            min_count (int): The minimum number of extremities
+            limit (int): The maximum number of rooms to return.
+
+        Returns:
+            Deferred[list]: At most `limit` room IDs that have at least
+            `min_count` extremities, sorted by extremity count.
+        """
+
+        def _get_rooms_with_many_extremities_txn(txn):
+            sql = """
+                SELECT room_id FROM event_forward_extremities
+                GROUP BY room_id
+                HAVING count(*) > ?
+                ORDER BY count(*) DESC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (min_count, limit))
+            return [room_id for room_id, in txn]
+
+        return self.runInteraction(
+            "get_rooms_with_many_extremities",
+            _get_rooms_with_many_extremities_txn,
+        )
+
     @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index f4c81ef77d..e9e2d5337c 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -222,3 +222,44 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
             self.store.get_latest_event_ids_in_room(self.room_id)
         )
         self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
+
+
+class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        config = self.default_config()
+        config["cleanup_extremities_with_dummy_events"] = True
+        return self.setup_test_homeserver(config=config)
+
+    def prepare(self, reactor, clock, homeserver):
+        self.store = homeserver.get_datastore()
+        self.room_creator = homeserver.get_room_creation_handler()
+
+        # Create a test user and room
+        self.user = UserID("alice", "test")
+        self.requester = Requester(self.user, None, False, None, None)
+        info = self.get_success(self.room_creator.create_room(self.requester, {}))
+        self.room_id = info["room_id"]
+
+    def test_send_dummy_event(self):
+        # Create a bushy graph with 50 extremities.
+
+        event_id_start = self.create_and_send_event(self.room_id, self.user)
+
+        for _ in range(50):
+            self.create_and_send_event(
+                self.room_id, self.user, prev_event_ids=[event_id_start]
+            )
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(len(latest_event_ids), 50)
+
+        # Pump the reactor repeatedly so that the background updates have a
+        # chance to run.
+        self.pump(10 * 60)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
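
As a rough illustration (not part of the diff above), the sketch below runs the same HAVING-based query that the new get_rooms_with_many_extremities method executes, against an in-memory SQLite database. The two-column table and the example room IDs are assumptions made purely for the demo; min_count=10 and limit=5 mirror the values the 5-minute background task in EventCreationHandler passes, so only rooms that cross the extremity threshold are returned and receive a dummy event.

# Standalone sketch of the extremity-counting query, using a simplified
# event_forward_extremities table. Names and data here are illustrative only.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_forward_extremities (event_id TEXT, room_id TEXT)"
)

# One room with 12 forward extremities, one with only 3.
rows = [("$busy_%d" % i, "!busy:example.com") for i in range(12)]
rows += [("$quiet_%d" % i, "!quiet:example.com") for i in range(3)]
conn.executemany("INSERT INTO event_forward_extremities VALUES (?, ?)", rows)

sql = """
    SELECT room_id FROM event_forward_extremities
    GROUP BY room_id
    HAVING count(*) > ?
    ORDER BY count(*) DESC
    LIMIT ?
"""

# min_count=10, limit=5: only the room with more than 10 extremities matches.
print([room_id for room_id, in conn.execute(sql, (10, 5))])
# -> ['!busy:example.com']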