Merge pull request #1876 from matrix-org/erikj/shared_member_store
Make presence.get_new_events a bit faster
This commit is contained in:
commit
a8331897aa
|
@ -1011,7 +1011,7 @@ class PresenceEventSource(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
|
def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
|
||||||
**kwargs):
|
explicit_room_id=None, **kwargs):
|
||||||
# The process for getting presence events are:
|
# The process for getting presence events are:
|
||||||
# 1. Get the rooms the user is in.
|
# 1. Get the rooms the user is in.
|
||||||
# 2. Get the list of user in the rooms.
|
# 2. Get the list of user in the rooms.
|
||||||
|
@ -1028,22 +1028,24 @@ class PresenceEventSource(object):
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
if from_key is not None:
|
if from_key is not None:
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
room_ids = room_ids or []
|
|
||||||
|
|
||||||
presence = self.get_presence_handler()
|
presence = self.get_presence_handler()
|
||||||
stream_change_cache = self.store.presence_stream_cache
|
stream_change_cache = self.store.presence_stream_cache
|
||||||
|
|
||||||
if not room_ids:
|
|
||||||
rooms = yield self.store.get_rooms_for_user(user_id)
|
|
||||||
room_ids = set(e.room_id for e in rooms)
|
|
||||||
else:
|
|
||||||
room_ids = set(room_ids)
|
|
||||||
|
|
||||||
max_token = self.store.get_current_presence_token()
|
max_token = self.store.get_current_presence_token()
|
||||||
|
|
||||||
plist = yield self.store.get_presence_list_accepted(user.localpart)
|
plist = yield self.store.get_presence_list_accepted(user.localpart)
|
||||||
friends = set(row["observed_user_id"] for row in plist)
|
users_interested_in = set(row["observed_user_id"] for row in plist)
|
||||||
friends.add(user_id) # So that we receive our own presence
|
users_interested_in.add(user_id) # So that we receive our own presence
|
||||||
|
|
||||||
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
users_interested_in.update(users_who_share_room)
|
||||||
|
|
||||||
|
if explicit_room_id:
|
||||||
|
user_ids = yield self.store.get_users_in_room(explicit_room_id)
|
||||||
|
users_interested_in.update(user_ids)
|
||||||
|
|
||||||
user_ids_changed = set()
|
user_ids_changed = set()
|
||||||
changed = None
|
changed = None
|
||||||
|
@ -1055,35 +1057,19 @@ class PresenceEventSource(object):
|
||||||
# work out if we share a room or they're in our presence list
|
# work out if we share a room or they're in our presence list
|
||||||
get_updates_counter.inc("stream")
|
get_updates_counter.inc("stream")
|
||||||
for other_user_id in changed:
|
for other_user_id in changed:
|
||||||
if other_user_id in friends:
|
if other_user_id in users_interested_in:
|
||||||
user_ids_changed.add(other_user_id)
|
user_ids_changed.add(other_user_id)
|
||||||
continue
|
|
||||||
other_rooms = yield self.store.get_rooms_for_user(other_user_id)
|
|
||||||
if room_ids.intersection(e.room_id for e in other_rooms):
|
|
||||||
user_ids_changed.add(other_user_id)
|
|
||||||
continue
|
|
||||||
else:
|
else:
|
||||||
# Too many possible updates. Find all users we can see and check
|
# Too many possible updates. Find all users we can see and check
|
||||||
# if any of them have changed.
|
# if any of them have changed.
|
||||||
get_updates_counter.inc("full")
|
get_updates_counter.inc("full")
|
||||||
|
|
||||||
user_ids_to_check = set()
|
|
||||||
for room_id in room_ids:
|
|
||||||
users = yield self.store.get_users_in_room(room_id)
|
|
||||||
user_ids_to_check.update(users)
|
|
||||||
|
|
||||||
user_ids_to_check.update(friends)
|
|
||||||
|
|
||||||
# Always include yourself. Only really matters for when the user is
|
|
||||||
# not in any rooms, but still.
|
|
||||||
user_ids_to_check.add(user_id)
|
|
||||||
|
|
||||||
if from_key:
|
if from_key:
|
||||||
user_ids_changed = stream_change_cache.get_entities_changed(
|
user_ids_changed = stream_change_cache.get_entities_changed(
|
||||||
user_ids_to_check, from_key,
|
users_interested_in, from_key,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
user_ids_changed = user_ids_to_check
|
user_ids_changed = users_interested_in
|
||||||
|
|
||||||
updates = yield presence.current_state_for_users(user_ids_changed)
|
updates = yield presence.current_state_for_users(user_ids_changed)
|
||||||
|
|
||||||
|
|
|
@ -437,6 +437,7 @@ class RoomEventSource(object):
|
||||||
limit,
|
limit,
|
||||||
room_ids,
|
room_ids,
|
||||||
is_guest,
|
is_guest,
|
||||||
|
explicit_room_id=None,
|
||||||
):
|
):
|
||||||
# We just ignore the key for now.
|
# We just ignore the key for now.
|
||||||
|
|
||||||
|
|
|
@ -378,6 +378,7 @@ class Notifier(object):
|
||||||
limit=limit,
|
limit=limit,
|
||||||
is_guest=is_peeking,
|
is_guest=is_peeking,
|
||||||
room_ids=room_ids,
|
room_ids=room_ids,
|
||||||
|
explicit_room_id=explicit_room_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
if name == "room":
|
if name == "room":
|
||||||
|
|
|
@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
# to reach inside the __dict__ to extract them.
|
# to reach inside the __dict__ to extract them.
|
||||||
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
|
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
|
||||||
get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
|
get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
|
||||||
|
get_users_who_share_room_with_user = (
|
||||||
|
RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
|
||||||
|
)
|
||||||
get_latest_event_ids_in_room = EventFederationStore.__dict__[
|
get_latest_event_ids_in_room = EventFederationStore.__dict__[
|
||||||
"get_latest_event_ids_in_room"
|
"get_latest_event_ids_in_room"
|
||||||
]
|
]
|
||||||
|
|
|
@ -280,6 +280,23 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
user_id, membership_list=[Membership.JOIN],
|
user_id, membership_list=[Membership.JOIN],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@cachedInlineCallbacks(max_entries=50000, cache_context=True, iterable=True)
|
||||||
|
def get_users_who_share_room_with_user(self, user_id, cache_context):
|
||||||
|
"""Returns the set of users who share a room with `user_id`
|
||||||
|
"""
|
||||||
|
rooms = yield self.get_rooms_for_user(
|
||||||
|
user_id, on_invalidate=cache_context.invalidate,
|
||||||
|
)
|
||||||
|
|
||||||
|
user_who_share_room = set()
|
||||||
|
for room in rooms:
|
||||||
|
user_ids = yield self.get_users_in_room(
|
||||||
|
room.room_id, on_invalidate=cache_context.invalidate,
|
||||||
|
)
|
||||||
|
user_who_share_room.update(user_ids)
|
||||||
|
|
||||||
|
defer.returnValue(user_who_share_room)
|
||||||
|
|
||||||
def forget(self, user_id, room_id):
|
def forget(self, user_id, room_id):
|
||||||
"""Indicate that user_id wishes to discard history for room_id."""
|
"""Indicate that user_id wishes to discard history for room_id."""
|
||||||
def f(txn):
|
def f(txn):
|
||||||
|
|
|
@ -478,6 +478,11 @@ class CacheListDescriptor(object):
|
||||||
|
|
||||||
|
|
||||||
class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
|
class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
|
||||||
|
# We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
|
||||||
|
# which namedtuple does for us (i.e. two _CacheContext are the same if
|
||||||
|
# their caches and keys match). This is important in particular to
|
||||||
|
# dedupe when we add callbacks to lru cache nodes, otherwise the number
|
||||||
|
# of callbacks would grow.
|
||||||
def invalidate(self):
|
def invalidate(self):
|
||||||
self.cache.invalidate(self.key)
|
self.cache.invalidate(self.key)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue