Comments and typing for `_update_outliers_txn` (#11776)

A couple of things here surprised me, so I thought I'd document them.
This commit is contained in:
Richard van der Hoff 2022-01-19 19:45:36 +00:00 committed by GitHub
parent c072c0b829
commit 5572e6cc4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 12 deletions

1
changelog.d/11776.misc Normal file
View File

@@ -0,0 +1 @@
Add some comments and type annotations for `_update_outliers_txn`.

View File

@@ -1254,20 +1254,22 @@ class PersistEventsStore:
for room_id, depth in depth_updates.items(): for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth) self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts): def _update_outliers_txn(
self,
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> List[Tuple[EventBase, EventContext]]:
"""Update any outliers with new event info. """Update any outliers with new event info.
This turns outliers into ex-outliers (unless the new event was This turns outliers into ex-outliers (unless the new event was rejected), and
rejected). also removes any other events we have already seen from the list.
Args: Args:
txn (twisted.enterprise.adbapi.Connection): db connection txn: db connection
events_and_contexts (list[(EventBase, EventContext)]): events events_and_contexts: events we are persisting
we are persisting
Returns: Returns:
list[(EventBase, EventContext)] new list, without events which new list, without events which are already in the events table.
are already in the events table.
""" """
txn.execute( txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
@@ -1275,7 +1277,9 @@ class PersistEventsStore:
[event.event_id for event, _ in events_and_contexts], [event.event_id for event, _ in events_and_contexts],
) )
have_persisted = {event_id: outlier for event_id, outlier in txn} have_persisted: Dict[str, bool] = {
event_id: outlier for event_id, outlier in txn
}
to_remove = set() to_remove = set()
for event, context in events_and_contexts: for event, context in events_and_contexts:
@@ -1285,15 +1289,22 @@ class PersistEventsStore:
to_remove.add(event) to_remove.add(event)
if context.rejected: if context.rejected:
# If the event is rejected then we don't care if the event # If the incoming event is rejected then we don't care if the event
# was an outlier or not. # was an outlier or not - what we have is at least as good.
continue continue
outlier_persisted = have_persisted[event.event_id] outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted: if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as # We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that # an outlier in the database. We now have some state at that event
# so we need to update the state_groups table with that state. # so we need to update the state_groups table with that state.
#
# Note that we do not update the stream_ordering of the event in this
# scenario. XXX: does this cause bugs? It will mean we won't send such
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.
logger.info("Updating state for ex-outlier event %s", event.event_id)
# insert into event_to_state_groups. # insert into event_to_state_groups.
try: try: