Implement presence state visibilty limiting when polling eventsource for stream

This commit is contained in:
Paul "LeoNerd" Evans 2014-09-02 16:29:04 +01:00
parent 86d6232236
commit da31b96b55
3 changed files with 147 additions and 15 deletions

View File

@ -156,7 +156,7 @@ class PresenceHandler(BaseHandler):
defer.returnValue(True) defer.returnValue(True)
if (yield self.store.user_rooms_intersect( if (yield self.store.user_rooms_intersect(
[observer_user, observed_user] [u.to_string() for u in observer_user, observed_user]
)): )):
defer.returnValue(True) defer.returnValue(True)
@ -772,15 +772,52 @@ class PresenceEventSource(object):
self.hs = hs self.hs = hs
self.clock = hs.get_clock() self.clock = hs.get_clock()
@defer.inlineCallbacks
def is_visible(self, observer_user, observed_user):
if observer_user == observed_user:
defer.returnValue(True)
presence = self.hs.get_handlers().presence_handler
if (yield presence.store.user_rooms_intersect(
[u.to_string() for u in observer_user, observed_user]
)):
defer.returnValue(True)
if observed_user.is_mine:
pushmap = presence._local_pushmap
defer.returnValue(
observed_user.localpart in pushmap and
observer_user in pushmap[observed_user.localpart]
)
else:
recvmap = presence._remote_recvmap
defer.returnValue(
observed_user in recvmap and
observer_user in recvmap[observed_user]
)
@defer.inlineCallbacks
def get_new_events_for_user(self, user, from_key, limit): def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_key) from_key = int(from_key)
observer_user = user
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap cachemap = presence._user_cachemap
# TODO(paul): limit, and filter by visibility updates = []
updates = [(k, cachemap[k]) for k in cachemap # TODO(paul): use a DeferredList ? How to limit concurrency.
if from_key < cachemap[k].serial] for observed_user in cachemap.keys():
if not (from_key < cachemap[observed_user].serial):
continue
if (yield self.is_visible(observer_user, observed_user)):
updates.append((observed_user, cachemap[observed_user]))
# TODO(paul): limit
if updates: if updates:
clock = self.clock clock = self.clock
@ -788,14 +825,15 @@ class PresenceEventSource(object):
latest_serial = max([x[1].serial for x in updates]) latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates] data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
return ((data, latest_serial)) defer.returnValue((data, latest_serial))
else: else:
return (([], presence._user_cachemap_latest_serial)) defer.returnValue(([], presence._user_cachemap_latest_serial))
def get_current_key(self): def get_current_key(self):
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
return presence._user_cachemap_latest_serial return presence._user_cachemap_latest_serial
@defer.inlineCallbacks
def get_pagination_rows(self, user, pagination_config, key): def get_pagination_rows(self, user, pagination_config, key):
# TODO (erikj): Does this make sense? Ordering? # TODO (erikj): Does this make sense? Ordering?
@ -812,7 +850,17 @@ class PresenceEventSource(object):
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap cachemap = presence._user_cachemap
# TODO(paul): limit, and filter by visibility updates = []
# TODO(paul): use a DeferredList ? How to limit concurrency.
for observed_user in cachemap.keys():
if not (to_key < cachemap[observed_user].serial < from_key):
continue
if (yield self.is_visible(observer_user, observed_user)):
updates.append((observed_user, cachemap[observed_user]))
# TODO(paul): limit
updates = [(k, cachemap[k]) for k in cachemap updates = [(k, cachemap[k]) for k in cachemap
if to_key < cachemap[k].serial < from_key] if to_key < cachemap[k].serial < from_key]
@ -830,13 +878,13 @@ class PresenceEventSource(object):
next_token = next_token.copy_and_replace( next_token = next_token.copy_and_replace(
"presence_key", earliest_serial "presence_key", earliest_serial
) )
return ((data, next_token)) defer.returnValue((data, next_token))
else: else:
if not to_token: if not to_token:
to_token = from_token.copy_and_replace( to_token = from_token.copy_and_replace(
"presence_key", 0 "presence_key", 0
) )
return (([], to_token)) defer.returnValue(([], to_token))
class UserPresenceCache(object): class UserPresenceCache(object):

View File

