diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 1925a48039..5b3df6932b 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -149,7 +149,8 @@ class ReceiptsHandler(BaseHandler): """Gets all receipts for a room, upto the given key. """ result = yield self.store.get_linearized_receipts_for_room( - room_id, None, to_key + room_id, + to_key=to_key, ) if not result: @@ -176,7 +177,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) @@ -196,7 +199,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 56b9fedfd8..d515a0a15c 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -34,8 +34,17 @@ class ReceiptsStore(SQLBaseStore): self._receipts_stream_cache = _RoomStreamChangeCache() @defer.inlineCallbacks - def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. + + Args: + room_ids (list): List of room_ids. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ room_ids = set(room_ids) @@ -46,7 +55,9 @@ class ReceiptsStore(SQLBaseStore): results = yield defer.gatherResults( [ - self.get_linearized_receipts_for_room(room_id, from_key, to_key) + self.get_linearized_receipts_for_room( + room_id, to_key, from_key=from_key + ) for room_id in room_ids ], consumeErrors=True, @@ -55,8 +66,17 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results for ev in res]) @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. + + Args: + room_ids (str): The room id. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ def f(txn): if from_key: @@ -288,6 +308,9 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def get_rooms_changed(self, store, room_ids, key): + """Returns subset of room ids that have had new receipts since the + given key. If the key is too old it will just return the given list. + """ if key > (yield self._get_earliest_key(store)): keys = self._cache.keys() i = keys.bisect_right(key) @@ -302,6 +325,8 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def room_has_changed(self, store, room_id, key): + """Informs the cache that the room has been changed at the given key. + """ if key > (yield self._get_earliest_key(store)): old_key = self._room_to_key.get(room_id, None) if old_key: