Don't bother checking for updates if the stream token hasn't advanced for a user

This commit is contained in:
Mark Haines 2015-05-13 13:42:21 +01:00
parent cffe6057fb
commit 63878c0379
12 changed files with 123 additions and 55 deletions

View File

@ -105,7 +105,9 @@ class BaseHandler(object):
if not suppress_auth: if not suppress_auth:
self.auth.check(event, auth_events=context.current_state) self.auth.check(event, auth_events=context.current_state)
yield self.store.persist_event(event, context=context) (event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
federation_handler = self.hs.get_handlers().federation_handler federation_handler = self.hs.get_handlers().federation_handler
@ -142,7 +144,8 @@ class BaseHandler(object):
with PreserveLoggingContext(): with PreserveLoggingContext():
# Don't block waiting on waking up all the listeners. # Don't block waiting on waking up all the listeners.
notify_d = self.notifier.on_new_room_event( notify_d = self.notifier.on_new_room_event(
event, extra_users=extra_users event, event_stream_id, max_stream_id,
extra_users=extra_users
) )
def log_failure(f): def log_failure(f):

View File

@ -160,7 +160,7 @@ class FederationHandler(BaseHandler):
) )
try: try:
yield self._handle_new_event( _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin, origin,
event, event,
state=state, state=state,
@ -203,7 +203,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext(): with PreserveLoggingContext():
d = self.notifier.on_new_room_event( d = self.notifier.on_new_room_event(
event, extra_users=extra_users event, event_stream_id, max_stream_id,
extra_users=extra_users
) )
def log_failure(f): def log_failure(f):
@ -561,7 +562,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids if e.event_id in auth_ids
} }
yield self._handle_new_event( _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin, origin,
new_event, new_event,
state=state, state=state,
@ -571,7 +572,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext(): with PreserveLoggingContext():
d = self.notifier.on_new_room_event( d = self.notifier.on_new_room_event(
new_event, extra_users=[joinee] new_event, event_stream_id, max_stream_id,
extra_users=[joinee]
) )
def log_failure(f): def log_failure(f):
@ -637,7 +639,9 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False event.internal_metadata.outlier = False
context = yield self._handle_new_event(origin, event) context, event_stream_id, max_stream_id = yield self._handle_new_event(
origin, event
)
logger.debug( logger.debug(
"on_send_join_request: After _handle_new_event: %s, sigs: %s", "on_send_join_request: After _handle_new_event: %s, sigs: %s",
@ -653,7 +657,7 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext(): with PreserveLoggingContext():
d = self.notifier.on_new_room_event( d = self.notifier.on_new_room_event(
event, extra_users=extra_users event, event_stream_id, max_stream_id, extra_users=extra_users
) )
def log_failure(f): def log_failure(f):
@ -727,7 +731,7 @@ class FederationHandler(BaseHandler):
context = yield self.state_handler.compute_event_context(event) context = yield self.state_handler.compute_event_context(event)
yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=False, backfilled=False,
@ -736,7 +740,8 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(event.state_key) target_user = UserID.from_string(event.state_key)
with PreserveLoggingContext(): with PreserveLoggingContext():
d = self.notifier.on_new_room_event( d = self.notifier.on_new_room_event(
event, extra_users=[target_user], event, event_stream_id, max_stream_id,
extra_users=[target_user],
) )
def log_failure(f): def log_failure(f):
@ -914,7 +919,7 @@ class FederationHandler(BaseHandler):
) )
raise raise
yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=backfilled, backfilled=backfilled,
@ -922,7 +927,7 @@ class FederationHandler(BaseHandler):
current_state=current_state, current_state=current_state,
) )
defer.returnValue(context) defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,

View File

@ -345,6 +345,8 @@ class PresenceHandler(BaseHandler):
curr_users = yield rm_handler.get_room_members(room_id) curr_users = yield rm_handler.get_room_members(room_id)
for local_user in [c for c in curr_users if self.hs.is_mine(c)]: for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
statuscache = self._get_or_offline_usercache(local_user)
statuscache.update({}, serial=self._user_cachemap_latest_serial)
self.push_update_to_local_and_remote( self.push_update_to_local_and_remote(
observed_user=local_user, observed_user=local_user,
users_to_push=[user], users_to_push=[user],
@ -820,6 +822,8 @@ class PresenceHandler(BaseHandler):
room_ids=[], statuscache=None): room_ids=[], statuscache=None):
with PreserveLoggingContext(): with PreserveLoggingContext():
self.notifier.on_new_user_event( self.notifier.on_new_user_event(
"presence_key",
self._user_cachemap_latest_serial,
users_to_push, users_to_push,
room_ids, room_ids,
) )

View File

