Merge pull request #717 from matrix-org/erikj/backfill_state

Check if we've already backfilled events
This commit is contained in:
Erik Johnston 2016-04-12 13:30:30 +01:00
commit 318cb1f207
2 changed files with 59 additions and 15 deletions

View File

@ -280,6 +280,10 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities=[]): def backfill(self, dest, room_id, limit, extremities=[]):
""" Trigger a backfill request to `dest` for the given `room_id` """ Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. This may return
be successfull and still return no events if the other side has no new
events to offer.
""" """
if dest == self.server_name: if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.") raise SynapseError(400, "Can't backfill from self.")
@ -294,6 +298,16 @@ class FederationHandler(BaseHandler):
extremities=extremities, extremities=extremities,
) )
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
)
events = [e for e in events if e.event_id not in seen_events]
if not events:
defer.returnValue([])
event_map = {e.event_id: e for e in events} event_map = {e.event_id: e for e in events}
event_ids = set(e.event_id for e in events) event_ids = set(e.event_id for e in events)
@ -353,6 +367,7 @@ class FederationHandler(BaseHandler):
for a in auth_events.values(): for a in auth_events.values():
if a.event_id in seen_events: if a.event_id in seen_events:
continue continue
a.internal_metadata.outlier = True
ev_infos.append({ ev_infos.append({
"event": a, "event": a,
"auth_events": { "auth_events": {
@ -373,19 +388,22 @@ class FederationHandler(BaseHandler):
} }
}) })
yield self._handle_new_events(
dest, ev_infos,
backfilled=True,
)
events.sort(key=lambda e: e.depth) events.sort(key=lambda e: e.depth)
for event in events: for event in events:
if event in events_to_state: if event in events_to_state:
continue continue
ev_infos.append({ # We store these one at a time since each event depends on the
"event": event, # previous to work out the state.
}) # TODO: We can probably do something more clever here.
yield self._handle_new_event(
yield self._handle_new_events( dest, event
dest, ev_infos,
backfilled=True,
) )
defer.returnValue(events) defer.returnValue(events)
@ -458,11 +476,15 @@ class FederationHandler(BaseHandler):
# TODO: Should we try multiple of these at a time? # TODO: Should we try multiple of these at a time?
for dom in domains: for dom in domains:
try: try:
events = yield self.backfill( yield self.backfill(
dom, room_id, dom, room_id,
limit=100, limit=100,
extremities=[e for e in extremities.keys()] extremities=[e for e in extremities.keys()]
) )
# If this succeeded then we probably already have the
# appropriate stuff.
# TODO: We can probably do something more intelligent here.
defer.returnValue(True)
except SynapseError as e: except SynapseError as e:
logger.info( logger.info(
"Failed to backfill from %s because %s", "Failed to backfill from %s because %s",
@ -488,8 +510,6 @@ class FederationHandler(BaseHandler):
) )
continue continue
if events:
defer.returnValue(True)
defer.returnValue(False) defer.returnValue(False)
success = yield try_backfill(likely_domains) success = yield try_backfill(likely_domains)
@ -1076,7 +1096,8 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None): def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event( context = yield self._prep_event(
origin, event, origin, event,
state=state, state=state,
@ -1092,6 +1113,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,
backfilled=backfilled,
) )
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
@ -1104,6 +1126,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False): def _handle_new_events(self, origin, event_infos, backfilled=False):
"""Creates the appropriate contexts and persists events. The events
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
contexts = yield defer.gatherResults( contexts = yield defer.gatherResults(
[ [
self._prep_event( self._prep_event(

View File

@ -118,7 +118,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, current_state=None): def persist_event(self, event, context, current_state=None, backfilled=False):
try: try:
with self._stream_id_gen.get_next() as stream_ordering: with self._stream_id_gen.get_next() as stream_ordering:
@ -131,6 +131,7 @@ class EventsStore(SQLBaseStore):
event=event, event=event,
context=context, context=context,
current_state=current_state, current_state=current_state,
backfilled=backfilled,
) )
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass
@ -195,7 +196,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events}) defer.returnValue({e.event_id: e for e in events})
@log_function @log_function
def _persist_event_txn(self, txn, event, context, current_state): def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
# We purposefully do this first since if we include a `current_state` # We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table # key, we *want* to update the `current_state_events` table
if current_state: if current_state:
@ -238,7 +239,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn( return self._persist_events_txn(
txn, txn,
[(event, context)], [(event, context)],
backfilled=False, backfilled=backfilled,
) )
@log_function @log_function
@ -543,6 +544,22 @@ class EventsStore(SQLBaseStore):
(event.event_id, event.redacts) (event.event_id, event.redacts)
) )
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
rows = yield self._simple_select_many_batch(
table="events",
retcols=("event_id",),
column="event_id",
iterable=list(event_ids),
keyvalues={"outlier": False},
desc="have_events_in_timeline",
)
defer.returnValue(set(r["event_id"] for r in rows))
def have_events(self, event_ids): def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them. """Given a list of event ids, check if we have already processed them.