Handle federation inbound instances being killed more gracefully (#11262)

* Make locks better handle the process being killed

If the process gets killed and restarted (so that it didn't have a
chance to drop its locks gracefully), then there may still be locks in
the DB for the same instance that haven't yet timed out but are safe to
delete.

We handle this case by a) checking whether the current instance has
already taken out the lock, and b) if not, ignoring any existing locks
in the DB that belong to the same instance.
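
A minimal model of rule (b), assuming a two-minute lock timeout and a
worker_locks row shaped like the table touched in the diff below. Check (a)
is just a lookup in the in-memory map of locks this process holds; this is
illustrative pure Python, not the Synapse implementation:

    import time

    _LOCK_TIMEOUT_MS = 2 * 60 * 1000  # assumed timeout, for illustration only

    def can_take_over(row: dict, my_instance: str, now_ms: int) -> bool:
        """A worker_locks row is up for grabs if it has timed out, OR if it
        was written by this same instance before the process was killed."""
        timed_out = row["last_renewed_ts"] < now_ms - _LOCK_TIMEOUT_MS
        ours = row["instance_name"] == my_instance
        return timed_out or ours

    now_ms = int(time.time() * 1000)
    row = {"instance_name": "worker1", "last_renewed_ts": now_ms - 10_000}
    print(can_take_over(row, "worker1", now_ms))  # True: same instance reclaims its row
    print(can_take_over(row, "worker2", now_ms))  # False: the lock hasn't timed out yet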

* Periodically check for old staged events

This is to protect against other instances dying and their locks timing
out.
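
For context, Synapse's Clock.looping_call is a thin wrapper around Twisted's
LoopingCall, with the interval given in milliseconds. A standalone sketch of
the same "run the cleanup once a minute" pattern, using a placeholder cleanup
function rather than the real one, looks like this:

    from twisted.internet import reactor, task

    def check_for_old_staged_events() -> None:
        # Placeholder: in the diff below this role is played by
        # FederationServer._handle_old_staged_events.
        print("checking for old staged events whose lock may have lapsed")

    loop = task.LoopingCall(check_for_old_staged_events)
    loop.start(60.0)  # LoopingCall takes seconds; Clock.looping_call takes ms (60 * 1000)
    reactor.run()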
Erik Johnston 2021-11-08 09:54:47 +00:00 committed by GitHub
parent 9799c569bb
commit 98c8fc6ce8
3 changed files with 27 additions and 10 deletions

changelog.d/11262.bugfix (new file)

@@ -0,0 +1 @@
+Fix a bug where a remote event being processed by a worker when the worker gets killed would not get processed on restart. Introduced in v1.37.1.


@@ -213,6 +213,11 @@ class FederationServer(FederationBase):
         self._started_handling_of_staged_events = True
         self._handle_old_staged_events()

+        # Start a periodic check for old staged events. This is to handle
+        # the case where locks time out, e.g. if another process gets killed
+        # without dropping its locks.
+        self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()


@@ -14,6 +14,7 @@
 import logging
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
+from weakref import WeakValueDictionary

 from twisted.internet.interfaces import IReactorCore
@@ -61,7 +62,7 @@ class LockStore(SQLBaseStore):
         # A map from `(lock_name, lock_key)` to the token of any locks that we
         # think we currently hold.
-        self._live_tokens: Dict[Tuple[str, str], str] = {}
+        self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()

         # When we shut down we want to remove the locks. Technically this can
         # lead to a race, as we may drop the lock while we are still processing.
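
The switch to WeakValueDictionary means the store only remembers locks that
some caller still holds a strong reference to; once a Lock object is garbage
collected, its bookkeeping entry silently disappears. A small standalone
demonstration of that property (standard library only, not Synapse code):

    import gc
    from weakref import WeakValueDictionary

    class Lock:  # stand-in for the real Lock class
        pass

    live: WeakValueDictionary = WeakValueDictionary()
    lock = Lock()
    live[("name", "key")] = lock
    print(("name", "key") in live)  # True: a strong reference to the Lock exists

    del lock
    gc.collect()                    # make collection deterministic for the demo
    print(("name", "key") in live)  # False: the entry vanished with the Lock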
@@ -80,10 +81,10 @@ class LockStore(SQLBaseStore):
         # We need to take a copy of the tokens dict as dropping the locks will
         # cause the dictionary to change.
-        tokens = dict(self._live_tokens)
+        locks = dict(self._live_tokens)

-        for (lock_name, lock_key), token in tokens.items():
-            await self._drop_lock(lock_name, lock_key, token)
+        for lock in locks.values():
+            await lock.release()

         logger.info("Dropped locks due to shutdown")
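
The dict(self._live_tokens) copy in the shutdown path matters because
releasing a lock can remove it from the mapping mid-iteration, and with a
WeakValueDictionary the copy also holds strong references so the Lock objects
cannot be collected mid-loop. A tiny illustration of the iteration hazard
(plain dict, illustrative only):

    d = {"a": 1, "b": 2}

    for key in dict(d):   # iterate over a snapshot of the keys
        d.pop(key)        # safe: the live dict can shrink underneath us

    # Iterating over `d` directly while popping would raise
    # "RuntimeError: dictionary changed size during iteration".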
@@ -93,6 +94,11 @@ class LockStore(SQLBaseStore):
             used (otherwise the lock will leak).
         """

+        # Check if this process has taken out a lock and if it's still valid.
+        lock = self._live_tokens.get((lock_name, lock_key))
+        if lock and await lock.is_still_valid():
+            return None
+
         now = self._clock.time_msec()
         token = random_string(6)
@@ -100,7 +106,9 @@ class LockStore(SQLBaseStore):
         def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
             # We take out the lock if either a) there is no row for the lock
-            # already or b) the existing row has timed out.
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
             sql = """
                 INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
                 VALUES (?, ?, ?, ?, ?)
@@ -112,6 +120,7 @@ class LockStore(SQLBaseStore):
                         last_renewed_ts = EXCLUDED.last_renewed_ts
                     WHERE
                         worker_locks.last_renewed_ts < ?
+                        OR worker_locks.instance_name = EXCLUDED.instance_name
            """
            txn.execute(
                sql,
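
Putting the two SQL hunks above together: the native-upsert path now
overwrites an existing worker_locks row either when it has timed out or when
it already belongs to this instance. Below is a self-contained sketch of that
behaviour using SQLite (needs SQLite >= 3.24 for upsert). The column names
mirror the diff, but the timeout value, the lock/instance names and the exact
shape of the ON CONFLICT clause are reconstructed from the EXCLUDED
references in the hunks and are illustrative, not the real statement:

    import sqlite3
    import time

    _LOCK_TIMEOUT_MS = 2 * 60 * 1000  # assumed timeout, for illustration

    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE worker_locks (
            lock_name TEXT, lock_key TEXT, instance_name TEXT, token TEXT,
            last_renewed_ts BIGINT, UNIQUE (lock_name, lock_key)
        )
        """
    )

    def try_acquire(instance_name: str, token: str) -> bool:
        now = int(time.time() * 1000)
        cur = conn.execute(
            """
            INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
            VALUES ('federation', 'room1', ?, ?, ?)
            ON CONFLICT (lock_name, lock_key) DO UPDATE SET
                instance_name = excluded.instance_name,
                token = excluded.token,
                last_renewed_ts = excluded.last_renewed_ts
            WHERE
                worker_locks.last_renewed_ts < ?
                OR worker_locks.instance_name = excluded.instance_name
            """,
            (instance_name, token, now, now - _LOCK_TIMEOUT_MS),
        )
        return cur.rowcount == 1  # 1 if the row was inserted or stolen, 0 otherwise

    print(try_acquire("worker1", "aaa"))  # True: no row existed yet
    print(try_acquire("worker2", "bbb"))  # False: worker1's lock has not timed out
    print(try_acquire("worker1", "ccc"))  # True: same instance reclaims its own row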
@@ -148,11 +157,11 @@ class LockStore(SQLBaseStore):
                WHERE
                    lock_name = ?
                    AND lock_key = ?
-                    AND last_renewed_ts < ?
+                    AND (last_renewed_ts < ? OR instance_name = ?)
            """
            txn.execute(
                sql,
-                (lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
+                (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
            )

            inserted = self.db_pool.simple_upsert_txn_emulated(
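
This hunk is the emulated-upsert path, which appears to be the fallback when
native upsert support is unavailable: delete any row that has timed out or
that this instance owns, then re-insert via simple_upsert_txn_emulated.
Continuing the SQLite sketch from the previous block (reusing its conn and
_LOCK_TIMEOUT_MS; names still illustrative), the DELETE applies the same
"timed out OR ours" rule:

    import time

    now = int(time.time() * 1000)
    conn.execute(
        """
        DELETE FROM worker_locks
        WHERE
            lock_name = ?
            AND lock_key = ?
            AND (last_renewed_ts < ? OR instance_name = ?)
        """,
        ("federation", "room1", now - _LOCK_TIMEOUT_MS, "worker1"),
    )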
@@ -179,9 +188,7 @@ class LockStore(SQLBaseStore):
         if not did_lock:
             return None

-        self._live_tokens[(lock_name, lock_key)] = token
-
-        return Lock(
+        lock = Lock(
             self._reactor,
             self._clock,
             self,
@@ -190,6 +197,10 @@ class LockStore(SQLBaseStore):
             token=token,
         )

+        self._live_tokens[(lock_name, lock_key)] = lock
+
+        return lock
+
     async def _is_lock_still_valid(
         self, lock_name: str, lock_key: str, token: str
     ) -> bool:
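
Finally, a hedged usage sketch of how a caller interacts with the reworked
store. The method name try_acquire_lock is assumed from _try_acquire_lock_txn,
and the lock name/key are made up; only release() and the return-None-if-not-
acquired behaviour come directly from the diff:

    async def process_staged_events(store, room_id: str) -> None:
        lock = await store.try_acquire_lock("federation_inbound", room_id)  # assumed name
        if lock is None:
            # Another worker holds the lock, or this process already holds it.
            return
        try:
            ...  # handle the staged events for this room
        finally:
            await lock.release()
        # Because _live_tokens is a WeakValueDictionary, the in-memory entry
        # for this lock disappears once the Lock object goes out of scope.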