From 06bbf1029cf2558213646d3b692621bed5178066 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 6 Oct 2023 11:41:57 -0400 Subject: [PATCH] Convert simple_select_list_paginate_txn to return tuples. (#16433) --- changelog.d/16433.misc | 1 + synapse/api/presence.py | 4 -- synapse/federation/send_queue.py | 2 +- synapse/rest/admin/federation.py | 8 ++- synapse/storage/database.py | 6 +- synapse/storage/databases/main/presence.py | 58 +++++++++++++------ .../storage/databases/main/transactions.py | 27 +++++---- 7 files changed, 67 insertions(+), 39 deletions(-) create mode 100644 changelog.d/16433.misc diff --git a/changelog.d/16433.misc b/changelog.d/16433.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16433.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b78f419994..afef6712e1 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -80,10 +80,6 @@ class UserPresenceState: def as_dict(self) -> JsonDict: return attr.asdict(self) - @staticmethod - def from_dict(d: JsonDict) -> "UserPresenceState": - return UserPresenceState(**d) - def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": return attr.evolve(self, **kwargs) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6520795635..525968bcba 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow): @staticmethod def from_data(data: JsonDict) -> "PresenceDestinationsRow": return PresenceDestinationsRow( - state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] + state=UserPresenceState(**data["state"]), destinations=data["dests"] ) def to_data(self) -> JsonDict: diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index e0ee55bd0e..8a617af599 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -198,7 +198,13 @@ class DestinationMembershipRestServlet(RestServlet): rooms, total = await self._store.get_destination_rooms_paginate( destination, start, limit, direction ) - response = {"rooms": rooms, "total": total} + response = { + "rooms": [ + {"room_id": room_id, "stream_ordering": stream_ordering} + for room_id, stream_ordering in rooms + ], + "total": total, + } if (start + limit) < total: response["next_token"] = str(start + len(rooms)) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ca894edd5a..7d8af5c610 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2418,7 +2418,7 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """ Executes a SELECT query on the named table with start and limit, of row numbers, which may return zero or number of rows from start to limit, @@ -2447,7 +2447,7 @@ class DatabasePool: order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: - The result as a list of dictionaries. + The result as a list of tuples. """ if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") @@ -2474,7 +2474,7 @@ class DatabasePool: ) txn.execute(sql, arg_list + [limit, start]) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_search_list( self, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 805c23f89f..519f05fb60 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -20,6 +20,7 @@ from typing import ( Mapping, Optional, Tuple, + Union, cast, ) @@ -385,28 +386,47 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) limit = 100 offset = 0 while True: - rows = await self.db_pool.runInteraction( - "get_presence_for_all_users", - self.db_pool.simple_select_list_paginate_txn, - "presence_stream", - orderby="stream_id", - start=offset, - limit=limit, - exclude_keyvalues=exclude_keyvalues, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", ), - order_direction="ASC", ) - for row in rows: - users_to_state[row["user_id"]] = UserPresenceState(**row) + for ( + user_id, + state, + last_active_ts, + last_federation_update_ts, + last_user_sync_ts, + status_msg, + currently_active, + ) in rows: + users_to_state[user_id] = UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) # We've run out of updates to query if len(rows) < limit: diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 8f70eff809..f35757280d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -526,7 +526,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +537,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +558,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count