Fix the inbound PDU metric (#10279)

This broke in #10272
This commit is contained in:
Erik Johnston 2021-06-30 12:07:16 +01:00 committed by GitHub
parent bc5589a1bb
commit 329ef5c715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 27 deletions

1
changelog.d/10279.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1.

View File

@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
async def process_pdu(pdu: EventBase) -> JsonDict: async def process_pdu(pdu: EventBase) -> JsonDict:
event_id = pdu.event_id event_id = pdu.event_id
with pdu_process_time.time(): with nested_logging_context(event_id):
with nested_logging_context(event_id): try:
try: await self._handle_received_pdu(origin, pdu)
await self._handle_received_pdu(origin, pdu) return {}
return {} except FederationError as e:
except FederationError as e: logger.warning("Error handling PDU %s: %s", event_id, e)
logger.warning("Error handling PDU %s: %s", event_id, e) return {"error": str(e)}
return {"error": str(e)} except Exception as e:
except Exception as e: f = failure.Failure()
f = failure.Failure() logger.error(
logger.error( "Failed to handle PDU %s",
"Failed to handle PDU %s", event_id,
event_id, exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore )
) return {"error": str(e)}
return {"error": str(e)}
await concurrently_execute( await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
) )
await self.store.remove_received_event_from_staging( received_ts = await self.store.remove_received_event_from_staging(
origin, event.event_id origin, event.event_id
) )
if received_ts is not None:
pdu_process_time.observe(
(self._clock.time_msec() - received_ts) / 1000
)
# We need to do this check outside the lock to avoid a race between # We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting # a new event being inserted by another instance and it attempting

View File

@@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self, self,
origin: str, origin: str,
event_id: str, event_id: str,
) -> None: ) -> Optional[int]:
"""Remove the given event from the staging area""" """Remove the given event from the staging area.
await self.db_pool.simple_delete(
table="federation_inbound_events_staging", Returns:
keyvalues={ The received_ts of the row that was deleted, if any.
"origin": origin, """
"event_id": event_id, if self.db_pool.engine.supports_returning:
},
desc="remove_received_event_from_staging", def _remove_received_event_from_staging_txn(txn):
) sql = """
DELETE FROM federation_inbound_events_staging
WHERE origin = ? AND event_id = ?
RETURNING received_ts
"""
txn.execute(sql, (origin, event_id))
return txn.fetchone()
row = await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
db_autocommit=True,
)
if row is None:
return None
return row[0]
else:
def _remove_received_event_from_staging_txn(txn):
received_ts = self.db_pool.simple_select_one_onecol_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
retcol="received_ts",
allow_none=True,
)
self.db_pool.simple_delete_txn(
txn,
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
)
return received_ts
return await self.db_pool.runInteraction(
"remove_received_event_from_staging",
_remove_received_event_from_staging_txn,
)
async def get_next_staged_event_id_for_room( async def get_next_staged_event_id_for_room(
self, self,

View File

@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
""" """
... ...
# Abstract capability flag: each concrete engine overrides this to report
# whether its SQL dialect accepts a `RETURNING` clause, so callers can pick
# between a single DELETE ... RETURNING statement and a SELECT-then-DELETE
# fallback (see the staging-area change in this same commit).
@property
@abc.abstractmethod
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
...
@abc.abstractmethod @abc.abstractmethod
def check_database( def check_database(
self, db_conn: ConnectionType, allow_outdated_version: bool = False self, db_conn: ConnectionType, allow_outdated_version: bool = False

View File

@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
"""Do we support using `a = ANY(?)` and passing a list""" """Do we support using `a = ANY(?)` and passing a list"""
return True return True
# Unconditionally True: no version gate here, unlike the SQLite engine.
# NOTE(review): presumably every PostgreSQL version this codebase supports
# accepts `RETURNING` — confirm against the documented minimum PG version.
@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return True
def is_deadlock(self, error): def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError): if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html # https://www.postgresql.org/docs/current/static/errcodes-appendix.html

View File

@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
"""Do we support using `a = ANY(?)` and passing a list""" """Do we support using `a = ANY(?)` and passing a list"""
return False return False
# Version-gated: the (3, 35, 0) comparison reflects the SQLite release that
# introduced the `RETURNING` clause, checked against the runtime library
# version rather than the compile-time one.
@property
def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return self.module.sqlite_version_info >= (3, 35, 0)
def check_database(self, db_conn, allow_outdated_version: bool = False): def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version: if not allow_outdated_version:
version = self.module.sqlite_version_info version = self.module.sqlite_version_info