Merge pull request #341 from matrix-org/markjh/v2_sync_receipts

Include read receipts in v2 sync
This commit is contained in:
Mark Haines 2015-11-03 18:45:13 +00:00
commit f74f48e9e6
1 changed files with 31 additions and 16 deletions

View File

@ -185,7 +185,7 @@ class SyncHandler(BaseHandler):
""" """
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
now_token, typing_by_room = yield self.typing_by_room( now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token sync_config, now_token
) )
@ -222,7 +222,7 @@ class SyncHandler(BaseHandler):
sync_config=sync_config, sync_config=sync_config,
now_token=now_token, now_token=now_token,
timeline_since_token=timeline_since_token, timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room, ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
) )
joined.append(room_sync) joined.append(room_sync)
@ -257,7 +257,7 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def full_state_sync_for_joined_room(self, room_id, sync_config, def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token, now_token, timeline_since_token,
typing_by_room, tags_by_room): ephemeral_by_room, tags_by_room):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred JoinedSyncResult. A Deferred JoinedSyncResult.
@ -276,7 +276,7 @@ class SyncHandler(BaseHandler):
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=current_state_events, state=current_state_events,
ephemeral=typing_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
), ),
@ -293,8 +293,8 @@ class SyncHandler(BaseHandler):
return private_user_data return private_user_data
@defer.inlineCallbacks @defer.inlineCallbacks
def typing_by_room(self, sync_config, now_token, since_token=None): def ephemeral_by_room(self, sync_config, now_token, since_token=None):
"""Get the typing events for each room the user is in """Get the ephemeral events for each room the user is in
Args: Args:
sync_config (SyncConfig): The flags, filters and user for the sync. sync_config (SyncConfig): The flags, filters and user for the sync.
now_token (StreamToken): Where the server is currently up to. now_token (StreamToken): Where the server is currently up to.
@ -316,12 +316,27 @@ class SyncHandler(BaseHandler):
) )
now_token = now_token.copy_and_replace("typing_key", typing_key) now_token = now_token.copy_and_replace("typing_key", typing_key)
typing_by_room = {event["room_id"]: [event] for event in typing} ephemeral_by_room = {}
for event in typing:
event.pop("room_id")
logger.debug("Typing %r", typing_by_room)
defer.returnValue((now_token, typing_by_room)) for event in typing:
room_id = event.pop("room_id")
ephemeral_by_room.setdefault(room_id, []).append(event)
receipt_key = since_token.receipt_key if since_token else "0"
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = yield receipt_source.get_new_events_for_user(
user=sync_config.user,
from_key=receipt_key,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
for event in receipts:
room_id = event.pop("room_id")
ephemeral_by_room.setdefault(room_id, []).append(event)
defer.returnValue((now_token, ephemeral_by_room))
@defer.inlineCallbacks @defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config, def full_state_sync_for_archived_room(self, room_id, sync_config,
@ -366,7 +381,7 @@ class SyncHandler(BaseHandler):
) )
now_token = now_token.copy_and_replace("presence_key", presence_key) now_token = now_token.copy_and_replace("presence_key", presence_key)
now_token, typing_by_room = yield self.typing_by_room( now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token sync_config, now_token, since_token
) )
@ -437,7 +452,7 @@ class SyncHandler(BaseHandler):
limited=limited, limited=limited,
), ),
state=state, state=state,
ephemeral=typing_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
), ),
@ -457,7 +472,7 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids: for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room( room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token, room_id, sync_config, since_token, now_token,
typing_by_room, tags_by_room ephemeral_by_room, tags_by_room
) )
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
@ -528,7 +543,7 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config, def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token, since_token, now_token,
typing_by_room, tags_by_room): ephemeral_by_room, tags_by_room):
""" Get the incremental delta needed to bring the client up to date for """ Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to the room. Gives the client the most recent events and the changes to
state. state.
@ -569,7 +584,7 @@ class SyncHandler(BaseHandler):
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=state_events_delta, state=state_events_delta,
ephemeral=typing_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
), ),