Cache the most recent serial for each room
This commit is contained in:
parent
e1150cac4b
commit
591c4bf223
|
@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler):
|
||||||
self._user_cachemap = {}
|
self._user_cachemap = {}
|
||||||
self._user_cachemap_latest_serial = 0
|
self._user_cachemap_latest_serial = 0
|
||||||
|
|
||||||
|
# map room_ids to the latest presence serial for a member of that
|
||||||
|
# room
|
||||||
|
self._room_serials = {}
|
||||||
|
|
||||||
metrics.register_callback(
|
metrics.register_callback(
|
||||||
"userCachemap:size",
|
"userCachemap:size",
|
||||||
lambda: len(self._user_cachemap),
|
lambda: len(self._user_cachemap),
|
||||||
|
@ -909,6 +913,9 @@ class PresenceHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
if room_ids is None:
|
if room_ids is None:
|
||||||
room_ids = yield self.get_joined_rooms_for_user(user)
|
room_ids = yield self.get_joined_rooms_for_user(user)
|
||||||
|
|
||||||
|
for room_id in room_ids:
|
||||||
|
self._room_serials[room_id] = self._user_cachemap_latest_serial
|
||||||
if add_to_cache:
|
if add_to_cache:
|
||||||
statuscache = self._get_or_make_usercache(user)
|
statuscache = self._get_or_make_usercache(user)
|
||||||
else:
|
else:
|
||||||
|
@ -1069,8 +1076,6 @@ class PresenceEventSource(object):
|
||||||
def get_new_events_for_user(self, user, from_key, limit):
|
def get_new_events_for_user(self, user, from_key, limit):
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
|
|
||||||
observer_user = user
|
|
||||||
|
|
||||||
presence = self.hs.get_handlers().presence_handler
|
presence = self.hs.get_handlers().presence_handler
|
||||||
cachemap = presence._user_cachemap
|
cachemap = presence._user_cachemap
|
||||||
|
|
||||||
|
@ -1079,17 +1084,28 @@ class PresenceEventSource(object):
|
||||||
clock = self.clock
|
clock = self.clock
|
||||||
latest_serial = 0
|
latest_serial = 0
|
||||||
|
|
||||||
|
presence_list = yield presence.store.get_presence_list(
|
||||||
|
user.localpart, accepted=True
|
||||||
|
)
|
||||||
|
if presence_list is None:
|
||||||
|
presence_list = ()
|
||||||
|
user_ids_to_check = set(
|
||||||
|
UserID.from_string(p["observed_user_id"]) for p in presence_list
|
||||||
|
)
|
||||||
|
room_ids = yield presence.get_joined_rooms_for_user(user)
|
||||||
|
for room_id in set(room_ids) & set(presence._room_serials):
|
||||||
|
if presence._room_serials[room_id] > from_key:
|
||||||
|
joined = yield presence.get_joined_users_for_room_id(room_id)
|
||||||
|
user_ids_to_check |= set(joined)
|
||||||
|
|
||||||
updates = []
|
updates = []
|
||||||
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
||||||
for observed_user in cachemap.keys():
|
for observed_user in user_ids_to_check & set(cachemap):
|
||||||
cached = cachemap[observed_user]
|
cached = cachemap[observed_user]
|
||||||
|
|
||||||
if cached.serial <= from_key or cached.serial > max_serial:
|
if cached.serial <= from_key or cached.serial > max_serial:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not (yield self.is_visible(observer_user, observed_user)):
|
|
||||||
continue
|
|
||||||
|
|
||||||
latest_serial = max(cached.serial, latest_serial)
|
latest_serial = max(cached.serial, latest_serial)
|
||||||
updates.append(cached.make_event(user=observed_user, clock=clock))
|
updates.append(cached.make_event(user=observed_user, clock=clock))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue