Merge pull request #5480 from matrix-org/erikj/extremities_dummy_events
Add experimental option to reduce extremities.
This commit is contained in:
commit
e0be8d7016
|
@ -0,0 +1 @@
|
||||||
|
Add an EXPERIMENTAL config option to try and periodically clean up extremities by sending dummy events.
|
|
@ -317,6 +317,12 @@ class ServerConfig(Config):
|
||||||
|
|
||||||
_check_resource_config(self.listeners)
|
_check_resource_config(self.listeners)
|
||||||
|
|
||||||
|
# An experimental option to try and periodically clean up extremities
|
||||||
|
# by sending dummy events.
|
||||||
|
self.cleanup_extremities_with_dummy_events = config.get(
|
||||||
|
"cleanup_extremities_with_dummy_events", False,
|
||||||
|
)
|
||||||
|
|
||||||
def has_tls_listener(self):
|
def has_tls_listener(self):
|
||||||
return any(l["tls"] for l in self.listeners)
|
return any(l["tls"] for l in self.listeners)
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,18 @@ class _EventInternalMetadata(object):
|
||||||
"""
|
"""
|
||||||
return getattr(self, "soft_failed", False)
|
return getattr(self, "soft_failed", False)
|
||||||
|
|
||||||
|
def should_proactively_send(self):
|
||||||
|
"""Whether the event, if ours, should be sent to other clients and
|
||||||
|
servers.
|
||||||
|
|
||||||
|
This is used for sending dummy events internally. Servers and clients
|
||||||
|
can still explicitly fetch the event.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool
|
||||||
|
"""
|
||||||
|
return getattr(self, "proactively_send", True)
|
||||||
|
|
||||||
|
|
||||||
def _event_dict_property(key):
|
def _event_dict_property(key):
|
||||||
# We want to be able to use hasattr with the event dict properties.
|
# We want to be able to use hasattr with the event dict properties.
|
||||||
|
|
|
@ -168,6 +168,9 @@ class FederationSender(object):
|
||||||
if not is_mine and send_on_behalf_of is None:
|
if not is_mine and send_on_behalf_of is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not event.internal_metadata.should_proactively_send():
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Get the state from before the event.
|
# Get the state from before the event.
|
||||||
# We need to make sure that this is the state from before
|
# We need to make sure that this is the state from before
|
||||||
|
|
|
@ -34,9 +34,10 @@ from synapse.api.errors import (
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.api.urls import ConsentURIBuilder
|
from synapse.api.urls import ConsentURIBuilder
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import RoomAlias, UserID
|
from synapse.types import RoomAlias, UserID, create_requester
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
|
@ -261,6 +262,18 @@ class EventCreationHandler(object):
|
||||||
if self._block_events_without_consent_error:
|
if self._block_events_without_consent_error:
|
||||||
self._consent_uri_builder = ConsentURIBuilder(self.config)
|
self._consent_uri_builder = ConsentURIBuilder(self.config)
|
||||||
|
|
||||||
|
if (
|
||||||
|
not self.config.worker_app
|
||||||
|
and self.config.cleanup_extremities_with_dummy_events
|
||||||
|
):
|
||||||
|
self.clock.looping_call(
|
||||||
|
lambda: run_as_background_process(
|
||||||
|
"send_dummy_events_to_fill_extremities",
|
||||||
|
self._send_dummy_events_to_fill_extremities
|
||||||
|
),
|
||||||
|
5 * 60 * 1000,
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
||||||
prev_events_and_hashes=None, require_consent=True):
|
prev_events_and_hashes=None, require_consent=True):
|
||||||
|
@ -874,3 +887,63 @@ class EventCreationHandler(object):
|
||||||
yield presence.bump_presence_active_time(user)
|
yield presence.bump_presence_active_time(user)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error bumping presence active time")
|
logger.exception("Error bumping presence active time")
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _send_dummy_events_to_fill_extremities(self):
|
||||||
|
"""Background task to send dummy events into rooms that have a large
|
||||||
|
number of extremities
|
||||||
|
"""
|
||||||
|
|
||||||
|
room_ids = yield self.store.get_rooms_with_many_extremities(
|
||||||
|
min_count=10, limit=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
for room_id in room_ids:
|
||||||
|
# For each room we need to find a joined member we can use to send
|
||||||
|
# the dummy event with.
|
||||||
|
|
||||||
|
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
latest_event_ids = (
|
||||||
|
event_id for (event_id, _, _) in prev_events_and_hashes
|
||||||
|
)
|
||||||
|
|
||||||
|
members = yield self.state.get_current_users_in_room(
|
||||||
|
room_id, latest_event_ids=latest_event_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
user_id = None
|
||||||
|
for member in members:
|
||||||
|
if self.hs.is_mine_id(member):
|
||||||
|
user_id = member
|
||||||
|
break
|
||||||
|
|
||||||
|
if not user_id:
|
||||||
|
# We don't have a joined user.
|
||||||
|
# TODO: We should do something here to stop the room from
|
||||||
|
# appearing next time.
|
||||||
|
continue
|
||||||
|
|
||||||
|
requester = create_requester(user_id)
|
||||||
|
|
||||||
|
event, context = yield self.create_event(
|
||||||
|
requester,
|
||||||
|
{
|
||||||
|
"type": "org.matrix.dummy_event",
|
||||||
|
"content": {},
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": user_id,
|
||||||
|
},
|
||||||
|
prev_events_and_hashes=prev_events_and_hashes,
|
||||||
|
)
|
||||||
|
|
||||||
|
event.internal_metadata.proactively_send = False
|
||||||
|
|
||||||
|
yield self.send_nonmember_event(
|
||||||
|
requester,
|
||||||
|
event,
|
||||||
|
context,
|
||||||
|
ratelimit=False,
|
||||||
|
)
|
||||||
|
|
|
@ -190,6 +190,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_rooms_with_many_extremities(self, min_count, limit):
|
||||||
|
"""Get the top rooms with at least N extremities.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
min_count (int): The minimum number of extremities
|
||||||
|
limit (int): The maximum number of rooms to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[list]: At most `limit` room IDs that have at least
|
||||||
|
`min_count` extremities, sorted by extremity count.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _get_rooms_with_many_extremities_txn(txn):
|
||||||
|
sql = """
|
||||||
|
SELECT room_id FROM event_forward_extremities
|
||||||
|
GROUP BY room_id
|
||||||
|
HAVING count(*) > ?
|
||||||
|
ORDER BY count(*) DESC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (min_count, limit))
|
||||||
|
return [room_id for room_id, in txn]
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_rooms_with_many_extremities",
|
||||||
|
_get_rooms_with_many_extremities_txn,
|
||||||
|
)
|
||||||
|
|
||||||
@cached(max_entries=5000, iterable=True)
|
@cached(max_entries=5000, iterable=True)
|
||||||
def get_latest_event_ids_in_room(self, room_id):
|
def get_latest_event_ids_in_room(self, room_id):
|
||||||
return self._simple_select_onecol(
|
return self._simple_select_onecol(
|
||||||
|
|
|
@ -222,3 +222,44 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
)
|
)
|
||||||
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
|
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
|
||||||
|
|
||||||
|
|
||||||
|
class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
|
||||||
|
def make_homeserver(self, reactor, clock):
|
||||||
|
config = self.default_config()
|
||||||
|
config["cleanup_extremities_with_dummy_events"] = True
|
||||||
|
return self.setup_test_homeserver(config=config)
|
||||||
|
|
||||||
|
def prepare(self, reactor, clock, homeserver):
|
||||||
|
self.store = homeserver.get_datastore()
|
||||||
|
self.room_creator = homeserver.get_room_creation_handler()
|
||||||
|
|
||||||
|
# Create a test user and room
|
||||||
|
self.user = UserID("alice", "test")
|
||||||
|
self.requester = Requester(self.user, None, False, None, None)
|
||||||
|
info = self.get_success(self.room_creator.create_room(self.requester, {}))
|
||||||
|
self.room_id = info["room_id"]
|
||||||
|
|
||||||
|
def test_send_dummy_event(self):
|
||||||
|
# Create a bushy graph with 50 extremities.
|
||||||
|
|
||||||
|
event_id_start = self.create_and_send_event(self.room_id, self.user)
|
||||||
|
|
||||||
|
for _ in range(50):
|
||||||
|
self.create_and_send_event(
|
||||||
|
self.room_id, self.user, prev_event_ids=[event_id_start]
|
||||||
|
)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertEqual(len(latest_event_ids), 50)
|
||||||
|
|
||||||
|
# Pump the reactor repeatedly so that the background updates have a
|
||||||
|
# chance to run.
|
||||||
|
self.pump(10 * 60)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
|
||||||
|
|
Loading…
Reference in New Issue