@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler):
self._room_serials[room_id] = self._latest_room_serial self._room_serials[room_id] = self._latest_room_serial
with PreserveLoggingContext(): with PreserveLoggingContext():
self.notifier.on_new_user_event(rooms=[room_id]) self.notifier.on_new_user_event(
"typing_key", self._latest_room_serial, rooms=[room_id]
)
class TypingNotificationEventSource(object): class TypingNotificationEventSource(object):

View File

@ -52,12 +52,11 @@ class _NotificationListener(object):
def notified(self): def notified(self):
return self.deferred.called return self.deferred.called
def notify(self): def notify(self, token):
""" Inform whoever is listening about the new events. """ Inform whoever is listening about the new events.
""" """
try: try:
self.deferred.callback(None) self.deferred.callback(token)
except defer.AlreadyCalledError: except defer.AlreadyCalledError:
pass pass
@ -73,15 +72,18 @@ class _NotifierUserStream(object):
""" """
def __init__(self, user, rooms, current_token, appservice=None): def __init__(self, user, rooms, current_token, appservice=None):
self.user = user self.user = str(user)
self.appservice = appservice self.appservice = appservice
self.listeners = set() self.listeners = set()
self.rooms = rooms self.rooms = set(rooms)
self.current_token = current_token self.current_token = current_token
def notify(self, new_token): def notify(self, stream_key, stream_id):
self.current_token = self.current_token.copy_and_replace(
stream_key, stream_id
)
for listener in self.listeners: for listener in self.listeners:
listener.notify(new_token) listener.notify(self.current_token)
self.listeners.clear() self.listeners.clear()
def remove(self, notifier): def remove(self, notifier):
@ -117,6 +119,7 @@ class Notifier(object):
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.pending_new_room_events = []
self.clock = hs.get_clock() self.clock = hs.get_clock()
@ -153,9 +156,21 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()), lambda: count(bool, self.appservice_to_user_streams.values()),
) )
def notify_pending_new_room_events(self, max_room_stream_id):
pending = sorted(self.pending_new_room_events)
self.pending_new_room_events = []
for event, room_stream_id, extra_users in pending:
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append((
event, room_stream_id, extra_users
))
else:
self._on_new_room_event(event, room_stream_id, extra_users)
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_room_event(self, event, new_token, extra_users=[]): def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened """ Used by handlers to inform the notifier something has happened
in the room, room event wise. in the room, room event wise.
@ -163,8 +178,18 @@ class Notifier(object):
listening to the room, and any listeners for the users in the listening to the room, and any listeners for the users in the
`extra_users` param. `extra_users` param.
""" """
assert isinstance(new_token, StreamToken)
yield run_on_reactor() yield run_on_reactor()
self.notify_pending_new_room_events(max_room_stream_id)
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append((
event, room_stream_id, extra_users
))
else:
self._on_new_room_event(event, room_stream_id, extra_users)
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
# poke any interested application service. # poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services( self.hs.get_handlers().appservice_handler.notify_interested_services(
event event
@ -197,33 +222,32 @@ class Notifier(object):
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify(new_token) user_stream.notify("room_key", "s%d" % (room_stream_id,))
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_new_user_event(self, new_token, users=[], rooms=[]): def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend """ Used to inform listeners that something has happend
presence/user event wise. presence/user event wise.
Will wake up all listeners for the given users and rooms. Will wake up all listeners for the given users and rooms.
""" """
assert isinstance(new_token, StreamToken)
yield run_on_reactor() yield run_on_reactor()
user_streams = set() user_streams = set()
for user in users: for user in users:
user_stream = self.user_to_user_stream.get(user) user_stream = self.user_to_user_stream.get(user)
if user_stream: if user_stream is not None:
user_stream.add(user_stream) user_streams.add(user_stream)
for room in rooms: for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set()) user_streams |= self.room_to_user_streams.get(room, set())
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_streams.notify(new_token) user_stream.notify(stream_key, new_token)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
@ -236,12 +260,12 @@ class Notifier(object):
deferred = defer.Deferred() deferred = defer.Deferred()
user_stream = self.user_to_user_streams.get(user) user = str(user)
user_stream = self.user_to_user_stream.get(user)
if user_stream is None: if user_stream is None:
appservice = yield self.store.get_app_service_by_user_id( appservice = yield self.store.get_app_service_by_user_id(user)
user.to_string()
)
current_token = yield self.event_sources.get_current_token() current_token = yield self.event_sources.get_current_token()
rooms = yield self.store.get_rooms_for_user(user)
user_stream = _NotifierUserStream( user_stream = _NotifierUserStream(
user=user, user=user,
rooms=rooms, rooms=rooms,
@ -252,8 +276,9 @@ class Notifier(object):
else: else:
current_token = user_stream.current_token current_token = user_stream.current_token
listener = [_NotificationListener(deferred)]
if timeout and not current_token.is_after(from_token): if timeout and not current_token.is_after(from_token):
listener = [_NotificationListener(deferred)]
user_stream.listeners.add(listener[0]) user_stream.listeners.add(listener[0])
if current_token.is_after(from_token): if current_token.is_after(from_token):
@ -334,7 +359,7 @@ class Notifier(object):
self.user_to_user_stream[user_stream.user] = user_stream self.user_to_user_stream[user_stream.user] = user_stream
for room in user_stream.rooms: for room in user_stream.rooms:
s = self.room_to_user_stream.setdefault(room, set()) s = self.room_to_user_streams.setdefault(room, set())
s.add(user_stream) s.add(user_stream)
if user_stream.appservice: if user_stream.appservice:
@ -343,10 +368,12 @@ class Notifier(object):
).add(user_stream) ).add(user_stream)
def _user_joined_room(self, user, room_id): def _user_joined_room(self, user, room_id):
user = str(user)
new_user_stream = self.user_to_user_stream.get(user) new_user_stream = self.user_to_user_stream.get(user)
room_streams = self.room_to_user_streams.setdefault(room_id, set()) if new_user_stream is not None:
room_streams.add(new_user_stream) room_streams = self.room_to_user_streams.setdefault(room_id, set())
new_user_stream.rooms.add(room_id) room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id)
def _discard_if_notified(listener_set): def _discard_if_notified(listener_set):

