diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 83dab32bcb..a38300ec6a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -294,6 +294,15 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -353,6 +362,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -373,6 +383,11 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: @@ -383,10 +398,9 @@ class FederationHandler(BaseHandler): "event": event, }) - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -458,11 +472,12 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -488,8 +503,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 308a2c9b02..f847c161af 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -543,6 +543,22 @@ class EventsStore(SQLBaseStore): (event.event_id, event.redacts) ) + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + def have_events(self, event_ids): """Given a list of event ids, check if we have already processed them.