Include read receipts in v2 sync

This commit is contained in:
Mark Haines 2015-11-02 17:54:04 +00:00
parent 3eb62873f6
commit 2657140c58
1 changed files with 31 additions and 16 deletions

View File

@ -174,7 +174,7 @@ class SyncHandler(BaseHandler):
""" """
now_token = yield self.event_sources.get_current_token() now_token = yield self.event_sources.get_current_token()
now_token, typing_by_room = yield self.typing_by_room( now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token sync_config, now_token
) )
@ -207,7 +207,7 @@ class SyncHandler(BaseHandler):
sync_config=sync_config, sync_config=sync_config,
now_token=now_token, now_token=now_token,
timeline_since_token=timeline_since_token, timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room ephemeral_by_room=ephemeral_by_room,
) )
joined.append(room_sync) joined.append(room_sync)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
@ -240,7 +240,7 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def full_state_sync_for_joined_room(self, room_id, sync_config, def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token, now_token, timeline_since_token,
typing_by_room): ephemeral_by_room):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred JoinedSyncResult. A Deferred JoinedSyncResult.
@ -259,12 +259,12 @@ class SyncHandler(BaseHandler):
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=current_state_events, state=current_state_events,
ephemeral=typing_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
def typing_by_room(self, sync_config, now_token, since_token=None): def ephemeral_by_room(self, sync_config, now_token, since_token=None):
"""Get the typing events for each room the user is in """Get the ephemeral events for each room the user is in
Args: Args:
sync_config (SyncConfig): The flags, filters and user for the sync. sync_config (SyncConfig): The flags, filters and user for the sync.
now_token (StreamToken): Where the server is currently up to. now_token (StreamToken): Where the server is currently up to.
@ -286,12 +286,27 @@ class SyncHandler(BaseHandler):
) )
now_token = now_token.copy_and_replace("typing_key", typing_key) now_token = now_token.copy_and_replace("typing_key", typing_key)
typing_by_room = {event["room_id"]: [event] for event in typing} ephemeral_by_room = {}
for event in typing:
event.pop("room_id")
logger.debug("Typing %r", typing_by_room)
defer.returnValue((now_token, typing_by_room)) for event in typing:
room_id = event.pop("room_id")
ephemeral_by_room.setdefault(room_id, []).append(event)
receipt_key = since_token.receipt_key if since_token else "0"
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = yield receipt_source.get_new_events_for_user(
user=sync_config.user,
from_key=receipt_key,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
for event in receipts:
room_id = event.pop("room_id")
ephemeral_by_room.setdefault(room_id, []).append(event)
defer.returnValue((now_token, ephemeral_by_room))
@defer.inlineCallbacks @defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config, def full_state_sync_for_archived_room(self, room_id, sync_config,
@ -333,7 +348,7 @@ class SyncHandler(BaseHandler):
) )
now_token = now_token.copy_and_replace("presence_key", presence_key) now_token = now_token.copy_and_replace("presence_key", presence_key)
now_token, typing_by_room = yield self.typing_by_room( now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token sync_config, now_token, since_token
) )
@ -399,7 +414,7 @@ class SyncHandler(BaseHandler):
limited=limited, limited=limited,
), ),
state=state, state=state,
ephemeral=typing_by_room.get(room_id, []) ephemeral=ephemeral_by_room.get(room_id, [])
) )
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
@ -416,7 +431,7 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids: for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room( room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token, room_id, sync_config, since_token, now_token,
typing_by_room ephemeral_by_room
) )
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
@ -487,7 +502,7 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config, def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token, since_token, now_token,
typing_by_room): ephemeral_by_room):
""" Get the incremental delta needed to bring the client up to date for """ Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to the room. Gives the client the most recent events and the changes to
state. state.
@ -528,7 +543,7 @@ class SyncHandler(BaseHandler):
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=state_events_delta, state=state_events_delta,
ephemeral=typing_by_room.get(room_id, []) ephemeral=ephemeral_by_room.get(room_id, [])
) )
logging.debug("Room sync: %r", room_sync) logging.debug("Room sync: %r", room_sync)