View File

@ -64,6 +64,9 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass
max_persisted_id = yield self._stream_id_gen.get_max_token(self)
defer.returnValue((stream_ordering, max_persisted_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True, def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False, get_prev_content=False, allow_rejected=False,

View File

@ -70,6 +70,8 @@ class DomainSpecificString(
"""Return a string encoding the fields of the structure object.""" """Return a string encoding the fields of the structure object."""
return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)
__str__ = to_string
@classmethod @classmethod
def create(cls, localpart, domain,): def create(cls, localpart, domain,):
return cls(localpart=localpart, domain=domain) return cls(localpart=localpart, domain=domain)
@ -107,7 +109,6 @@ class StreamToken(
def from_string(cls, string): def from_string(cls, string):
try: try:
keys = string.split(cls._SEPARATOR) keys = string.split(cls._SEPARATOR)
return cls(*keys) return cls(*keys)
except: except:
raise SynapseError(400, "Invalid Token") raise SynapseError(400, "Invalid Token")
@ -115,6 +116,22 @@ class StreamToken(
def to_string(self): def to_string(self):
return self._SEPARATOR.join([str(k) for k in self]) return self._SEPARATOR.join([str(k) for k in self])
@property
def room_stream_id(self):
# TODO(markjh): Awful hack to work around hacks in the presence tests
if type(self.room_key) is int:
return self.room_key
else:
return int(self.room_key[1:].split("-")[-1])
def is_after(self, other_token):
"""Does this token contain events that the other doesn't?"""
return (
(other_token.room_stream_id < self.room_stream_id)
or (int(other_token.presence_key) < int(self.presence_key))
or (int(other_token.typing_key) < int(self.typing_key))
)
def copy_and_replace(self, key, new_value): def copy_and_replace(self, key, new_value):
d = self._asdict() d = self._asdict()
d[key] = new_value d[key] = new_value

View File

@ -83,7 +83,7 @@ class FederationTestCase(unittest.TestCase):
"hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"}, "hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"},
}) })
self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.persist_event.return_value = defer.succeed((1,1))
self.datastore.get_room.return_value = defer.succeed(True) self.datastore.get_room.return_value = defer.succeed(True)
self.auth.check_host_in_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True)
@ -126,5 +126,5 @@ class FederationTestCase(unittest.TestCase):
self.auth.check.assert_called_once_with(ANY, auth_events={}) self.auth.check.assert_called_once_with(ANY, auth_events={})
self.notifier.on_new_room_event.assert_called_once_with( self.notifier.on_new_room_event.assert_called_once_with(
ANY, extra_users=[] ANY, 1, 1, extra_users=[]
) )

View File

