Add a storage method for returning all current presence from all users (#9650)
Split off from https://github.com/matrix-org/synapse/pull/9491
Adds a storage method for getting the current presence of all local users, optionally excluding those that are offline. This will be used by the code in #9491 when a PresenceRouter module informs Synapse that a given user should have `"ALL"` user presence updates routed to them. Specifically, it is used here: b588f16e39/synapse/handlers/presence.py (L1131-L1133)
Note that there is a `get_all_presence_updates` function just above. That function is intended to walk up the table through stream IDs, and is primarily used by the presence replication stream. I could possibly make use of it in the PresenceRouter-related code, but it would be a bit of a bodge.
This commit is contained in:
parent
c602ba8336
commit
fae81f2f68
|
@ -0,0 +1 @@
|
||||||
|
Add a storage method for pulling all current user presence state from the database.
|
|
@ -1906,6 +1906,7 @@ class DatabasePool:
|
||||||
retcols: Iterable[str],
|
retcols: Iterable[str],
|
||||||
filters: Optional[Dict[str, Any]] = None,
|
filters: Optional[Dict[str, Any]] = None,
|
||||||
keyvalues: Optional[Dict[str, Any]] = None,
|
keyvalues: Optional[Dict[str, Any]] = None,
|
||||||
|
exclude_keyvalues: Optional[Dict[str, Any]] = None,
|
||||||
order_direction: str = "ASC",
|
order_direction: str = "ASC",
|
||||||
) -> List[Dict[str, Any]]:
|
) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
|
@ -1929,7 +1930,10 @@ class DatabasePool:
|
||||||
apply a WHERE ? LIKE ? clause.
|
apply a WHERE ? LIKE ? clause.
|
||||||
keyvalues:
|
keyvalues:
|
||||||
column names and values to select the rows with, or None to not
|
column names and values to select the rows with, or None to not
|
||||||
apply a WHERE clause.
|
apply a WHERE key = value clause.
|
||||||
|
exclude_keyvalues:
|
||||||
|
column names and values to exclude rows with, or None to not
|
||||||
|
apply a WHERE key != value clause.
|
||||||
order_direction: Whether the results should be ordered "ASC" or "DESC".
|
order_direction: Whether the results should be ordered "ASC" or "DESC".
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -1938,7 +1942,7 @@ class DatabasePool:
|
||||||
if order_direction not in ["ASC", "DESC"]:
|
if order_direction not in ["ASC", "DESC"]:
|
||||||
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
|
||||||
|
|
||||||
where_clause = "WHERE " if filters or keyvalues else ""
|
where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
|
||||||
arg_list = [] # type: List[Any]
|
arg_list = [] # type: List[Any]
|
||||||
if filters:
|
if filters:
|
||||||
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
|
||||||
|
@ -1947,6 +1951,9 @@ class DatabasePool:
|
||||||
if keyvalues:
|
if keyvalues:
|
||||||
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
|
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
|
||||||
arg_list += list(keyvalues.values())
|
arg_list += list(keyvalues.values())
|
||||||
|
if exclude_keyvalues:
|
||||||
|
where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
|
||||||
|
arg_list += list(exclude_keyvalues.values())
|
||||||
|
|
||||||
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
|
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
|
||||||
", ".join(retcols),
|
", ".join(retcols),
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from typing import List, Tuple
|
from typing import Dict, List, Tuple
|
||||||
|
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
||||||
|
@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore):
|
||||||
|
|
||||||
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
return {row["user_id"]: UserPresenceState(**row) for row in rows}
|
||||||
|
|
||||||
|
async def get_presence_for_all_users(
|
||||||
|
self,
|
||||||
|
include_offline: bool = True,
|
||||||
|
) -> Dict[str, UserPresenceState]:
|
||||||
|
"""Retrieve the current presence state for all users.
|
||||||
|
|
||||||
|
Note that the presence_stream table is culled frequently, so it should only
|
||||||
|
contain the latest presence state for each user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
include_offline: Whether to include offline presence states
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dict of user IDs to their current UserPresenceState.
|
||||||
|
"""
|
||||||
|
users_to_state = {}
|
||||||
|
|
||||||
|
exclude_keyvalues = None
|
||||||
|
if not include_offline:
|
||||||
|
# Exclude offline presence state
|
||||||
|
exclude_keyvalues = {"state": "offline"}
|
||||||
|
|
||||||
|
# This may be a very heavy database query.
|
||||||
|
# We paginate in order to not block a database connection.
|
||||||
|
limit = 100
|
||||||
|
offset = 0
|
||||||
|
while True:
|
||||||
|
rows = await self.db_pool.runInteraction(
|
||||||
|
"get_presence_for_all_users",
|
||||||
|
self.db_pool.simple_select_list_paginate_txn,
|
||||||
|
"presence_stream",
|
||||||
|
orderby="stream_id",
|
||||||
|
start=offset,
|
||||||
|
limit=limit,
|
||||||
|
exclude_keyvalues=exclude_keyvalues,
|
||||||
|
retcols=(
|
||||||
|
"user_id",
|
||||||
|
"state",
|
||||||
|
"last_active_ts",
|
||||||
|
"last_federation_update_ts",
|
||||||
|
"last_user_sync_ts",
|
||||||
|
"status_msg",
|
||||||
|
"currently_active",
|
||||||
|
),
|
||||||
|
order_direction="ASC",
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
users_to_state[row["user_id"]] = UserPresenceState(**row)
|
||||||
|
|
||||||
|
# We've run out of updates to query
|
||||||
|
if len(rows) < limit:
|
||||||
|
break
|
||||||
|
|
||||||
|
offset += limit
|
||||||
|
|
||||||
|
return users_to_state
|
||||||
|
|
||||||
def get_current_presence_token(self):
|
def get_current_presence_token(self):
|
||||||
return self._presence_id_gen.get_current_token()
|
return self._presence_id_gen.get_current_token()
|
||||||
|
|
Loading…
Reference in New Issue