Measure some /sync related things
This commit is contained in:
parent
31a2b892d8
commit
6c558ee8bc
|
@ -19,6 +19,7 @@ from synapse.streams.config import PaginationConfig
|
|||
from synapse.api.constants import Membership, EventTypes
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -368,50 +369,51 @@ class SyncHandler(BaseHandler):
|
|||
typing events for that room.
|
||||
"""
|
||||
|
||||
typing_key = since_token.typing_key if since_token else "0"
|
||||
with Measure(self.clock, "ephemeral_by_room"):
|
||||
typing_key = since_token.typing_key if since_token else "0"
|
||||
|
||||
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
|
||||
room_ids = [room.room_id for room in rooms]
|
||||
|
||||
typing_source = self.event_sources.sources["typing"]
|
||||
typing, typing_key = yield typing_source.get_new_events(
|
||||
user=sync_config.user,
|
||||
from_key=typing_key,
|
||||
limit=sync_config.filter_collection.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("typing_key", typing_key)
|
||||
typing_source = self.event_sources.sources["typing"]
|
||||
typing, typing_key = yield typing_source.get_new_events(
|
||||
user=sync_config.user,
|
||||
from_key=typing_key,
|
||||
limit=sync_config.filter_collection.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("typing_key", typing_key)
|
||||
|
||||
ephemeral_by_room = {}
|
||||
ephemeral_by_room = {}
|
||||
|
||||
for event in typing:
|
||||
# we want to exclude the room_id from the event, but modifying the
|
||||
# result returned by the event source is poor form (it might cache
|
||||
# the object)
|
||||
room_id = event["room_id"]
|
||||
event_copy = {k: v for (k, v) in event.iteritems()
|
||||
if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
for event in typing:
|
||||
# we want to exclude the room_id from the event, but modifying the
|
||||
# result returned by the event source is poor form (it might cache
|
||||
# the object)
|
||||
room_id = event["room_id"]
|
||||
event_copy = {k: v for (k, v) in event.iteritems()
|
||||
if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
|
||||
receipt_key = since_token.receipt_key if since_token else "0"
|
||||
receipt_key = since_token.receipt_key if since_token else "0"
|
||||
|
||||
receipt_source = self.event_sources.sources["receipt"]
|
||||
receipts, receipt_key = yield receipt_source.get_new_events(
|
||||
user=sync_config.user,
|
||||
from_key=receipt_key,
|
||||
limit=sync_config.filter_collection.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
|
||||
receipt_source = self.event_sources.sources["receipt"]
|
||||
receipts, receipt_key = yield receipt_source.get_new_events(
|
||||
user=sync_config.user,
|
||||
from_key=receipt_key,
|
||||
limit=sync_config.filter_collection.ephemeral_limit(),
|
||||
room_ids=room_ids,
|
||||
is_guest=sync_config.is_guest,
|
||||
)
|
||||
now_token = now_token.copy_and_replace("receipt_key", receipt_key)
|
||||
|
||||
for event in receipts:
|
||||
room_id = event["room_id"]
|
||||
# exclude room id, as above
|
||||
event_copy = {k: v for (k, v) in event.iteritems()
|
||||
if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
for event in receipts:
|
||||
room_id = event["room_id"]
|
||||
# exclude room id, as above
|
||||
event_copy = {k: v for (k, v) in event.iteritems()
|
||||
if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
|
||||
defer.returnValue((now_token, ephemeral_by_room))
|
||||
|
||||
|
@ -619,58 +621,61 @@ class SyncHandler(BaseHandler):
|
|||
"""
|
||||
:returns a Deferred TimelineBatch
|
||||
"""
|
||||
filtering_factor = 2
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
load_limit = max(timeline_limit * filtering_factor, 10)
|
||||
max_repeat = 5 # Only try a few times per room, otherwise
|
||||
room_key = now_token.room_key
|
||||
end_key = room_key
|
||||
with Measure(self.clock, "load_filtered_recents"):
|
||||
filtering_factor = 2
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
load_limit = max(timeline_limit * filtering_factor, 10)
|
||||
max_repeat = 5 # Only try a few times per room, otherwise
|
||||
room_key = now_token.room_key
|
||||
end_key = room_key
|
||||
|
||||
limited = recents is None or newly_joined_room or timeline_limit < len(recents)
|
||||
limited = recents is None or newly_joined_room or timeline_limit < len(recents)
|
||||
|
||||
if recents is not None:
|
||||
recents = sync_config.filter_collection.filter_room_timeline(recents)
|
||||
recents = yield self._filter_events_for_client(
|
||||
sync_config.user.to_string(),
|
||||
recents,
|
||||
is_peeking=sync_config.is_guest,
|
||||
if recents is not None:
|
||||
recents = sync_config.filter_collection.filter_room_timeline(recents)
|
||||
recents = yield self._filter_events_for_client(
|
||||
sync_config.user.to_string(),
|
||||
recents,
|
||||
is_peeking=sync_config.is_guest,
|
||||
)
|
||||
else:
|
||||
recents = []
|
||||
|
||||
since_key = None
|
||||
if since_token and not newly_joined_room:
|
||||
since_key = since_token.room_key
|
||||
|
||||
while limited and len(recents) < timeline_limit and max_repeat:
|
||||
events, end_key = yield self.store.get_room_events_stream_for_room(
|
||||
room_id,
|
||||
limit=load_limit + 1,
|
||||
from_key=since_key,
|
||||
to_key=end_key,
|
||||
)
|
||||
loaded_recents = sync_config.filter_collection.filter_room_timeline(
|
||||
events
|
||||
)
|
||||
loaded_recents = yield self._filter_events_for_client(
|
||||
sync_config.user.to_string(),
|
||||
loaded_recents,
|
||||
is_peeking=sync_config.is_guest,
|
||||
)
|
||||
loaded_recents.extend(recents)
|
||||
recents = loaded_recents
|
||||
|
||||
if len(events) <= load_limit:
|
||||
limited = False
|
||||
break
|
||||
max_repeat -= 1
|
||||
|
||||
if len(recents) > timeline_limit:
|
||||
limited = True
|
||||
recents = recents[-timeline_limit:]
|
||||
room_key = recents[0].internal_metadata.before
|
||||
|
||||
prev_batch_token = now_token.copy_and_replace(
|
||||
"room_key", room_key
|
||||
)
|
||||
else:
|
||||
recents = []
|
||||
|
||||
since_key = None
|
||||
if since_token and not newly_joined_room:
|
||||
since_key = since_token.room_key
|
||||
|
||||
while limited and len(recents) < timeline_limit and max_repeat:
|
||||
events, end_key = yield self.store.get_room_events_stream_for_room(
|
||||
room_id,
|
||||
limit=load_limit + 1,
|
||||
from_key=since_key,
|
||||
to_key=end_key,
|
||||
)
|
||||
loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
|
||||
loaded_recents = yield self._filter_events_for_client(
|
||||
sync_config.user.to_string(),
|
||||
loaded_recents,
|
||||
is_peeking=sync_config.is_guest,
|
||||
)
|
||||
loaded_recents.extend(recents)
|
||||
recents = loaded_recents
|
||||
|
||||
if len(events) <= load_limit:
|
||||
limited = False
|
||||
break
|
||||
max_repeat -= 1
|
||||
|
||||
if len(recents) > timeline_limit:
|
||||
limited = True
|
||||
recents = recents[-timeline_limit:]
|
||||
room_key = recents[0].internal_metadata.before
|
||||
|
||||
prev_batch_token = now_token.copy_and_replace(
|
||||
"room_key", room_key
|
||||
)
|
||||
|
||||
defer.returnValue(TimelineBatch(
|
||||
events=recents,
|
||||
|
@ -831,50 +836,53 @@ class SyncHandler(BaseHandler):
|
|||
# updates even if they occured logically before the previous event.
|
||||
# TODO(mjark) Check for new redactions in the state events.
|
||||
|
||||
if full_state:
|
||||
if batch:
|
||||
state = yield self.store.get_state_for_event(batch.events[0].event_id)
|
||||
else:
|
||||
state = yield self.get_state_at(
|
||||
room_id, stream_position=now_token
|
||||
with Measure(self.clock, "compute_state_delta"):
|
||||
if full_state:
|
||||
if batch:
|
||||
state = yield self.store.get_state_for_event(
|
||||
batch.events[0].event_id
|
||||
)
|
||||
else:
|
||||
state = yield self.get_state_at(
|
||||
room_id, stream_position=now_token
|
||||
)
|
||||
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event
|
||||
for event in batch.events if event.is_state()
|
||||
}
|
||||
|
||||
state = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state,
|
||||
previous={},
|
||||
)
|
||||
elif batch.limited:
|
||||
state_at_previous_sync = yield self.get_state_at(
|
||||
room_id, stream_position=since_token
|
||||
)
|
||||
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event
|
||||
for event in batch.events if event.is_state()
|
||||
}
|
||||
state_at_timeline_start = yield self.store.get_state_for_event(
|
||||
batch.events[0].event_id
|
||||
)
|
||||
|
||||
state = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state,
|
||||
previous={},
|
||||
)
|
||||
elif batch.limited:
|
||||
state_at_previous_sync = yield self.get_state_at(
|
||||
room_id, stream_position=since_token
|
||||
)
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event
|
||||
for event in batch.events if event.is_state()
|
||||
}
|
||||
|
||||
state_at_timeline_start = yield self.store.get_state_for_event(
|
||||
batch.events[0].event_id
|
||||
)
|
||||
state = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
previous=state_at_previous_sync,
|
||||
)
|
||||
else:
|
||||
state = {}
|
||||
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event
|
||||
for event in batch.events if event.is_state()
|
||||
}
|
||||
|
||||
state = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
previous=state_at_previous_sync,
|
||||
)
|
||||
else:
|
||||
state = {}
|
||||
|
||||
defer.returnValue({
|
||||
(e.type, e.state_key): e
|
||||
for e in sync_config.filter_collection.filter_room_state(state.values())
|
||||
})
|
||||
defer.returnValue({
|
||||
(e.type, e.state_key): e
|
||||
for e in sync_config.filter_collection.filter_room_state(state.values())
|
||||
})
|
||||
|
||||
def check_joined_room(self, sync_config, state_delta):
|
||||
"""
|
||||
|
@ -896,20 +904,21 @@ class SyncHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
|
||||
last_unread_event_id = self.last_read_event_id_for_room_and_user(
|
||||
room_id, sync_config.user.to_string(), ephemeral_by_room
|
||||
)
|
||||
|
||||
notifs = []
|
||||
if last_unread_event_id:
|
||||
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
room_id, sync_config.user.to_string(), last_unread_event_id
|
||||
with Measure(self.clock, "unread_notifs_for_room_id"):
|
||||
last_unread_event_id = self.last_read_event_id_for_room_and_user(
|
||||
room_id, sync_config.user.to_string(), ephemeral_by_room
|
||||
)
|
||||
defer.returnValue(notifs)
|
||||
|
||||
# There is no new information in this period, so your notification
|
||||
# count is whatever it was last time.
|
||||
defer.returnValue(None)
|
||||
notifs = []
|
||||
if last_unread_event_id:
|
||||
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
room_id, sync_config.user.to_string(), last_unread_event_id
|
||||
)
|
||||
defer.returnValue(notifs)
|
||||
|
||||
# There is no new information in this period, so your notification
|
||||
# count is whatever it was last time.
|
||||
defer.returnValue(None)
|
||||
|
||||
|
||||
def _action_has_highlight(actions):
|
||||
|
|
|
@ -19,6 +19,7 @@ from ._base import BaseHandler
|
|||
|
||||
from synapse.api.errors import SynapseError, AuthError
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.types import UserID
|
||||
|
||||
import logging
|
||||
|
@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler):
|
|||
class TypingNotificationEventSource(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
self._handler = None
|
||||
self._room_member_handler = None
|
||||
|
||||
|
@ -247,19 +249,20 @@ class TypingNotificationEventSource(object):
|
|||
}
|
||||
|
||||
def get_new_events(self, from_key, room_ids, **kwargs):
|
||||
from_key = int(from_key)
|
||||
handler = self.handler()
|
||||
with Measure(self.clock, "typing.get_new_events"):
|
||||
from_key = int(from_key)
|
||||
handler = self.handler()
|
||||
|
||||
events = []
|
||||
for room_id in room_ids:
|
||||
if room_id not in handler._room_serials:
|
||||
continue
|
||||
if handler._room_serials[room_id] <= from_key:
|
||||
continue
|
||||
events = []
|
||||
for room_id in room_ids:
|
||||
if room_id not in handler._room_serials:
|
||||
continue
|
||||
if handler._room_serials[room_id] <= from_key:
|
||||
continue
|
||||
|
||||
events.append(self._make_event_for(room_id))
|
||||
events.append(self._make_event_for(room_id))
|
||||
|
||||
return events, handler._latest_room_serial
|
||||
return events, handler._latest_room_serial
|
||||
|
||||
def get_current_key(self):
|
||||
return self.handler()._latest_room_serial
|
||||
|
|
Loading…
Reference in New Issue