Properly handle unknown results for the stream change cache. (#14592)
StreamChangeCache.get_all_changed_entities can return None to signify it does not have information at the given stream position. Two callers (related to device lists and presence) were treating this response the same as an empty list (i.e. there being no updates).
This commit is contained in:
parent
6acb6d772a
commit
fac8a38525
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
|
|
@ -1764,14 +1764,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
||||||
Returns:
|
Returns:
|
||||||
A list of presence states for the given user to receive.
|
A list of presence states for the given user to receive.
|
||||||
"""
|
"""
|
||||||
|
updated_users = None
|
||||||
if from_key:
|
if from_key:
|
||||||
# Only return updates since the last sync
|
# Only return updates since the last sync
|
||||||
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
|
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
|
||||||
from_key
|
from_key
|
||||||
)
|
)
|
||||||
if not updated_users:
|
|
||||||
updated_users = []
|
|
||||||
|
|
||||||
|
if updated_users is not None:
|
||||||
# Get the actual presence update for each change
|
# Get the actual presence update for each change
|
||||||
users_to_state = await self.get_presence_handler().current_state_for_users(
|
users_to_state = await self.get_presence_handler().current_state_for_users(
|
||||||
updated_users
|
updated_users
|
||||||
|
|
|
@ -842,12 +842,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
user_ids, from_key
|
user_ids, from_key
|
||||||
)
|
)
|
||||||
|
|
||||||
if not user_ids_to_check:
|
# If an empty set was returned, there's nothing to do.
|
||||||
|
if user_ids_to_check is not None and not user_ids_to_check:
|
||||||
return set()
|
return set()
|
||||||
|
|
||||||
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
|
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
|
||||||
changes: Set[str] = set()
|
|
||||||
|
|
||||||
stream_id_where_clause = "stream_id > ?"
|
stream_id_where_clause = "stream_id > ?"
|
||||||
sql_args = [from_key]
|
sql_args = [from_key]
|
||||||
|
|
||||||
|
@ -858,19 +857,25 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||||
sql = f"""
|
sql = f"""
|
||||||
SELECT DISTINCT user_id FROM device_lists_stream
|
SELECT DISTINCT user_id FROM device_lists_stream
|
||||||
WHERE {stream_id_where_clause}
|
WHERE {stream_id_where_clause}
|
||||||
AND
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Query device changes with a batch of users at a time
|
# If the stream change cache gave us no information, fetch *all*
|
||||||
# Assertion for mypy's benefit; see also
|
# users between the stream IDs.
|
||||||
# https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
|
if user_ids_to_check is None:
|
||||||
assert user_ids_to_check is not None
|
txn.execute(sql, sql_args)
|
||||||
for chunk in batch_iter(user_ids_to_check, 100):
|
return {user_id for user_id, in txn}
|
||||||
clause, args = make_in_list_sql_clause(
|
|
||||||
txn.database_engine, "user_id", chunk
|
# Otherwise, fetch changes for the given users.
|
||||||
)
|
else:
|
||||||
txn.execute(sql + clause, sql_args + args)
|
changes: Set[str] = set()
|
||||||
changes.update(user_id for user_id, in txn)
|
|
||||||
|
# Query device changes with a batch of users at a time
|
||||||
|
for chunk in batch_iter(user_ids_to_check, 100):
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
txn.database_engine, "user_id", chunk
|
||||||
|
)
|
||||||
|
txn.execute(sql + " AND " + clause, sql_args + args)
|
||||||
|
changes.update(user_id for user_id, in txn)
|
||||||
|
|
||||||
return changes
|
return changes
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue