Messy: Fix undefined state_group for federated historical events
``` 2021-07-13 02:27:57,810 - synapse.handlers.federation - 1248 - ERROR - GET-4 - Failed to backfill from hs1 because NOT NULL constraint failed: event_to_state_groups.state_group Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1216, in try_backfill await self.backfill( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1035, in backfill await self._auth_and_persist_event(dest, event, context, backfilled=True) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2222, in _auth_and_persist_event await self._run_push_actions_and_persist_event(event, context, backfilled) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2244, in _run_push_actions_and_persist_event await self.persist_events_and_notify( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 3290, in persist_events_and_notify events, max_stream_token = await self.storage.persistence.persist_events( File "/usr/local/lib/python3.8/site-packages/synapse/logging/opentracing.py", line 774, in _trace_inner return await func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 320, in persist_events ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 237, in handle_queue_loop ret = await self._per_item_callback( File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 577, in _persist_event_batch await self.persist_events_store._persist_events_and_state_updates( File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 176, in _persist_events_and_state_updates await self.db_pool.runInteraction( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 681, in runInteraction result = await self.runWithConnection( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 770, in runWithConnection return await make_deferred_yieldable( File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 238, in inContext result = inContext.theWork() # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 254, in <lambda> inContext.theWork = lambda: context.call( # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 118, in callWithContext return self.currentContext().callWithContext(ctx, func, *args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 83, in callWithContext return func(*args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 293, in _runWithConnection compat.reraise(excValue, excTraceback) File "/usr/local/lib/python3.8/site-packages/twisted/python/deprecate.py", line 298, in deprecatedFunction return function(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/twisted/python/compat.py", line 403, in reraise raise exception.with_traceback(traceback) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 284, in _runWithConnection result = func(conn, *args, **kw) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 765, in inner_func return func(db_conn, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 549, in new_transaction r = func(cursor, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/logging/utils.py", line 69, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 385, in _persist_events_txn self._store_event_state_mappings_txn(txn, events_and_contexts) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 2065, in _store_event_state_mappings_txn self.db_pool.simple_insert_many_txn( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 923, in simple_insert_many_txn txn.execute_batch(sql, vals) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 280, in execute_batch self.executemany(sql, args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 300, in executemany self._do_execute(self.txn.executemany, sql, *args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 330, in _do_execute return func(sql, *args) sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group ```
This commit is contained in:
parent
8ebbc5f607
commit
187ab28611
|
@ -924,7 +924,11 @@ class FederationHandler(BaseHandler):
|
|||
origin,
|
||||
marker_event.room_id,
|
||||
[insertion_event_id],
|
||||
# outlier=False,
|
||||
)
|
||||
# await self._get_state_after_missing_prev_event(
|
||||
# origin, marker_event.room_id, insertion_event_id
|
||||
# )
|
||||
|
||||
insertion_event = await self.store.get_event(
|
||||
insertion_event_id, allow_none=True
|
||||
|
@ -1078,15 +1082,27 @@ class FederationHandler(BaseHandler):
|
|||
# Step 2: Persist the rest of the events in the chunk one by one
|
||||
events.sort(key=lambda e: e.depth)
|
||||
|
||||
logger.info("backfill: events=%s", events)
|
||||
for event in events:
|
||||
if event in events_to_state:
|
||||
continue
|
||||
|
||||
# For paranoia we ensure that these events are marked as
|
||||
# non-outliers
|
||||
logger.info(
|
||||
"backfill: persist event_id=%s (%s) outlier=%s",
|
||||
event.event_id,
|
||||
event.type,
|
||||
event.internal_metadata.is_outlier(),
|
||||
)
|
||||
assert not event.internal_metadata.is_outlier()
|
||||
|
||||
context = await self.state_handler.compute_event_context(event)
|
||||
logger.info(
|
||||
"backfill: context event_id=%s state_group=%s",
|
||||
event.event_id,
|
||||
context.state_group,
|
||||
)
|
||||
|
||||
# We store these one at a time since each event depends on the
|
||||
# previous to work out the state.
|
||||
|
@ -1383,7 +1399,12 @@ class FederationHandler(BaseHandler):
|
|||
return False
|
||||
|
||||
async def _get_events_and_persist(
|
||||
self, destination: str, room_id: str, events: Iterable[str]
|
||||
self,
|
||||
destination: str,
|
||||
room_id: str,
|
||||
events: Iterable[str],
|
||||
# TODO: check if still used
|
||||
outlier: bool = True,
|
||||
) -> None:
|
||||
"""Fetch the given events from a server, and persist them as outliers.
|
||||
|
||||
|
@ -1405,7 +1426,7 @@ class FederationHandler(BaseHandler):
|
|||
[destination],
|
||||
event_id,
|
||||
room_version,
|
||||
outlier=True,
|
||||
outlier=outlier,
|
||||
)
|
||||
if event is None:
|
||||
logger.warning(
|
||||
|
@ -2278,6 +2299,11 @@ class FederationHandler(BaseHandler):
|
|||
server.
|
||||
backfilled: True if the event was backfilled.
|
||||
"""
|
||||
logger.info(
|
||||
"_auth_and_persist_event: before event_id=%s state_group=%s",
|
||||
event.event_id,
|
||||
context.state_group,
|
||||
)
|
||||
context = await self._check_event_auth(
|
||||
origin,
|
||||
event,
|
||||
|
@ -2286,6 +2312,11 @@ class FederationHandler(BaseHandler):
|
|||
auth_events=auth_events,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
logger.info(
|
||||
"_auth_and_persist_event: after event_id=%s state_group=%s",
|
||||
event.event_id,
|
||||
context.state_group,
|
||||
)
|
||||
|
||||
await self._run_push_actions_and_persist_event(event, context, backfilled)
|
||||
|
||||
|
@ -2667,9 +2698,19 @@ class FederationHandler(BaseHandler):
|
|||
auth_events[(c.type, c.state_key)] = c
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
"_check_event_auth: before event_id=%s state_group=%s",
|
||||
event.event_id,
|
||||
context.state_group,
|
||||
)
|
||||
context = await self._update_auth_events_and_context_for_auth(
|
||||
origin, event, context, auth_events
|
||||
)
|
||||
logger.info(
|
||||
"_check_event_auth: after event_id=%s state_group=%s",
|
||||
event.event_id,
|
||||
context.state_group,
|
||||
)
|
||||
except Exception:
|
||||
# We don't really mind if the above fails, so lets not fail
|
||||
# processing if it does. However, it really shouldn't fail so
|
||||
|
@ -2756,7 +2797,11 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if missing_auth:
|
||||
# If we don't have all the auth events, we need to get them.
|
||||
logger.info("auth_events contains unknown events: %s", missing_auth)
|
||||
logger.info(
|
||||
"auth_events contains unknown events for event_id=%s, missing_auth=%s",
|
||||
event.event_id,
|
||||
missing_auth,
|
||||
)
|
||||
try:
|
||||
try:
|
||||
remote_auth_chain = await self.federation_client.get_event_auth(
|
||||
|
@ -2793,9 +2838,13 @@ class FederationHandler(BaseHandler):
|
|||
event.event_id,
|
||||
e.event_id,
|
||||
)
|
||||
context = await self.state_handler.compute_event_context(e)
|
||||
# XXX: Main fix is here. It was computing context for the missing auth event
|
||||
# and re-assigning to the `context` variable used for the main event
|
||||
missing_auth_context = (
|
||||
await self.state_handler.compute_event_context(e)
|
||||
)
|
||||
await self._auth_and_persist_event(
|
||||
origin, e, context, auth_events=auth
|
||||
origin, e, missing_auth_context, auth_events=auth
|
||||
)
|
||||
|
||||
if e.event_id in event_auth_events:
|
||||
|
@ -2806,13 +2855,20 @@ class FederationHandler(BaseHandler):
|
|||
except Exception:
|
||||
logger.exception("Failed to get auth chain")
|
||||
|
||||
logger.info(
|
||||
"_update_auth_events_and_context_for_auth: check outlier event_id=%s outlier=%s",
|
||||
event.event_id,
|
||||
event.internal_metadata.is_outlier(),
|
||||
)
|
||||
if event.internal_metadata.is_outlier():
|
||||
# XXX: given that, for an outlier, we'll be working with the
|
||||
# event's *claimed* auth events rather than those we calculated:
|
||||
# (a) is there any point in this test, since different_auth below will
|
||||
# obviously be empty
|
||||
# (b) alternatively, why don't we do it earlier?
|
||||
logger.info("Skipping auth_event fetch for outlier")
|
||||
logger.info(
|
||||
"Skipping auth_event fetch for outlier event_id=%s", event.event_id
|
||||
)
|
||||
return context
|
||||
|
||||
different_auth = event_auth_events.difference(
|
||||
|
|
|
@ -324,6 +324,13 @@ class StateHandler:
|
|||
entry = await self.resolve_state_groups_for_events(
|
||||
event.room_id, event.prev_event_ids()
|
||||
)
|
||||
logger.info(
|
||||
"compute_event_context: resolve_state_groups_for_events\nstate_ids_before_event=%s\nstate_group_before_event=%s\nstate_group_before_event_prev_group=%s\ndeltas_to_state_group_before_event=%s",
|
||||
entry.state,
|
||||
entry.state_group,
|
||||
entry.prev_group,
|
||||
entry.delta_ids,
|
||||
)
|
||||
|
||||
state_ids_before_event = entry.state
|
||||
state_group_before_event = entry.state_group
|
||||
|
@ -359,6 +366,10 @@ class StateHandler:
|
|||
#
|
||||
|
||||
if not event.is_state():
|
||||
logger.info(
|
||||
"compute_event_context: returning with state_group_before_event=%s",
|
||||
state_group_before_event,
|
||||
)
|
||||
return EventContext.with_state(
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_group=state_group_before_event,
|
||||
|
@ -390,6 +401,11 @@ class StateHandler:
|
|||
current_state_ids=state_ids_after_event,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"compute_event_context: after\nstate_group_after_event=%s",
|
||||
state_group_after_event,
|
||||
)
|
||||
|
||||
return EventContext.with_state(
|
||||
state_group=state_group_after_event,
|
||||
state_group_before_event=state_group_before_event,
|
||||
|
|
|
@ -2032,6 +2032,13 @@ class PersistEventsStore:
|
|||
):
|
||||
state_groups = {}
|
||||
for event, context in events_and_contexts:
|
||||
|
||||
logger.info(
|
||||
"creating state_groups grsesegr event_id=%s outlier=%s %s",
|
||||
event.event_id,
|
||||
event.internal_metadata.is_outlier(),
|
||||
event,
|
||||
)
|
||||
if event.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
|
@ -2043,6 +2050,8 @@ class PersistEventsStore:
|
|||
|
||||
state_groups[event.event_id] = context.state_group
|
||||
|
||||
logger.info("state_groups asdfasdf %s", state_groups)
|
||||
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="event_to_state_groups",
|
||||
|
|
Loading…
Reference in New Issue