Improve performance of getting typing updates for replication
Fetching the list of all new typing notifications involved iterating over all rooms and comparing their serial. Lets move to using a stream change cache, like we do for other streams.
This commit is contained in:
parent
a6cf7d9d9a
commit
5f02017aea
|
@ -20,6 +20,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import AuthError, SynapseError
|
from synapse.api.errors import AuthError, SynapseError
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import UserID, get_domain_from_id
|
||||||
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.logcontext import run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.util.wheel_timer import WheelTimer
|
from synapse.util.wheel_timer import WheelTimer
|
||||||
|
@ -68,6 +69,11 @@ class TypingHandler(object):
|
||||||
# map room IDs to sets of users currently typing
|
# map room IDs to sets of users currently typing
|
||||||
self._room_typing = {}
|
self._room_typing = {}
|
||||||
|
|
||||||
|
# caches which room_ids changed at which serials
|
||||||
|
self._typing_stream_change_cache = StreamChangeCache(
|
||||||
|
"TypingStreamChangeCache", self._latest_room_serial,
|
||||||
|
)
|
||||||
|
|
||||||
self.clock.looping_call(
|
self.clock.looping_call(
|
||||||
self._handle_timeouts,
|
self._handle_timeouts,
|
||||||
5000,
|
5000,
|
||||||
|
@ -274,19 +280,29 @@ class TypingHandler(object):
|
||||||
|
|
||||||
self._latest_room_serial += 1
|
self._latest_room_serial += 1
|
||||||
self._room_serials[member.room_id] = self._latest_room_serial
|
self._room_serials[member.room_id] = self._latest_room_serial
|
||||||
|
self._typing_stream_change_cache.entity_has_changed(
|
||||||
|
member.room_id, self._latest_room_serial,
|
||||||
|
)
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
"typing_key", self._latest_room_serial, rooms=[member.room_id]
|
"typing_key", self._latest_room_serial, rooms=[member.room_id]
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_all_typing_updates(self, last_id, current_id):
|
def get_all_typing_updates(self, last_id, current_id):
|
||||||
# TODO: Work out a way to do this without scanning the entire state.
|
|
||||||
if last_id == current_id:
|
if last_id == current_id:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
|
||||||
|
last_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if changed_rooms is None:
|
||||||
|
changed_rooms = self._room_serials
|
||||||
|
|
||||||
rows = []
|
rows = []
|
||||||
for room_id, serial in self._room_serials.items():
|
for room_id in changed_rooms:
|
||||||
if last_id < serial and serial <= current_id:
|
serial = self._room_serials[room_id]
|
||||||
|
if last_id < serial <= current_id:
|
||||||
typing = self._room_typing[room_id]
|
typing = self._room_typing[room_id]
|
||||||
rows.append((serial, room_id, list(typing)))
|
rows.append((serial, room_id, list(typing)))
|
||||||
rows.sort()
|
rows.sort()
|
||||||
|
|
Loading…
Reference in New Issue