Federation Sender & Appservice Pusher Stream Optimisations (#13251)

* Replace `get_new_events_for_appservice` with `get_all_new_events_stream`

The functions were nearly identical, and this change brings the AS worker closer
to the way federation senders work, which can allow for multiple workers
to handle AS traffic.

* Pull received TS alongside events when processing the stream

This avoids an extra query per event when both the federation sender
and the appservice pusher process events.
This commit is contained in:
Nick Mills-Barrett 2022-07-15 10:36:56 +02:00 committed by GitHub
parent fe15a865a5
commit 21eeacc995
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 89 deletions

1
changelog.d/13251.misc Normal file
View File

@ -0,0 +1 @@
Optimise federation sender and appservice pusher event stream processing queries. Contributed by Nick @ Beeper (@fizzadar).

View File

@ -351,7 +351,11 @@ class FederationSender(AbstractFederationSender):
self._is_processing = True self._is_processing = True
while True: while True:
last_token = await self.store.get_federation_out_pos("events") last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream( (
next_token,
events,
event_to_received_ts,
) = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100 last_token, self._last_poked_id, limit=100
) )
@ -476,7 +480,7 @@ class FederationSender(AbstractFederationSender):
await self._send_pdu(event, sharded_destinations) await self._send_pdu(event, sharded_destinations)
now = self.clock.time_msec() now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id) ts = event_to_received_ts[event.event_id]
assert ts is not None assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels( synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender" "federation_sender"
@ -509,7 +513,7 @@ class FederationSender(AbstractFederationSender):
if events: if events:
now = self.clock.time_msec() now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id) ts = event_to_received_ts[events[-1].event_id]
assert ts is not None assert ts is not None
synapse.metrics.event_processing_lag.labels( synapse.metrics.event_processing_lag.labels(

View File

@ -104,14 +104,15 @@ class ApplicationServicesHandler:
with Measure(self.clock, "notify_interested_services"): with Measure(self.clock, "notify_interested_services"):
self.is_processing = True self.is_processing = True
try: try:
limit = 100
upper_bound = -1 upper_bound = -1
while upper_bound < self.current_max: while upper_bound < self.current_max:
last_token = await self.store.get_appservice_last_pos()
( (
upper_bound, upper_bound,
events, events,
) = await self.store.get_new_events_for_appservice( event_to_received_ts,
self.current_max, limit ) = await self.store.get_all_new_events_stream(
last_token, self.current_max, limit=100, get_prev_content=True
) )
events_by_room: Dict[str, List[EventBase]] = {} events_by_room: Dict[str, List[EventBase]] = {}
@ -150,7 +151,7 @@ class ApplicationServicesHandler:
) )
now = self.clock.time_msec() now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id) ts = event_to_received_ts[event.event_id]
assert ts is not None assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels( synapse.metrics.event_processing_lag_by_event.labels(
@ -187,7 +188,7 @@ class ApplicationServicesHandler:
if events: if events:
now = self.clock.time_msec() now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id) ts = event_to_received_ts[events[-1].event_id]
assert ts is not None assert ts is not None
synapse.metrics.event_processing_lag.labels( synapse.metrics.event_processing_lag.labels(

View File

@ -371,52 +371,30 @@ class ApplicationServiceTransactionWorkerStore(
device_list_summary=DeviceListUpdates(), device_list_summary=DeviceListUpdates(),
) )
async def get_appservice_last_pos(self) -> int:
"""
Get the last stream ordering position for the appservice process.
"""
return await self.db_pool.simple_select_one_onecol(
table="appservice_stream_position",
retcol="stream_ordering",
keyvalues={},
desc="get_appservice_last_pos",
)
async def set_appservice_last_pos(self, pos: int) -> None: async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None: """
txn.execute( Set the last stream ordering position for the appservice process.
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,) """
await self.db_pool.simple_update_one(
table="appservice_stream_position",
keyvalues={},
updatevalues={"stream_ordering": pos},
desc="set_appservice_last_pos",
) )
await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
)
async def get_new_events_for_appservice(
self, current_id: int, limit: int
) -> Tuple[int, List[EventBase]]:
"""Get all new events for an appservice"""
def get_new_events_for_appservice_txn(
txn: LoggingTransaction,
) -> Tuple[int, List[str]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
" FROM events AS e"
" WHERE"
" (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering"
" AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (current_id, limit))
rows = txn.fetchall()
upper_bound = current_id
if len(rows) == limit:
upper_bound = rows[-1][0]
return upper_bound, [row[1] for row in rows]
upper_bound, event_ids = await self.db_pool.runInteraction(
"get_new_events_for_appservice", get_new_events_for_appservice_txn
)
events = await self.get_events_as_list(event_ids, get_prev_content=True)
return upper_bound, events
async def get_type_stream_id_for_appservice( async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str self, service: ApplicationService, type: str
) -> int: ) -> int:

View File

@ -292,25 +292,6 @@ class EventsWorkerStore(SQLBaseStore):
super().process_replication_rows(stream_name, instance_name, token, rows) super().process_replication_rows(stream_name, instance_name, token, rows)
async def get_received_ts(self, event_id: str) -> Optional[int]:
"""Get received_ts (when it was persisted) for the event.
Raises an exception for unknown events.
Args:
event_id: The event ID to query.
Returns:
Timestamp in milliseconds, or None for events that were persisted
before received_ts was implemented.
"""
return await self.db_pool.simple_select_one_onecol(
table="events",
keyvalues={"event_id": event_id},
retcol="received_ts",
desc="get_received_ts",
)
async def have_censored_event(self, event_id: str) -> bool: async def have_censored_event(self, event_id: str) -> bool:
"""Check if an event has been censored, i.e. if the content of the event has been erased """Check if an event has been censored, i.e. if the content of the event has been erased
from the database due to a redaction. from the database due to a redaction.

View File

@ -1022,8 +1022,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
} }
async def get_all_new_events_stream( async def get_all_new_events_stream(
self, from_id: int, current_id: int, limit: int self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
) -> Tuple[int, List[EventBase]]: ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
"""Get all new events """Get all new events
Returns all events with from_id < stream_ordering <= current_id. Returns all events with from_id < stream_ordering <= current_id.
@ -1032,19 +1032,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
from_id: the stream_ordering of the last event we processed from_id: the stream_ordering of the last event we processed
current_id: the stream_ordering of the most recently processed event current_id: the stream_ordering of the most recently processed event
limit: the maximum number of events to return limit: the maximum number of events to return
get_prev_content: whether to fetch previous event content
Returns: Returns:
A tuple of (next_id, events), where `next_id` is the next value to A tuple of (next_id, events, event_to_received_ts), where `next_id`
pass as `from_id` (it will either be the stream_ordering of the is the next value to pass as `from_id` (it will either be the
last returned event, or, if fewer than `limit` events were found, stream_ordering of the last returned event, or, if fewer than `limit`
the `current_id`). events were found, the `current_id`). The `event_to_received_ts` is
a dictionary mapping event ID to the event `received_ts`.
""" """
def get_all_new_events_stream_txn( def get_all_new_events_stream_txn(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> Tuple[int, List[str]]: ) -> Tuple[int, Dict[str, Optional[int]]]:
sql = ( sql = (
"SELECT e.stream_ordering, e.event_id" "SELECT e.stream_ordering, e.event_id, e.received_ts"
" FROM events AS e" " FROM events AS e"
" WHERE" " WHERE"
" ? < e.stream_ordering AND e.stream_ordering <= ?" " ? < e.stream_ordering AND e.stream_ordering <= ?"
@ -1059,15 +1061,21 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if len(rows) == limit: if len(rows) == limit:
upper_bound = rows[-1][0] upper_bound = rows[-1][0]
return upper_bound, [row[1] for row in rows] event_to_received_ts: Dict[str, Optional[int]] = {
row[1]: row[2] for row in rows
}
return upper_bound, event_to_received_ts
upper_bound, event_ids = await self.db_pool.runInteraction( upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
"get_all_new_events_stream", get_all_new_events_stream_txn "get_all_new_events_stream", get_all_new_events_stream_txn
) )
events = await self.get_events_as_list(event_ids) events = await self.get_events_as_list(
event_to_received_ts.keys(),
get_prev_content=get_prev_content,
)
return upper_bound, events return upper_bound, events, event_to_received_ts
async def get_federation_out_pos(self, typ: str) -> int: async def get_federation_out_pos(self, typ: str) -> int:
if self._need_to_reset_federation_stream_positions: if self._need_to_reset_federation_stream_positions:

View File

@ -50,7 +50,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock() self.mock_scheduler = Mock()
hs = Mock() hs = Mock()
hs.get_datastores.return_value = Mock(main=self.mock_store) hs.get_datastores.return_value = Mock(main=self.mock_store)
self.mock_store.get_received_ts.return_value = make_awaitable(0) self.mock_store.get_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None) self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable( self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
None None
@ -76,9 +76,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
event = Mock( event = Mock(
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar" sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
) )
self.mock_store.get_new_events_for_appservice.side_effect = [ self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [])), make_awaitable((0, [], {})),
make_awaitable((1, [event])), make_awaitable((1, [event], {event.event_id: 0})),
] ]
self.handler.notify_interested_services(RoomStreamToken(None, 1)) self.handler.notify_interested_services(RoomStreamToken(None, 1))
@ -95,8 +95,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True) self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [ self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event])), make_awaitable((0, [event], {event.event_id: 0})),
] ]
self.handler.notify_interested_services(RoomStreamToken(None, 0)) self.handler.notify_interested_services(RoomStreamToken(None, 0))
@ -112,8 +112,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True) self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [ self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event])), make_awaitable((0, [event], {event.event_id: 0})),
] ]
self.handler.notify_interested_services(RoomStreamToken(None, 0)) self.handler.notify_interested_services(RoomStreamToken(None, 0))