Shuffle things room

This commit is contained in:
Erik Johnston 2016-05-24 09:43:35 +01:00
parent c0c79ef444
commit 137e6a4557
1 changed files with 33 additions and 37 deletions

View File

@ -499,6 +499,10 @@ class SyncHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False): def generate_sync_result(self, sync_config, since_token=None, full_state=False):
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
sync_result_builer = SyncResultBuilder( sync_result_builer = SyncResultBuilder(
@ -511,9 +515,10 @@ class SyncHandler(object):
sync_result_builer sync_result_builer
) )
newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms( res = yield self.generate_sync_entry_for_rooms(
sync_result_builer, account_data_by_room sync_result_builer, account_data_by_room
) )
newly_joined_rooms, newly_joined_users = res
yield self.generate_sync_entry_for_presence( yield self.generate_sync_entry_for_presence(
sync_result_builer, newly_joined_rooms, newly_joined_users sync_result_builer, newly_joined_rooms, newly_joined_users
@ -631,7 +636,7 @@ class SyncHandler(object):
if sync_result_builer.since_token: if sync_result_builer.since_token:
res = yield self._get_rooms_changed(sync_result_builer, ignored_users) res = yield self._get_rooms_changed(sync_result_builer, ignored_users)
joined, invited, archived, newly_joined_rooms = res room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_updated_tags( tags_by_room = yield self.store.get_updated_tags(
user_id, user_id,
@ -639,13 +644,12 @@ class SyncHandler(object):
) )
else: else:
res = yield self._get_all_rooms(sync_result_builer, ignored_users) res = yield self._get_all_rooms(sync_result_builer, ignored_users)
joined, invited, archived, newly_joined_rooms = res room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_tags_for_user(user_id) tags_by_room = yield self.store.get_tags_for_user(user_id)
def handle_joined(room_entry): def handle_room_entries(room_entry):
return self._generate_room_entry( return self._generate_room_entry(
"joined",
sync_result_builer, sync_result_builer,
ignored_users, ignored_users,
room_entry, room_entry,
@ -655,21 +659,7 @@ class SyncHandler(object):
always_include=sync_result_builer.full_state, always_include=sync_result_builer.full_state,
) )
yield concurrently_execute(handle_joined, joined, 10) yield concurrently_execute(handle_room_entries, room_entries, 10)
def handle_archived(room_entry):
return self._generate_room_entry(
"archived",
sync_result_builer,
ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builer.full_state,
)
yield concurrently_execute(handle_archived, archived, 10)
sync_result_builer.invited.extend(invited) sync_result_builer.invited.extend(invited)
@ -711,7 +701,7 @@ class SyncHandler(object):
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
newly_joined_rooms = [] newly_joined_rooms = []
archived = [] room_entries = []
invited = [] invited = []
for room_id, events in mem_change_events_by_room_id.items(): for room_id, events in mem_change_events_by_room_id.items():
non_joins = [e for e in events if e.membership != Membership.JOIN] non_joins = [e for e in events if e.membership != Membership.JOIN]
@ -759,8 +749,9 @@ class SyncHandler(object):
if since_token and since_token.is_after(leave_token): if since_token and since_token.is_after(leave_token):
continue continue
archived.append(RoomSyncResultBuilder( room_entries.append(RoomSyncResultBuilder(
room_id=room_id, room_id=room_id,
rtype="archived",
events=None, events=None,
newly_joined=room_id in newly_joined_rooms, newly_joined=room_id in newly_joined_rooms,
full_state=False, full_state=False,
@ -778,7 +769,6 @@ class SyncHandler(object):
limit=timeline_limit + 1, limit=timeline_limit + 1,
) )
joined = []
# We loop through all room ids, even if there are no new events, in case # We loop through all room ids, even if there are no new events, in case
# there are non room events taht we need to notify about. # there are non room events taht we need to notify about.
for room_id in joined_room_ids: for room_id in joined_room_ids:
@ -789,8 +779,9 @@ class SyncHandler(object):
prev_batch_token = now_token.copy_and_replace("room_key", start_key) prev_batch_token = now_token.copy_and_replace("room_key", start_key)
joined.append(RoomSyncResultBuilder( room_entries.append(RoomSyncResultBuilder(
room_id=room_id, room_id=room_id,
rtype="joined",
events=events, events=events,
newly_joined=room_id in newly_joined_rooms, newly_joined=room_id in newly_joined_rooms,
full_state=False, full_state=False,
@ -798,8 +789,9 @@ class SyncHandler(object):
upto_token=prev_batch_token, upto_token=prev_batch_token,
)) ))
else: else:
joined.append(RoomSyncResultBuilder( room_entries.append(RoomSyncResultBuilder(
room_id=room_id, room_id=room_id,
rtype="joined",
events=[], events=[],
newly_joined=room_id in newly_joined_rooms, newly_joined=room_id in newly_joined_rooms,
full_state=False, full_state=False,
@ -807,7 +799,7 @@ class SyncHandler(object):
upto_token=since_token, upto_token=since_token,
)) ))
defer.returnValue((joined, invited, archived, newly_joined_rooms)) defer.returnValue((room_entries, invited, newly_joined_rooms))
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builer, ignored_users): def _get_all_rooms(self, sync_result_builer, ignored_users):
@ -825,14 +817,14 @@ class SyncHandler(object):
membership_list=membership_list membership_list=membership_list
) )
joined = [] room_entries = []
invited = [] invited = []
archived = []
for event in room_list: for event in room_list:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
joined.append(RoomSyncResultBuilder( room_entries.append(RoomSyncResultBuilder(
room_id=event.room_id, room_id=event.room_id,
rtype="joined",
events=None, events=None,
newly_joined=False, newly_joined=False,
full_state=True, full_state=True,
@ -857,8 +849,9 @@ class SyncHandler(object):
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
archived.append(RoomSyncResultBuilder( room_entries.append(RoomSyncResultBuilder(
room_id=event.room_id, room_id=event.room_id,
rtype="archived",
events=None, events=None,
newly_joined=False, newly_joined=False,
full_state=True, full_state=True,
@ -866,10 +859,10 @@ class SyncHandler(object):
upto_token=leave_token, upto_token=leave_token,
)) ))
defer.returnValue((joined, invited, archived, [])) defer.returnValue((room_entries, invited, []))
@defer.inlineCallbacks @defer.inlineCallbacks
def _generate_room_entry(self, room_type, sync_result_builer, ignored_users, def _generate_room_entry(self, sync_result_builer, ignored_users,
room_builder, ephemeral, tags, account_data, room_builder, ephemeral, tags, account_data,
always_include=False): always_include=False):
since_token = sync_result_builer.since_token since_token = sync_result_builer.since_token
@ -892,7 +885,7 @@ class SyncHandler(object):
now_token=upto_token, now_token=upto_token,
since_token=since_token, since_token=since_token,
recents=events, recents=events,
newly_joined_room=newly_joined, # FIXME newly_joined_room=newly_joined,
) )
account_data_events = [] account_data_events = []
@ -922,7 +915,7 @@ class SyncHandler(object):
full_state=full_state full_state=full_state
) )
if room_type == "joined": if room_builder.rtype == "joined":
unread_notifications = {} unread_notifications = {}
room_sync = JoinedSyncResult( room_sync = JoinedSyncResult(
room_id=room_id, room_id=room_id,
@ -943,7 +936,7 @@ class SyncHandler(object):
unread_notifications["highlight_count"] = notifs["highlight_count"] unread_notifications["highlight_count"] = notifs["highlight_count"]
sync_result_builer.joined.append(room_sync) sync_result_builer.joined.append(room_sync)
elif room_type == "archived": elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult( room_sync = ArchivedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
@ -952,6 +945,8 @@ class SyncHandler(object):
) )
if room_sync or always_include: if room_sync or always_include:
sync_result_builer.archived.append(room_sync) sync_result_builer.archived.append(room_sync)
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
def _action_has_highlight(actions): def _action_has_highlight(actions):
@ -1017,9 +1012,10 @@ class SyncResultBuilder(object):
class RoomSyncResultBuilder(object): class RoomSyncResultBuilder(object):
def __init__(self, room_id, events, newly_joined, full_state, since_token, def __init__(self, room_id, rtype, events, newly_joined, full_state,
upto_token): since_token, upto_token):
self.room_id = room_id self.room_id = room_id
self.rtype = rtype
self.events = events self.events = events
self.newly_joined = newly_joined self.newly_joined = newly_joined
self.full_state = full_state self.full_state = full_state