@ -87,6 +87,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
self.ratelimiter.send_message.return_value = (True, 0) self.ratelimiter.send_message.return_value = (True, 0)
self.datastore.persist_event.return_value = (1,1)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_invite(self): def test_invite(self):
room_id = "!foo:red" room_id = "!foo:red"
@ -160,7 +162,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
event, context=context, event, context=context,
) )
self.notifier.on_new_room_event.assert_called_once_with( self.notifier.on_new_room_event.assert_called_once_with(
event, extra_users=[UserID.from_string(target_user_id)] event, 1, 1, extra_users=[UserID.from_string(target_user_id)]
) )
self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.get_room.called)
self.assertFalse(self.datastore.store_room.called) self.assertFalse(self.datastore.store_room.called)
@ -226,7 +228,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
event, context=context event, context=context
) )
self.notifier.on_new_room_event.assert_called_once_with( self.notifier.on_new_room_event.assert_called_once_with(
event, extra_users=[user] event, 1, 1, extra_users=[user]
) )
join_signal_observer.assert_called_with( join_signal_observer.assert_called_with(
@ -304,7 +306,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase):
event, context=context event, context=context
) )
self.notifier.on_new_room_event.assert_called_once_with( self.notifier.on_new_room_event.assert_called_once_with(
event, extra_users=[user] event, 1, 1, extra_users=[user]
) )
leave_signal_observer.assert_called_with( leave_signal_observer.assert_called_with(

View File

@ -183,7 +183,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
) )
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 1, rooms=[self.room_id]),
]) ])
self.assertEquals(self.event_source.get_current_key(), 1) self.assertEquals(self.event_source.get_current_key(), 1)
@ -246,7 +246,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
) )
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 1, rooms=[self.room_id]),
]) ])
self.assertEquals(self.event_source.get_current_key(), 1) self.assertEquals(self.event_source.get_current_key(), 1)
@ -300,7 +300,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
) )
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 1, rooms=[self.room_id]),
]) ])
yield put_json.await_calls() yield put_json.await_calls()
@ -332,7 +332,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
) )
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 1, rooms=[self.room_id]),
]) ])
self.on_new_user_event.reset_mock() self.on_new_user_event.reset_mock()
@ -352,7 +352,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.clock.advance_time(11) self.clock.advance_time(11)
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 2, rooms=[self.room_id]),
]) ])
self.assertEquals(self.event_source.get_current_key(), 2) self.assertEquals(self.event_source.get_current_key(), 2)
@ -378,7 +378,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
) )
self.on_new_user_event.assert_has_calls([ self.on_new_user_event.assert_has_calls([
call(rooms=[self.room_id]), call('typing_key', 3, rooms=[self.room_id]),
]) ])
self.on_new_user_event.reset_mock() self.on_new_user_event.reset_mock()

View File

@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler
from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import presence
from synapse.rest.client.v1 import events from synapse.rest.client.v1 import events
from synapse.types import UserID from synapse.types import UserID
from synapse.util.async import run_on_reactor
OFFLINE = PresenceState.OFFLINE OFFLINE = PresenceState.OFFLINE
@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase):
datastore=Mock(spec=[ datastore=Mock(spec=[
"set_presence_state", "set_presence_state",
"get_presence_list", "get_presence_list",
"get_rooms_for_user",
]), ]),
clock=Mock(spec=[ clock=Mock(spec=[
"call_later", "call_later",
@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
self.mock_datastore.get_app_service_by_user_id = Mock( self.mock_datastore.get_app_service_by_user_id = Mock(
return_value=defer.succeed(None) return_value=defer.succeed(None)
) )
self.mock_datastore.get_rooms_for_user = (
lambda u: get_rooms_for_user(UserID.from_string(u))
)
def get_profile_displayname(user_id): def get_profile_displayname(user_id):
return defer.succeed("Frank") return defer.succeed("Frank")
@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase):
self.mock_datastore.set_presence_state.return_value = defer.succeed( self.mock_datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE} {"state": ONLINE}
) )
self.mock_datastore.get_presence_list.return_value = defer.succeed( self.mock_datastore.get_presence_list.return_value = defer.succeed([])
[]
)
yield self.presence.set_state(self.u_banana, self.u_banana, yield self.presence.set_state(self.u_banana, self.u_banana,
state={"presence": ONLINE} state={"presence": ONLINE}
) )
yield run_on_reactor()
(code, response) = yield self.mock_resource.trigger("GET", (code, response) = yield self.mock_resource.trigger("GET",
"/events?from=0_1_0&timeout=0", None) "/events?from=s0_1_0&timeout=0", None)
self.assertEquals(200, code) self.assertEquals(200, code)
self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [
{"type": "m.presence", {"type": "m.presence",
"content": { "content": {
"user_id": "@banana:test", "user_id": "@banana:test",

View File

@ -355,7 +355,7 @@ class MemoryDataStore(object):
return [] return []
def get_room_events_max_id(self): def get_room_events_max_id(self):
return 0 # TODO (erikj) return "s0" # TODO (erikj)
def get_send_event_level(self, room_id): def get_send_event_level(self, room_id):
return defer.succeed(0) return defer.succeed(0)