Don't add rejected events if we've seen them befrore. Add some comments to explain what the code is doing mechanically
This commit is contained in:
parent
1b3c3e6d68
commit
efeb6176c1
|
@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
@log_function
|
||||
def _persist_events_txn(self, txn, events_and_contexts, backfilled):
|
||||
"""Insert some number of room events into the necessary database tables.
|
||||
|
||||
Rejected events are only inserted into the events table, the events_json table,
|
||||
and the rejections table. Things reading from those table will need to check
|
||||
whether the event was rejected.
|
||||
"""
|
||||
depth_updates = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove the any existing cache entries for the event_ids
|
||||
|
@ -427,15 +433,21 @@ class EventsStore(SQLBaseStore):
|
|||
for event_id, outlier in txn.fetchall()
|
||||
}
|
||||
|
||||
# Remove the events that we've seen before.
|
||||
event_map = {}
|
||||
to_remove = set()
|
||||
for event, context in events_and_contexts:
|
||||
if context.rejected:
|
||||
# If the event is rejected then we don't care if the event
|
||||
# was an outlier or not.
|
||||
if event.event_id in have_persisted:
|
||||
# If we have already seen the event then ignore it.
|
||||
to_remove.add(event)
|
||||
continue
|
||||
|
||||
# Handle the case of the list including the same event multiple
|
||||
# times. The tricky thing here is when they differ by whether
|
||||
# they are an outlier.
|
||||
if context.rejected:
|
||||
continue
|
||||
|
||||
if event.event_id in event_map:
|
||||
other = event_map[event.event_id]
|
||||
|
||||
|
@ -457,6 +469,12 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
outlier_persisted = have_persisted[event.event_id]
|
||||
if not event.internal_metadata.is_outlier() and outlier_persisted:
|
||||
# We received a copy of an event that we had already stored as
|
||||
# an outlier in the database. We now have some state at that
|
||||
# so we need to update the state_groups table with that state.
|
||||
|
||||
# insert into the state_group, state_groups_state and
|
||||
# event_to_state_groups tables.
|
||||
self._store_mult_state_groups_txn(txn, ((event, context),))
|
||||
|
||||
metadata_json = encode_json(
|
||||
|
@ -472,6 +490,8 @@ class EventsStore(SQLBaseStore):
|
|||
(metadata_json, event.event_id,)
|
||||
)
|
||||
|
||||
# Add an entry to the ex_outlier_stream table to replicate the
|
||||
# change in outlier status to our workers.
|
||||
stream_order = event.internal_metadata.stream_ordering
|
||||
state_group_id = context.state_group or context.new_state_group_id
|
||||
self._simple_insert_txn(
|
||||
|
@ -493,6 +513,8 @@ class EventsStore(SQLBaseStore):
|
|||
(False, event.event_id,)
|
||||
)
|
||||
|
||||
# Update the event_backward_extremities table now that this
|
||||
# event isn't an outlier any more.
|
||||
self._update_extremeties(txn, [event])
|
||||
|
||||
events_and_contexts = [
|
||||
|
@ -557,24 +579,30 @@ class EventsStore(SQLBaseStore):
|
|||
],
|
||||
)
|
||||
|
||||
# Remove the rejected events from the list now that we've added them
|
||||
# to the events table and the events_json table.
|
||||
to_remove = set()
|
||||
for event, context in events_and_contexts:
|
||||
if context.rejected:
|
||||
# Insert the event_id into the rejections table
|
||||
self._store_rejections_txn(
|
||||
txn, event.event_id, context.rejected
|
||||
)
|
||||
to_remove.add(event.event_id)
|
||||
to_remove.add(event)
|
||||
|
||||
events_and_contexts = [
|
||||
ec for ec in events_and_contexts if ec[0].event_id not in to_remove
|
||||
ec for ec in events_and_contexts if ec[0] not in to_remove
|
||||
]
|
||||
|
||||
if not events_and_contexts:
|
||||
# Make sure we don't pass an empty list to functions that expect to
|
||||
# be storing at least one element.
|
||||
return
|
||||
|
||||
# From this point onwards the events are only ones that weren't rejected.
|
||||
|
||||
for event, context in events_and_contexts:
|
||||
# Insert all the push actions into the event_push_actions table.
|
||||
if context.push_actions:
|
||||
self._set_push_actions_for_event_and_users_txn(
|
||||
txn, event, context.push_actions
|
||||
|
@ -595,12 +623,18 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
if event.type == EventTypes.Redaction and event.redacts is not None:
|
||||
# Remove the entries in the event_push_actions table for the
|
||||
# redacted event.
|
||||
self._remove_push_actions_for_event_id_txn(
|
||||
txn, event.room_id, event.redacts
|
||||
)
|
||||
|
||||
# Insert into the state_groups, state_groups_state, and
|
||||
# event_to_state_groups tables.
|
||||
self._store_mult_state_groups_txn(txn, events_and_contexts)
|
||||
|
||||
# Update the event_forward_extremities, event_backward_extremities and
|
||||
# event_edges tables.
|
||||
self._handle_mult_prev_events(
|
||||
txn,
|
||||
events=[event for event, _ in events_and_contexts],
|
||||
|
@ -608,18 +642,25 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
for event, _ in events_and_contexts:
|
||||
if event.type == EventTypes.Name:
|
||||
# Insert into the room_names and event_search tables.
|
||||
self._store_room_name_txn(txn, event)
|
||||
elif event.type == EventTypes.Topic:
|
||||
# Insert into the topics table and event_search table.
|
||||
self._store_room_topic_txn(txn, event)
|
||||
elif event.type == EventTypes.Message:
|
||||
# Insert into the event_search table.
|
||||
self._store_room_message_txn(txn, event)
|
||||
elif event.type == EventTypes.Redaction:
|
||||
# Insert into the redactions table.
|
||||
self._store_redaction(txn, event)
|
||||
elif event.type == EventTypes.RoomHistoryVisibility:
|
||||
# Insert into the event_search table.
|
||||
self._store_history_visibility_txn(txn, event)
|
||||
elif event.type == EventTypes.GuestAccess:
|
||||
# Insert into the event_search table.
|
||||
self._store_guest_access_txn(txn, event)
|
||||
|
||||
# Insert into the room_memberships table.
|
||||
self._store_room_members_txn(
|
||||
txn,
|
||||
[
|
||||
|
@ -630,6 +671,7 @@ class EventsStore(SQLBaseStore):
|
|||
backfilled=backfilled,
|
||||
)
|
||||
|
||||
# Insert event_reference_hashes table.
|
||||
self._store_event_reference_hashes_txn(
|
||||
txn, [event for event, _ in events_and_contexts]
|
||||
)
|
||||
|
@ -674,6 +716,7 @@ class EventsStore(SQLBaseStore):
|
|||
],
|
||||
)
|
||||
|
||||
# Prefil the event cache
|
||||
self._add_to_cache(txn, events_and_contexts)
|
||||
|
||||
if backfilled:
|
||||
|
|
Loading…
Reference in New Issue