Use StateResolutionHandler to resolve state in persist events
... and thus benefit (hopefully) from its cache.
parent 225dc3b4cb
commit ebfe64e3d6
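
The point of the change: _get_new_state_after_events previously called resolve_events_with_factory directly on a list of state sets, so every persist of new forward extremities re-ran state resolution from scratch. Routing the call through the StateResolutionHandler obtained from hs.get_state_resolution_handler() lets the persist path share that handler's cache of resolved state. A minimal illustrative sketch of the caching shape being relied on (assumption: the handler memoises on the set of state group ids; this mirrors, but is not, the real StateResolutionHandler):

    class CachingStateResolver(object):
        """Illustrative stand-in for the handler's caching behaviour."""

        def __init__(self, resolve_fn):
            self._resolve_fn = resolve_fn    # the underlying state resolution algorithm
            self._state_cache = {}           # frozenset(state group ids) -> resolved state map

        def resolve_state_groups(self, room_id, state_groups):
            # state_groups: {state_group: {(type, state_key): event_id}} -- the shape
            # _get_new_state_after_events now builds (see the hunks below).
            key = frozenset(state_groups)
            if key in self._state_cache:
                return self._state_cache[key]              # cache hit: skip resolution
            resolved = self._resolve_fn(list(state_groups.values()))
            self._state_cache[key] = resolved
            return resolved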
@@ -27,7 +27,6 @@ from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
-from synapse.state import resolve_events_with_factory
 from synapse.util.caches.descriptors import cached
 from synapse.types import get_domain_from_id

@@ -237,6 +236,8 @@ class EventsStore(SQLBaseStore):

         self._event_persist_queue = _EventPeristenceQueue()

+        self._state_resolution_handler = hs.get_state_resolution_handler()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database

@@ -402,6 +403,7 @@ class EventsStore(SQLBaseStore):
                         "Calculating state delta for room %s", room_id,
                     )
                     current_state = yield self._get_new_state_after_events(
+                        room_id,
                         ev_ctx_rm, new_latest_event_ids,
                     )
                     if current_state is not None:

@@ -487,11 +489,14 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(new_latest_event_ids)

     @defer.inlineCallbacks
-    def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+    def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
         """Calculate the current state dict after adding some new events to
         a room

         Args:
+            room_id (str):
+                room to which the events are being added. Used for logging etc
+
             events_context (list[(EventBase, EventContext)]):
                 events and contexts which are being added to the room

@@ -507,8 +512,8 @@ class EventsStore(SQLBaseStore):
         if not new_latest_event_ids:
             defer.returnValue({})

-        state_sets = []
-        state_groups = set()
+        # map from state_group to ((type, key) -> event_id) state map
+        state_groups = {}
         missing_event_ids = []
         was_updated = False
         for event_id in new_latest_event_ids:

@@ -529,11 +534,9 @@ class EventsStore(SQLBaseStore):
                     # If we've already seen the state group don't bother adding
                     # it to the state sets again
                     if ctx.state_group not in state_groups:
-                        state_sets.append(ctx.current_state_ids)
+                        state_groups[ctx.state_group] = ctx.current_state_ids
                         if ctx.delta_ids or hasattr(ev, "state_key"):
                             was_updated = True
-                        # Add this as a seen state group
-                        state_groups.add(ctx.state_group)
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
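
The two hunks above collapse the separate state_sets list and state_groups set into a single dict keyed by state group id, which is the shape the handler's resolve_state_groups call takes. A purely illustrative example of that shape (the group ids and event ids are made up):

    # {state_group: {(event type, state_key): event_id}}
    state_groups = {
        123: {("m.room.member", "@alice:example.com"): "$membership_a:example.com"},
        456: {("m.room.member", "@alice:example.com"): "$membership_b:example.com"},
    }
    seen_groups = set(state_groups)   # the dict's keys replace the old "seen groups" set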
@@ -550,55 +553,28 @@ class EventsStore(SQLBaseStore):
                 missing_event_ids,
             )

-            groups = set(event_to_groups.itervalues()) - state_groups
+            groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())

             if groups:
                 group_to_state = yield self._get_state_for_groups(groups)
-                state_sets.extend(group_to_state.itervalues())
+                state_groups.update(group_to_state)

-        if len(state_sets) == 1:
-            # If there is only one state set, then we know what the current
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_sets[0])
+            defer.returnValue(state_groups.values()[0])

-        # We work out the current state by passing the state sets to the
-        # state resolution algorithm. It may ask for some events, including
-        # the events we have yet to persist, so we need a slightly more
-        # complicated event lookup function than simply looking the events
-        # up in the db.
-
-        logger.info(
-            "Resolving state with %i state sets", len(state_sets),
-        )
-
-        events_map = {ev.event_id: ev for ev, _ in events_context}
-
-        @defer.inlineCallbacks
         def get_events(ev_ids):
-            # We get the events by first looking at the list of events we
-            # are trying to persist, and then fetching the rest from the DB.
-            db = []
-            to_return = {}
-            for ev_id in ev_ids:
-                ev = events_map.get(ev_id, None)
-                if ev:
-                    to_return[ev_id] = ev
-                else:
-                    db.append(ev_id)
-
-            if db:
-                evs = yield self.get_events(
-                    ev_ids, get_prev_content=False, check_redacted=False,
-                )
-                to_return.update(evs)
-            defer.returnValue(to_return)
-
-        current_state = yield resolve_events_with_factory(
-            state_sets,
-            event_map={},
-            state_map_factory=get_events,
+            return self.get_events(
+                ev_ids, get_prev_content=False, check_redacted=False,
+            )
+        events_map = {ev.event_id: ev for ev, _ in events_context}
+        logger.debug("calling resolve_state_groups from preserve_events")
+        res = yield self._state_resolution_handler.resolve_state_groups(
+            room_id, state_groups, events_map, get_events
         )
-        defer.returnValue(current_state)
+
+        defer.returnValue(res.state)

     @defer.inlineCallbacks
     def _calculate_state_delta(self, room_id, current_state):
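
In the final hunk the merging of in-flight events with database lookups moves out of this method: events_map (the events about to be persisted) and a plain DB-backed get_events callback are handed to the handler, and the resolved state comes back as res.state. A small sketch of the lookup contract that combination is presumably meant to satisfy (illustrative only, not the handler's actual internals; lookup_events and db_fetch are made-up names):

    def lookup_events(ev_ids, events_map, db_fetch):
        # Prefer events that are in flight (about to be persisted) ...
        found = {eid: events_map[eid] for eid in ev_ids if eid in events_map}
        # ... and fall back to the database for the rest.
        missing = [eid for eid in ev_ids if eid not in found]
        if missing:
            found.update(db_fetch(missing))   # e.g. the get_events callback above
        return found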