Merge pull request #975 from matrix-org/erikj/multi_event_persist
Ensure we only persist an event once at a time
This commit is contained in:
commit
f5da3bacb2
|
@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
from collections import deque, namedtuple
|
from collections import deque, namedtuple, OrderedDict
|
||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore):
|
||||||
and the rejections table. Things reading from those table will need to check
|
and the rejections table. Things reading from those table will need to check
|
||||||
whether the event was rejected.
|
whether the event was rejected.
|
||||||
"""
|
"""
|
||||||
|
# Ensure that we don't have the same event twice.
|
||||||
|
# Pick the earliest non-outlier if there is one, else the earliest one.
|
||||||
|
new_events_and_contexts = OrderedDict()
|
||||||
|
for event, context in events_and_contexts:
|
||||||
|
prev_event_context = new_events_and_contexts.get(event.event_id)
|
||||||
|
if prev_event_context:
|
||||||
|
if not event.internal_metadata.is_outlier():
|
||||||
|
if prev_event_context[0].internal_metadata.is_outlier():
|
||||||
|
# To ensure correct ordering we pop, as OrderedDict is
|
||||||
|
# ordered by first insertion.
|
||||||
|
new_events_and_contexts.pop(event.event_id, None)
|
||||||
|
new_events_and_contexts[event.event_id] = (event, context)
|
||||||
|
else:
|
||||||
|
new_events_and_contexts[event.event_id] = (event, context)
|
||||||
|
|
||||||
|
events_and_contexts = new_events_and_contexts.values()
|
||||||
|
|
||||||
depth_updates = {}
|
depth_updates = {}
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
# Remove the any existing cache entries for the event_ids
|
# Remove the any existing cache entries for the event_ids
|
||||||
|
@ -433,8 +450,6 @@ class EventsStore(SQLBaseStore):
|
||||||
for event_id, outlier in txn.fetchall()
|
for event_id, outlier in txn.fetchall()
|
||||||
}
|
}
|
||||||
|
|
||||||
# Remove the events that we've seen before.
|
|
||||||
event_map = {}
|
|
||||||
to_remove = set()
|
to_remove = set()
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
if context.rejected:
|
if context.rejected:
|
||||||
|
@ -445,23 +460,6 @@ class EventsStore(SQLBaseStore):
|
||||||
to_remove.add(event)
|
to_remove.add(event)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Handle the case of the list including the same event multiple
|
|
||||||
# times. The tricky thing here is when they differ by whether
|
|
||||||
# they are an outlier.
|
|
||||||
if event.event_id in event_map:
|
|
||||||
other = event_map[event.event_id]
|
|
||||||
|
|
||||||
if not other.internal_metadata.is_outlier():
|
|
||||||
to_remove.add(event)
|
|
||||||
continue
|
|
||||||
elif not event.internal_metadata.is_outlier():
|
|
||||||
to_remove.add(event)
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
to_remove.add(other)
|
|
||||||
|
|
||||||
event_map[event.event_id] = event
|
|
||||||
|
|
||||||
if event.event_id not in have_persisted:
|
if event.event_id not in have_persisted:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue