Use the same path for incremental with gap or without gap

This commit is contained in:
Erik Johnston 2016-01-27 17:06:52 +00:00
parent b97f6626b6
commit aca3193efb
3 changed files with 147 additions and 212 deletions

View File

@ -72,7 +72,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
) )
class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
"room_id", # str "room_id", # str
"timeline", # TimelineBatch "timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent] "state", # dict[(str, str), FrozenEvent]
@ -429,44 +429,20 @@ class SyncHandler(BaseHandler):
defer.returnValue((now_token, ephemeral_by_room)) defer.returnValue((now_token, ephemeral_by_room))
@defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config, def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token, leave_event_id, leave_token,
timeline_since_token, tags_by_room, timeline_since_token, tags_by_room,
account_data_by_room): account_data_by_room):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred JoinedSyncResult. A Deferred ArchivedSyncResult.
""" """
batch = yield self.load_filtered_recents( return self.incremental_sync_for_archived_room(
room_id, sync_config, leave_token, since_token=timeline_since_token sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
account_data_by_room, full_state=True, leave_token=leave_token,
) )
leave_state = yield self.store.get_state_for_event(leave_event_id)
leave_state = {
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(
leave_state.values()
)
}
account_data = self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
)
account_data = sync_config.filter_collection.filter_room_account_data(
account_data
)
defer.returnValue(ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=leave_state,
account_data=account_data,
))
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_with_gap(self, sync_config, since_token): def incremental_sync_with_gap(self, sync_config, since_token):
""" Get the incremental delta needed to bring the client up to """ Get the incremental delta needed to bring the client up to
@ -512,173 +488,127 @@ class SyncHandler(BaseHandler):
sync_config.user sync_config.user
) )
user_id = sync_config.user.to_string()
timeline_limit = sync_config.filter_collection.timeline_limit() timeline_limit = sync_config.filter_collection.timeline_limit()
tags_by_room = yield self.store.get_updated_tags( tags_by_room = yield self.store.get_updated_tags(
sync_config.user.to_string(), user_id,
since_token.account_data_key, since_token.account_data_key,
) )
account_data, account_data_by_room = ( account_data, account_data_by_room = (
yield self.store.get_updated_account_data_for_user( yield self.store.get_updated_account_data_for_user(
sync_config.user.to_string(), user_id,
since_token.account_data_key, since_token.account_data_key,
) )
) )
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_room_changes_for_user( rooms_changed = yield self.store.get_room_changes_for_user(
sync_config.user.to_string(), since_token.room_key, now_token.room_key user_id, since_token.room_key, now_token.room_key
) )
mem_change_events_by_room_id = {}
for event in rooms_changed:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
newly_joined_rooms = []
archived = []
invited = []
for room_id, events in mem_change_events_by_room_id.items():
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)
# We want to figure out if we joined the room at some point since
# the last sync (even if we have since left). This is to make sure
# we do send down the room, and with full state, where necessary
if room_id in joined_room_ids or has_join:
old_state = yield self.get_state_at(room_id, since_token)
old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
if room_id in joined_room_ids:
continue
if not non_joins:
continue
# Only bother if we're still currently invited
should_invite = non_joins[-1].membership == Membership.INVITE
if should_invite:
room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
if room_sync:
invited.append(room_sync)
# Always include leave/ban events. Just take the last one.
# TODO: How do we handle ban -> leave in same batch?
leave_events = [
e for e in non_joins
if e.membership in (Membership.LEAVE, Membership.BAN)
]
if leave_events:
leave_event = leave_events[-1]
room_sync = yield self.incremental_sync_for_archived_room(
sync_config, room_id, leave_event.event_id, since_token,
tags_by_room, account_data_by_room,
full_state=room_id in newly_joined_rooms
)
if room_sync:
archived.append(room_sync)
# Get all events for rooms we're currently joined to.
room_to_events = yield self.store.get_room_events_stream_for_rooms( room_to_events = yield self.store.get_room_events_stream_for_rooms(
room_ids=room_ids, room_ids=joined_room_ids,
from_key=since_token.room_key, from_key=since_token.room_key,
to_key=now_token.room_key, to_key=now_token.room_key,
limit=timeline_limit + 1, limit=timeline_limit + 1,
) )
room_events = [
event
for events, _ in room_to_events.values()
for event in events
]
room_events.extend(rooms_changed)
# room_events, _ = yield self.store.get_room_events_stream(
# sync_config.user.to_string(),
# from_key=since_token.room_key,
# to_key=now_token.room_key,
# limit=timeline_limit + 1,
# )
joined = [] joined = []
archived = [] # We loop through all room ids, even if there are no new events, in case
if len(room_events) <= timeline_limit: # there are non room events taht we need to notify about.
# There is no gap in any of the rooms. Therefore we can just for room_id in joined_room_ids:
# partition the new events by room and return them. room_entry = room_to_events.get(room_id, None)
logger.debug("Got %i events for incremental sync - not limited",
len(room_events))
invite_events = [] if room_entry:
leave_events = [] events, start_key = room_entry
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
and event.state_key == sync_config.user.to_string()):
if event.membership == Membership.INVITE:
invite_events.append(event)
elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_events.append(event)
for room_id in joined_room_ids: prev_batch_token = now_token.copy_and_replace("room_key", start_key)
recents = events_by_room_id.get(room_id, [])
logger.debug("Events for room %s: %r", room_id, recents)
state = {
(event.type, event.state_key): event
for event in recents if event.is_state()}
limited = False
if recents: newly_joined_room = room_id in newly_joined_rooms
prev_batch = now_token.copy_and_replace( full_state = newly_joined_room
"room_key", recents[0].internal_metadata.before
)
else:
prev_batch = now_token
just_joined = yield self.check_joined_room(sync_config, state) batch = yield self.load_filtered_recents(
if just_joined: room_id, sync_config, prev_batch_token,
logger.debug("User has just joined %s: needs full state", since_token=since_token,
room_id) recents=events,
state = yield self.get_state_at(room_id, now_token) newly_joined_room=newly_joined_room,
# the timeline is inherently limited if we've just joined
limited = True
recents = sync_config.filter_collection.filter_room_timeline(recents)
state = {
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(
state.values()
)
}
acc_data = self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
) )
else:
acc_data = sync_config.filter_collection.filter_room_account_data( batch = TimelineBatch(
acc_data events=[],
prev_batch=since_token,
limited=False,
) )
full_state = False
ephemeral = sync_config.filter_collection.filter_room_ephemeral( room_sync = yield self.incremental_sync_with_gap_for_room(
ephemeral_by_room.get(room_id, []) room_id=room_id,
) sync_config=sync_config,
since_token=since_token,
room_sync = JoinedSyncResult( now_token=now_token,
room_id=room_id, ephemeral_by_room=ephemeral_by_room,
timeline=TimelineBatch( tags_by_room=tags_by_room,
events=recents, account_data_by_room=account_data_by_room,
prev_batch=prev_batch, all_ephemeral_by_room=all_ephemeral_by_room,
limited=limited, batch=batch,
), full_state=full_state,
state=state,
ephemeral=ephemeral,
account_data=acc_data,
unread_notifications={},
)
logger.debug("Result for room %s: %r", room_id, room_sync)
if room_sync:
notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, all_ephemeral_by_room
)
if notifs is not None:
notif_dict = room_sync.unread_notifications
notif_dict["notification_count"] = len(notifs)
notif_dict["highlight_count"] = len([
1 for notif in notifs
if _action_has_highlight(notif["actions"])
])
joined.append(room_sync)
else:
logger.debug("Got %i events for incremental sync - hit limit",
len(room_events))
invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
leave_events = yield self.store.get_leave_and_ban_events_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
ephemeral_by_room, tags_by_room, account_data_by_room,
all_ephemeral_by_room=all_ephemeral_by_room,
)
if room_sync:
joined.append(room_sync)
for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
sync_config, leave_event, since_token, tags_by_room,
account_data_by_room
) )
if room_sync: if room_sync:
archived.append(room_sync) joined.append(room_sync)
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
]
account_data_for_user = sync_config.filter_collection.filter_account_data( account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data) self.account_data_for_user(account_data)
@ -699,12 +629,10 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token, def load_filtered_recents(self, room_id, sync_config, now_token,
since_token=None): since_token=None, recents=None, newly_joined_room=False):
""" """
:returns a Deferred TimelineBatch :returns a Deferred TimelineBatch
""" """
limited = True
recents = []
filtering_factor = 2 filtering_factor = 2
timeline_limit = sync_config.filter_collection.timeline_limit() timeline_limit = sync_config.filter_collection.timeline_limit()
load_limit = max(timeline_limit * filtering_factor, 100) load_limit = max(timeline_limit * filtering_factor, 100)
@ -712,11 +640,27 @@ class SyncHandler(BaseHandler):
room_key = now_token.room_key room_key = now_token.room_key
end_key = room_key end_key = room_key
limited = recents is None or newly_joined_room or timeline_limit < len(recents)
if recents is not None:
recents = sync_config.filter_collection.filter_room_timeline(recents)
recents = yield self._filter_events_for_client(
sync_config.user.to_string(),
recents,
is_peeking=sync_config.is_guest,
)
else:
recents = []
since_key = None
if since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat: while limited and len(recents) < timeline_limit and max_repeat:
events, end_key = yield self.store.get_recent_room_events_stream_for_room( events, end_key = yield self.store.get_room_events_stream_for_room(
room_id, room_id,
limit=load_limit + 1, limit=load_limit + 1,
from_key=since_token.room_key if since_token else None, from_key=since_key,
to_key=end_key, to_key=end_key,
) )
loaded_recents = sync_config.filter_collection.filter_room_timeline(events) loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
@ -727,6 +671,7 @@ class SyncHandler(BaseHandler):
) )
loaded_recents.extend(recents) loaded_recents.extend(recents)
recents = loaded_recents recents = loaded_recents
if len(events) <= load_limit: if len(events) <= load_limit:
limited = False limited = False
break break
@ -742,7 +687,9 @@ class SyncHandler(BaseHandler):
) )
defer.returnValue(TimelineBatch( defer.returnValue(TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=limited events=recents,
prev_batch=prev_batch_token,
limited=limited or newly_joined_room
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
@ -750,24 +697,8 @@ class SyncHandler(BaseHandler):
since_token, now_token, since_token, now_token,
ephemeral_by_room, tags_by_room, ephemeral_by_room, tags_by_room,
account_data_by_room, account_data_by_room,
all_ephemeral_by_room): all_ephemeral_by_room,
""" Get the incremental delta needed to bring the client up to date for batch, full_state=False):
the room. Gives the client the most recent events and the changes to
state.
Returns:
A Deferred JoinedSyncResult
"""
logger.debug("Doing incremental sync for room %s between %s and %s",
room_id, since_token, now_token)
# TODO(mjark): Check for redactions we might have missed.
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
logger.debug("Recents %r", batch)
if batch.limited: if batch.limited:
current_state = yield self.get_state_at(room_id, now_token) current_state = yield self.get_state_at(room_id, now_token)
@ -832,43 +763,48 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync) defer.returnValue(room_sync)
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event, def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
since_token, tags_by_room, since_token, tags_by_room,
account_data_by_room): account_data_by_room, full_state,
leave_token=None):
""" Get the incremental delta needed to bring the client up to date for """ Get the incremental delta needed to bring the client up to date for
the archived room. the archived room.
Returns: Returns:
A Deferred ArchivedSyncResult A Deferred ArchivedSyncResult
""" """
stream_token = yield self.store.get_stream_token_for_event( if not leave_token:
leave_event.event_id stream_token = yield self.store.get_stream_token_for_event(
) leave_event_id
)
leave_token = since_token.copy_and_replace("room_key", stream_token) leave_token = since_token.copy_and_replace("room_key", stream_token)
if since_token.is_after(leave_token): if since_token and since_token.is_after(leave_token):
defer.returnValue(None) defer.returnValue(None)
batch = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
leave_event.room_id, sync_config, leave_token, since_token, room_id, sync_config, leave_token, since_token,
) )
logger.debug("Recents %r", batch) logger.debug("Recents %r", batch)
state_events_at_leave = yield self.store.get_state_for_event( state_events_at_leave = yield self.store.get_state_for_event(
leave_event.event_id leave_event_id
) )
state_at_previous_sync = yield self.get_state_at( if not full_state:
leave_event.room_id, stream_position=since_token state_at_previous_sync = yield self.get_state_at(
) room_id, stream_position=since_token
)
state_events_delta = yield self.compute_state_delta( state_events_delta = yield self.compute_state_delta(
since_token=since_token, since_token=since_token,
previous_state=state_at_previous_sync, previous_state=state_at_previous_sync,
current_state=state_events_at_leave, current_state=state_events_at_leave,
) )
else:
state_events_delta = state_events_at_leave
state_events_delta = { state_events_delta = {
(e.type, e.state_key): e (e.type, e.state_key): e
@ -878,7 +814,7 @@ class SyncHandler(BaseHandler):
} }
account_data = self.account_data_for_room( account_data = self.account_data_for_room(
leave_event.room_id, tags_by_room, account_data_by_room room_id, tags_by_room, account_data_by_room
) )
account_data = sync_config.filter_collection.filter_room_account_data( account_data = sync_config.filter_collection.filter_room_account_data(
@ -886,7 +822,7 @@ class SyncHandler(BaseHandler):
) )
room_sync = ArchivedSyncResult( room_sync = ArchivedSyncResult(
room_id=leave_event.room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=state_events_delta, state=state_events_delta,
account_data=account_data, account_data=account_data,

View File

@ -128,7 +128,6 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state, is_new_state=is_new_state,
current_state=current_state, current_state=current_state,
) )
logger.info("Invalidating %r at %r", event.room_id, stream_ordering)
self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering) self._events_stream_cache.room_has_changed(None, event.room_id, stream_ordering)
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass

View File

@ -179,7 +179,7 @@ class StreamStore(SQLBaseStore):
room_ids = list(room_ids) room_ids = list(room_ids)
for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
res = yield defer.gatherResults([ res = yield defer.gatherResults([
self.get_recent_room_events_stream_for_room( self.get_room_events_stream_for_room(
room_id, from_key, to_key, limit room_id, from_key, to_key, limit
).addCallback(lambda r, rm: (rm, r), room_id) ).addCallback(lambda r, rm: (rm, r), room_id)
for room_id in room_ids for room_id in room_ids
@ -189,7 +189,7 @@ class StreamStore(SQLBaseStore):
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0): def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
if from_key is not None: if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream from_id = RoomStreamToken.parse_stream_token(from_key).stream
else: else:
@ -246,7 +246,7 @@ class StreamStore(SQLBaseStore):
key = from_key key = from_key
return ret, key return ret, key
res = yield self.runInteraction("get_recent_room_events_stream_for_room", f) res = yield self.runInteraction("get_room_events_stream_for_room", f)
defer.returnValue(res) defer.returnValue(res)
def get_room_changes_for_user(self, user_id, from_key, to_key): def get_room_changes_for_user(self, user_id, from_key, to_key):