Merge pull request #2025 from matrix-org/rav/no_reset_state_on_rejections
Avoid resetting state on rejected events
This commit is contained in:
commit
2e05f5d7a4
|
@ -15,6 +15,32 @@
|
||||||
|
|
||||||
|
|
||||||
class EventContext(object):
|
class EventContext(object):
|
||||||
|
"""
|
||||||
|
Attributes:
|
||||||
|
current_state_ids (dict[(str, str), str]):
|
||||||
|
The current state map including the current event.
|
||||||
|
(type, state_key) -> event_id
|
||||||
|
|
||||||
|
prev_state_ids (dict[(str, str), str]):
|
||||||
|
The current state map excluding the current event.
|
||||||
|
(type, state_key) -> event_id
|
||||||
|
|
||||||
|
state_group (int): state group id
|
||||||
|
rejected (bool|str): A rejection reason if the event was rejected, else
|
||||||
|
False
|
||||||
|
|
||||||
|
push_actions (list[(str, list[object])]): list of (user_id, actions)
|
||||||
|
tuples
|
||||||
|
|
||||||
|
prev_group (int): Previously persisted state group. ``None`` for an
|
||||||
|
outlier.
|
||||||
|
delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
|
||||||
|
(type, state_key) -> event_id. ``None`` for an outlier.
|
||||||
|
|
||||||
|
prev_state_events (?): XXX: is this ever set to anything other than
|
||||||
|
the empty list?
|
||||||
|
"""
|
||||||
|
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
"current_state_ids",
|
"current_state_ids",
|
||||||
"prev_state_ids",
|
"prev_state_ids",
|
||||||
|
|
|
@ -1537,7 +1537,17 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _prep_event(self, origin, event, state=None, auth_events=None):
|
def _prep_event(self, origin, event, state=None, auth_events=None):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
origin:
|
||||||
|
event:
|
||||||
|
state:
|
||||||
|
auth_events:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred, which resolves to synapse.events.snapshot.EventContext
|
||||||
|
"""
|
||||||
context = yield self.state_handler.compute_event_context(
|
context = yield self.state_handler.compute_event_context(
|
||||||
event, old_state=state,
|
event, old_state=state,
|
||||||
)
|
)
|
||||||
|
|
|
@ -177,17 +177,12 @@ class StateHandler(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def compute_event_context(self, event, old_state=None):
|
def compute_event_context(self, event, old_state=None):
|
||||||
""" Fills out the context with the `current state` of the graph. The
|
"""Build an EventContext structure for the event.
|
||||||
`current state` here is defined to be the state of the event graph
|
|
||||||
just before the event - i.e. it never includes `event`
|
|
||||||
|
|
||||||
If `event` has `auth_events` then this will also fill out the
|
|
||||||
`auth_events` field on `context` from the `current_state`.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event (EventBase)
|
event (synapse.events.EventBase):
|
||||||
Returns:
|
Returns:
|
||||||
an EventContext
|
synapse.events.snapshot.EventContext:
|
||||||
"""
|
"""
|
||||||
context = EventContext()
|
context = EventContext()
|
||||||
|
|
||||||
|
|
|
@ -201,9 +201,9 @@ class EventFederationStore(SQLBaseStore):
|
||||||
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
|
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
|
||||||
min_depth = self._get_min_depth_interaction(txn, room_id)
|
min_depth = self._get_min_depth_interaction(txn, room_id)
|
||||||
|
|
||||||
do_insert = depth < min_depth if min_depth else True
|
if min_depth and depth >= min_depth:
|
||||||
|
return
|
||||||
|
|
||||||
if do_insert:
|
|
||||||
self._simple_upsert_txn(
|
self._simple_upsert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="room_depth",
|
table="room_depth",
|
||||||
|
|
|
@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
|
||||||
from collections import deque, namedtuple, OrderedDict
|
from collections import deque, namedtuple, OrderedDict
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
import synapse
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import math
|
import math
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
|
||||||
|
# these are only included to make the type annotations work
|
||||||
|
from synapse.events import EventBase # noqa: F401
|
||||||
|
from synapse.events.snapshot import EventContext # noqa: F401
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
|
||||||
|
|
||||||
def add_to_queue(self, room_id, events_and_contexts, backfilled):
|
def add_to_queue(self, room_id, events_and_contexts, backfilled):
|
||||||
"""Add events to the queue, with the given persist_event options.
|
"""Add events to the queue, with the given persist_event options.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str):
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]):
|
||||||
|
backfilled (bool):
|
||||||
"""
|
"""
|
||||||
queue = self._event_persist_queues.setdefault(room_id, deque())
|
queue = self._event_persist_queues.setdefault(room_id, deque())
|
||||||
if queue:
|
if queue:
|
||||||
|
@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def persist_event(self, event, context, backfilled=False):
|
def persist_event(self, event, context, backfilled=False):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (EventBase):
|
||||||
|
context (EventContext):
|
||||||
|
backfilled (bool):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: resolves to (int, int): the stream ordering of ``event``,
|
||||||
|
and the stream ordering of the latest persisted event
|
||||||
|
"""
|
||||||
deferred = self._event_persist_queue.add_to_queue(
|
deferred = self._event_persist_queue.add_to_queue(
|
||||||
event.room_id, [(event, context)],
|
event.room_id, [(event, context)],
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
|
@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _persist_events(self, events_and_contexts, backfilled=False,
|
def _persist_events(self, events_and_contexts, backfilled=False,
|
||||||
delete_existing=False):
|
delete_existing=False):
|
||||||
|
"""Persist events to db
|
||||||
|
|
||||||
|
Args:
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]):
|
||||||
|
backfilled (bool):
|
||||||
|
delete_existing (bool):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: resolves when the events have been persisted
|
||||||
|
"""
|
||||||
if not events_and_contexts:
|
if not events_and_contexts:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore):
|
||||||
and the rejections table. Things reading from those table will need to check
|
and the rejections table. Things reading from those table will need to check
|
||||||
whether the event was rejected.
|
whether the event was rejected.
|
||||||
|
|
||||||
If delete_existing is True then existing events will be purged from the
|
Args:
|
||||||
database before insertion. This is useful when retrying due to IntegrityError.
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]):
|
||||||
|
events to persist
|
||||||
|
backfilled (bool): True if the events were backfilled
|
||||||
|
delete_existing (bool): True to purge existing table rows for the
|
||||||
|
events from the database. This is useful when retrying due to
|
||||||
|
IntegrityError.
|
||||||
|
current_state_for_room (dict[str, (list[str], list[str])]):
|
||||||
|
The current-state delta for each room. For each room, a tuple
|
||||||
|
(to_delete, to_insert), being a list of event ids to be removed
|
||||||
|
from the current state, and a list of event ids to be added to
|
||||||
|
the current state.
|
||||||
|
new_forward_extremeties (dict[str, list[str]]):
|
||||||
|
The new forward extremities for each room. For each room, a
|
||||||
|
list of the event ids which are the forward extremities.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
self._update_current_state_txn(txn, current_state_for_room)
|
||||||
|
|
||||||
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
|
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
|
||||||
for room_id, current_state_tuple in current_state_for_room.iteritems():
|
self._update_forward_extremities_txn(
|
||||||
|
txn,
|
||||||
|
new_forward_extremities=new_forward_extremeties,
|
||||||
|
max_stream_order=max_stream_order,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ensure that we don't have the same event twice.
|
||||||
|
events_and_contexts = self._filter_events_and_contexts_for_duplicates(
|
||||||
|
events_and_contexts,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._update_room_depths_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
backfilled=backfilled,
|
||||||
|
)
|
||||||
|
|
||||||
|
# _update_outliers_txn filters out any events which have already been
|
||||||
|
# persisted, and returns the filtered list.
|
||||||
|
events_and_contexts = self._update_outliers_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
)
|
||||||
|
|
||||||
|
# From this point onwards the events are only events that we haven't
|
||||||
|
# seen before.
|
||||||
|
|
||||||
|
if delete_existing:
|
||||||
|
# For paranoia reasons, we go and delete all the existing entries
|
||||||
|
# for these events so we can reinsert them.
|
||||||
|
# This gets around any problems with some tables already having
|
||||||
|
# entries.
|
||||||
|
self._delete_existing_rows_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._store_event_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert into the state_groups, state_groups_state, and
|
||||||
|
# event_to_state_groups tables.
|
||||||
|
self._store_mult_state_groups_txn(txn, events_and_contexts)
|
||||||
|
|
||||||
|
# _store_rejected_events_txn filters out any events which were
|
||||||
|
# rejected, and returns the filtered list.
|
||||||
|
events_and_contexts = self._store_rejected_events_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
)
|
||||||
|
|
||||||
|
# From this point onwards the events are only ones that weren't
|
||||||
|
# rejected.
|
||||||
|
|
||||||
|
self._update_metadata_tables_txn(
|
||||||
|
txn,
|
||||||
|
events_and_contexts=events_and_contexts,
|
||||||
|
backfilled=backfilled,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_current_state_txn(self, txn, state_delta_by_room):
|
||||||
|
for room_id, current_state_tuple in state_delta_by_room.iteritems():
|
||||||
to_delete, to_insert = current_state_tuple
|
to_delete, to_insert = current_state_tuple
|
||||||
txn.executemany(
|
txn.executemany(
|
||||||
"DELETE FROM current_state_events WHERE event_id = ?",
|
"DELETE FROM current_state_events WHERE event_id = ?",
|
||||||
|
@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore):
|
||||||
txn, self.get_current_state_ids, (room_id,)
|
txn, self.get_current_state_ids, (room_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
for room_id, new_extrem in new_forward_extremeties.items():
|
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
|
||||||
|
max_stream_order):
|
||||||
|
for room_id, new_extrem in new_forward_extremities.items():
|
||||||
self._simple_delete_txn(
|
self._simple_delete_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_forward_extremities",
|
table="event_forward_extremities",
|
||||||
|
@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore):
|
||||||
"event_id": ev_id,
|
"event_id": ev_id,
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
}
|
}
|
||||||
for room_id, new_extrem in new_forward_extremeties.items()
|
for room_id, new_extrem in new_forward_extremities.items()
|
||||||
for ev_id in new_extrem
|
for ev_id in new_extrem
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore):
|
||||||
"event_id": event_id,
|
"event_id": event_id,
|
||||||
"stream_ordering": max_stream_order,
|
"stream_ordering": max_stream_order,
|
||||||
}
|
}
|
||||||
for room_id, new_extrem in new_forward_extremeties.items()
|
for room_id, new_extrem in new_forward_extremities.items()
|
||||||
for event_id in new_extrem
|
for event_id in new_extrem
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ensure that we don't have the same event twice.
|
@classmethod
|
||||||
# Pick the earliest non-outlier if there is one, else the earliest one.
|
def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
|
||||||
|
"""Ensure that we don't have the same event twice.
|
||||||
|
|
||||||
|
Pick the earliest non-outlier if there is one, else the earliest one.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]):
|
||||||
|
Returns:
|
||||||
|
list[(EventBase, EventContext)]: filtered list
|
||||||
|
"""
|
||||||
new_events_and_contexts = OrderedDict()
|
new_events_and_contexts = OrderedDict()
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
prev_event_context = new_events_and_contexts.get(event.event_id)
|
prev_event_context = new_events_and_contexts.get(event.event_id)
|
||||||
|
@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore):
|
||||||
new_events_and_contexts[event.event_id] = (event, context)
|
new_events_and_contexts[event.event_id] = (event, context)
|
||||||
else:
|
else:
|
||||||
new_events_and_contexts[event.event_id] = (event, context)
|
new_events_and_contexts[event.event_id] = (event, context)
|
||||||
|
return new_events_and_contexts.values()
|
||||||
|
|
||||||
events_and_contexts = new_events_and_contexts.values()
|
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
|
||||||
|
"""Update min_depth for each room
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||||
|
we are persisting
|
||||||
|
backfilled (bool): True if the events were backfilled
|
||||||
|
"""
|
||||||
depth_updates = {}
|
depth_updates = {}
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
# Remove the any existing cache entries for the event_ids
|
# Remove the any existing cache entries for the event_ids
|
||||||
|
@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore):
|
||||||
for room_id, depth in depth_updates.items():
|
for room_id, depth in depth_updates.items():
|
||||||
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
||||||
|
|
||||||
|
def _update_outliers_txn(self, txn, events_and_contexts):
|
||||||
|
"""Update any outliers with new event info.
|
||||||
|
|
||||||
|
This turns outliers into ex-outliers (unless the new event was
|
||||||
|
rejected).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||||
|
we are persisting
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[(EventBase, EventContext)] new list, without events which
|
||||||
|
are already in the events table.
|
||||||
|
"""
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
|
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
|
||||||
",".join(["?"] * len(events_and_contexts)),
|
",".join(["?"] * len(events_and_contexts)),
|
||||||
|
@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore):
|
||||||
|
|
||||||
to_remove = set()
|
to_remove = set()
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
if context.rejected:
|
|
||||||
# If the event is rejected then we don't care if the event
|
|
||||||
# was an outlier or not.
|
|
||||||
if event.event_id in have_persisted:
|
|
||||||
# If we have already seen the event then ignore it.
|
|
||||||
to_remove.add(event)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if event.event_id not in have_persisted:
|
if event.event_id not in have_persisted:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
to_remove.add(event)
|
to_remove.add(event)
|
||||||
|
|
||||||
|
if context.rejected:
|
||||||
|
# If the event is rejected then we don't care if the event
|
||||||
|
# was an outlier or not.
|
||||||
|
continue
|
||||||
|
|
||||||
outlier_persisted = have_persisted[event.event_id]
|
outlier_persisted = have_persisted[event.event_id]
|
||||||
if not event.internal_metadata.is_outlier() and outlier_persisted:
|
if not event.internal_metadata.is_outlier() and outlier_persisted:
|
||||||
# We received a copy of an event that we had already stored as
|
# We received a copy of an event that we had already stored as
|
||||||
|
@ -764,34 +903,16 @@ class EventsStore(SQLBaseStore):
|
||||||
# event isn't an outlier any more.
|
# event isn't an outlier any more.
|
||||||
self._update_backward_extremeties(txn, [event])
|
self._update_backward_extremeties(txn, [event])
|
||||||
|
|
||||||
events_and_contexts = [
|
return [
|
||||||
ec for ec in events_and_contexts if ec[0] not in to_remove
|
ec for ec in events_and_contexts if ec[0] not in to_remove
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
|
||||||
if not events_and_contexts:
|
if not events_and_contexts:
|
||||||
# Make sure we don't pass an empty list to functions that expect to
|
# nothing to do here
|
||||||
# be storing at least one element.
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# From this point onwards the events are only events that we haven't
|
|
||||||
# seen before.
|
|
||||||
|
|
||||||
def event_dict(event):
|
|
||||||
return {
|
|
||||||
k: v
|
|
||||||
for k, v in event.get_dict().items()
|
|
||||||
if k not in [
|
|
||||||
"redacted",
|
|
||||||
"redacted_because",
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
if delete_existing:
|
|
||||||
# For paranoia reasons, we go and delete all the existing entries
|
|
||||||
# for these events so we can reinsert them.
|
|
||||||
# This gets around any problems with some tables already having
|
|
||||||
# entries.
|
|
||||||
|
|
||||||
logger.info("Deleting existing")
|
logger.info("Deleting existing")
|
||||||
|
|
||||||
for table in (
|
for table in (
|
||||||
|
@ -823,6 +944,29 @@ class EventsStore(SQLBaseStore):
|
||||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _store_event_txn(self, txn, events_and_contexts):
|
||||||
|
"""Insert new events into the event and event_json tables
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||||
|
we are persisting
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not events_and_contexts:
|
||||||
|
# nothing to do here
|
||||||
|
return
|
||||||
|
|
||||||
|
def event_dict(event):
|
||||||
|
return {
|
||||||
|
k: v
|
||||||
|
for k, v in event.get_dict().items()
|
||||||
|
if k not in [
|
||||||
|
"redacted",
|
||||||
|
"redacted_because",
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
self._simple_insert_many_txn(
|
self._simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_json",
|
table="event_json",
|
||||||
|
@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _store_rejected_events_txn(self, txn, events_and_contexts):
|
||||||
|
"""Add rows to the 'rejections' table for received events which were
|
||||||
|
rejected
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||||
|
we are persisting
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[(EventBase, EventContext)] new list, without the rejected
|
||||||
|
events.
|
||||||
|
"""
|
||||||
# Remove the rejected events from the list now that we've added them
|
# Remove the rejected events from the list now that we've added them
|
||||||
# to the events table and the events_json table.
|
# to the events table and the events_json table.
|
||||||
to_remove = set()
|
to_remove = set()
|
||||||
|
@ -876,16 +1033,23 @@ class EventsStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
to_remove.add(event)
|
to_remove.add(event)
|
||||||
|
|
||||||
events_and_contexts = [
|
return [
|
||||||
ec for ec in events_and_contexts if ec[0] not in to_remove
|
ec for ec in events_and_contexts if ec[0] not in to_remove
|
||||||
]
|
]
|
||||||
|
|
||||||
if not events_and_contexts:
|
def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
|
||||||
# Make sure we don't pass an empty list to functions that expect to
|
"""Update all the miscellaneous tables for new events
|
||||||
# be storing at least one element.
|
|
||||||
return
|
|
||||||
|
|
||||||
# From this point onwards the events are only ones that weren't rejected.
|
Args:
|
||||||
|
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||||
|
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||||
|
we are persisting
|
||||||
|
backfilled (bool): True if the events were backfilled
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not events_and_contexts:
|
||||||
|
# nothing to do here
|
||||||
|
return
|
||||||
|
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
# Insert all the push actions into the event_push_actions table.
|
# Insert all the push actions into the event_push_actions table.
|
||||||
|
@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Insert into the state_groups, state_groups_state, and
|
|
||||||
# event_to_state_groups tables.
|
|
||||||
self._store_mult_state_groups_txn(txn, events_and_contexts)
|
|
||||||
|
|
||||||
# Update the event_forward_extremities, event_backward_extremities and
|
# Update the event_forward_extremities, event_backward_extremities and
|
||||||
# event_edges tables.
|
# event_edges tables.
|
||||||
self._handle_mult_prev_events(
|
self._handle_mult_prev_events(
|
||||||
|
@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
|
||||||
# Prefill the event cache
|
# Prefill the event cache
|
||||||
self._add_to_cache(txn, events_and_contexts)
|
self._add_to_cache(txn, events_and_contexts)
|
||||||
|
|
||||||
if backfilled:
|
|
||||||
# Backfilled events come before the current state so we don't need
|
|
||||||
# to update the current state table
|
|
||||||
return
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
def _add_to_cache(self, txn, events_and_contexts):
|
def _add_to_cache(self, txn, events_and_contexts):
|
||||||
to_prefill = []
|
to_prefill = []
|
||||||
|
|
||||||
|
|
|
@ -136,6 +136,16 @@ class StateStore(SQLBaseStore):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if context.current_state_ids is None:
|
if context.current_state_ids is None:
|
||||||
|
# AFAIK, this can never happen
|
||||||
|
logger.error(
|
||||||
|
"Non-outlier event %s had current_state_ids==None",
|
||||||
|
event.event_id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# if the event was rejected, just give it the same state as its
|
||||||
|
# predecessor.
|
||||||
|
if context.rejected:
|
||||||
|
state_groups[event.event_id] = context.prev_group
|
||||||
continue
|
continue
|
||||||
|
|
||||||
state_groups[event.event_id] = context.state_group
|
state_groups[event.event_id] = context.state_group
|
||||||
|
|
Loading…
Reference in New Issue