@ -118,7 +118,9 @@ class PresenceStateTestCase(unittest.TestCase):
room_member_handler.get_room_members = get_room_members room_member_handler.get_room_members = get_room_members
def user_rooms_intersect(userlist): def user_rooms_intersect(userlist):
shared = all(map(lambda u: u in self.room_members, userlist)) room_member_ids = map(lambda u: u.to_string(), self.room_members)
shared = all(map(lambda i: i in room_member_ids, userlist))
return defer.succeed(shared) return defer.succeed(shared)
self.datastore.user_rooms_intersect = user_rooms_intersect self.datastore.user_rooms_intersect = user_rooms_intersect
@ -562,6 +564,13 @@ class PresencePushTestCase(unittest.TestCase):
return defer.succeed([]) return defer.succeed([])
self.datastore.get_joined_hosts_for_room = get_room_hosts self.datastore.get_joined_hosts_for_room = get_room_hosts
def user_rooms_intersect(userlist):
room_member_ids = map(lambda u: u.to_string(), self.room_members)
shared = all(map(lambda i: i in room_member_ids, userlist))
return defer.succeed(shared)
self.datastore.user_rooms_intersect = user_rooms_intersect
@defer.inlineCallbacks @defer.inlineCallbacks
def fetch_room_distributions_into(room_id, localusers=None, def fetch_room_distributions_into(room_id, localusers=None,
remotedomains=None, ignore_user=None): remotedomains=None, ignore_user=None):
@ -604,6 +613,7 @@ class PresencePushTestCase(unittest.TestCase):
self.u_apple = hs.parse_userid("@apple:test") self.u_apple = hs.parse_userid("@apple:test")
self.u_banana = hs.parse_userid("@banana:test") self.u_banana = hs.parse_userid("@banana:test")
self.u_clementine = hs.parse_userid("@clementine:test") self.u_clementine = hs.parse_userid("@clementine:test")
self.u_durian = hs.parse_userid("@durian:test")
self.u_elderberry = hs.parse_userid("@elderberry:test") self.u_elderberry = hs.parse_userid("@elderberry:test")
# Remote user # Remote user
@ -632,6 +642,7 @@ class PresencePushTestCase(unittest.TestCase):
{"presence": ONLINE} {"presence": ONLINE}
) )
# Apple sees self-reflection
(events, _) = yield self.event_source.get_new_events_for_user( (events, _) = yield self.event_source.get_new_events_for_user(
self.u_apple, 0, None self.u_apple, 0, None
) )
@ -647,6 +658,55 @@ class PresencePushTestCase(unittest.TestCase):
"last_active_ago": 0, "last_active_ago": 0,
}}, }},
], ],
msg="Presence event should be visible to self-reflection"
)
# Banana sees it because of presence subscription
(events, _) = yield self.event_source.get_new_events_for_user(
self.u_banana, 0, None
)
self.assertEquals(self.event_source.get_current_key(), 1)
self.assertEquals(events,
[
{"type": "m.presence",
"content": {
"user_id": "@apple:test",
"presence": ONLINE,
"state": ONLINE,
"last_active_ago": 0,
}},
],
msg="Presence event should be visible to explicit subscribers"
)
# Elderberry sees it because of same room
(events, _) = yield self.event_source.get_new_events_for_user(
self.u_elderberry, 0, None
)
self.assertEquals(self.event_source.get_current_key(), 1)
self.assertEquals(events,
[
{"type": "m.presence",
"content": {
"user_id": "@apple:test",
"presence": ONLINE,
"state": ONLINE,
"last_active_ago": 0,
}},
],
msg="Presence event should be visible to other room members"
)
# Durian is not in the room, should not see this event
(events, _) = yield self.event_source.get_new_events_for_user(
self.u_durian, 0, None
)
self.assertEquals(self.event_source.get_current_key(), 1)
self.assertEquals(events, [],
msg="Presence event should not be visible to others"
) )
presence = yield self.handler.get_presence_list( presence = yield self.handler.get_presence_list(
@ -664,6 +724,10 @@ class PresencePushTestCase(unittest.TestCase):
presence presence
) )
# TODO(paul): Gut-wrenching
banana_set = self.handler._local_pushmap.setdefault("banana", set())
banana_set.add(self.u_apple)
yield self.handler.set_state(self.u_banana, self.u_banana, yield self.handler.set_state(self.u_banana, self.u_banana,
{"presence": ONLINE} {"presence": ONLINE}
) )
@ -825,6 +889,8 @@ class PresencePushTestCase(unittest.TestCase):
"a-room" "a-room"
) )
self.room_members.append(self.u_clementine)
(events, _) = yield self.event_source.get_new_events_for_user( (events, _) = yield self.event_source.get_new_events_for_user(
self.u_apple, 0, None self.u_apple, 0, None
) )

View File

@ -269,11 +269,16 @@ class PresenceEventStreamTestCase(unittest.TestCase):
hs.register_servlets() hs.register_servlets()
hs.handlers.room_member_handler = Mock(spec=[ hs.handlers.room_member_handler = Mock(spec=[])
"get_rooms_for_user",
]) self.room_members = []
hs.handlers.room_member_handler.get_rooms_for_user = (
lambda u: defer.succeed([])) def get_rooms_for_user(user):
if user in self.room_members:
return ["a-room"]
else:
return []
hs.handlers.room_member_handler.get_rooms_for_user = get_rooms_for_user
self.mock_datastore = hs.get_datastore() self.mock_datastore = hs.get_datastore()
@ -285,6 +290,17 @@ class PresenceEventStreamTestCase(unittest.TestCase):
return defer.succeed(None) return defer.succeed(None)
self.mock_datastore.get_profile_avatar_url = get_profile_avatar_url self.mock_datastore.get_profile_avatar_url = get_profile_avatar_url
def user_rooms_intersect(user_list):
room_member_ids = map(lambda u: u.to_string(), self.room_members)
shared = all(map(lambda i: i in room_member_ids, user_list))
return defer.succeed(shared)
self.mock_datastore.user_rooms_intersect = user_rooms_intersect
def get_joined_hosts_for_room(room_id):
return []
self.mock_datastore.get_joined_hosts_for_room = get_joined_hosts_for_room
self.presence = hs.get_handlers().presence_handler self.presence = hs.get_handlers().presence_handler
self.u_apple = hs.parse_userid("@apple:test") self.u_apple = hs.parse_userid("@apple:test")
@ -292,6 +308,8 @@ class PresenceEventStreamTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_shortpoll(self): def test_shortpoll(self):
self.room_members = [self.u_apple, self.u_banana]
self.mock_datastore.set_presence_state.return_value = defer.succeed( self.mock_datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE}) {"state": ONLINE})
self.mock_datastore.get_presence_list.return_value = defer.succeed( self.mock_datastore.get_presence_list.return_value = defer.succeed(