Abstract logic for setting the statement timeout.
This commit is contained in:
parent
43d1aa75e8
commit
9622bda163
|
@ -768,8 +768,9 @@ class BackgroundUpdater:
|
||||||
|
|
||||||
# override the global statement timeout to avoid accidentally squashing
|
# override the global statement timeout to avoid accidentally squashing
|
||||||
# a long-running index creation process
|
# a long-running index creation process
|
||||||
timeout_sql = "SET SESSION statement_timeout = 0"
|
self.db_pool.engine.attempt_to_set_statement_timeout(
|
||||||
c.execute(timeout_sql)
|
c, 0, for_transaction=True
|
||||||
|
)
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
|
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
|
||||||
|
@ -791,12 +792,6 @@ class BackgroundUpdater:
|
||||||
logger.debug("[SQL] %s", sql)
|
logger.debug("[SQL] %s", sql)
|
||||||
c.execute(sql)
|
c.execute(sql)
|
||||||
finally:
|
finally:
|
||||||
# mypy ignore - `statement_timeout` is defined on PostgresEngine
|
|
||||||
# reset the global timeout to the default
|
|
||||||
default_timeout = self.db_pool.engine.statement_timeout # type: ignore[attr-defined]
|
|
||||||
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
|
|
||||||
conn.cursor().execute(undo_timeout_sql)
|
|
||||||
|
|
||||||
conn.engine.attempt_to_set_autocommit(conn.conn, False)
|
conn.engine.attempt_to_set_autocommit(conn.conn, False)
|
||||||
|
|
||||||
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
|
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
|
||||||
|
|
|
@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# furthermore, we might already have the table from a previous (failed)
|
# furthermore, we might already have the table from a previous (failed)
|
||||||
# purge attempt, so let's drop the table first.
|
# purge attempt, so let's drop the table first.
|
||||||
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
# Disable statement timeouts for this transaction; purging rooms can
|
||||||
# Disable statement timeouts for this transaction; purging rooms can
|
# take a while!
|
||||||
# take a while!
|
self.database_engine.attempt_to_set_statement_timeout(
|
||||||
txn.execute("SET LOCAL statement_timeout = 0")
|
txn, 0, for_transaction=True
|
||||||
|
)
|
||||||
|
|
||||||
txn.execute("DROP TABLE IF EXISTS events_to_purge")
|
txn.execute("DROP TABLE IF EXISTS events_to_purge")
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor)
|
||||||
|
|
||||||
|
|
||||||
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
|
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
|
||||||
|
# The default statement timeout to use for transactions.
|
||||||
|
statement_timeout: Optional[int] = None
|
||||||
|
|
||||||
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
|
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
|
||||||
self.module = module
|
self.module = module
|
||||||
|
|
||||||
|
@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def attempt_to_set_statement_timeout(
|
||||||
|
self, cursor: CursorType, statement_timeout: int, for_transaction: bool
|
||||||
|
) -> None:
|
||||||
|
"""Attempt to set the cursor's statement timeout.
|
||||||
|
|
||||||
|
Note this has no effect on SQLite3.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def executescript(cursor: CursorType, script: str) -> None:
|
def executescript(cursor: CursorType, script: str) -> None:
|
||||||
|
|
|
@ -52,7 +52,7 @@ class PostgresEngine(
|
||||||
# some degenerate query plan has been created and the client has probably
|
# some degenerate query plan has been created and the client has probably
|
||||||
# timed out/walked off anyway.
|
# timed out/walked off anyway.
|
||||||
# This is in milliseconds.
|
# This is in milliseconds.
|
||||||
self.statement_timeout: Optional[int] = database_config.get(
|
self.statement_timeout = database_config.get(
|
||||||
"statement_timeout", 60 * 60 * 1000
|
"statement_timeout", 60 * 60 * 1000
|
||||||
)
|
)
|
||||||
self._version: Optional[int] = None # unknown as yet
|
self._version: Optional[int] = None # unknown as yet
|
||||||
|
@ -169,7 +169,11 @@ class PostgresEngine(
|
||||||
|
|
||||||
# Abort really long-running statements and turn them into errors.
|
# Abort really long-running statements and turn them into errors.
|
||||||
if self.statement_timeout is not None:
|
if self.statement_timeout is not None:
|
||||||
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
|
self.attempt_to_set_statement_timeout(
|
||||||
|
cast(psycopg2.extensions.cursor, cursor.txn),
|
||||||
|
self.statement_timeout,
|
||||||
|
for_transaction=False,
|
||||||
|
)
|
||||||
|
|
||||||
cursor.close()
|
cursor.close()
|
||||||
db_conn.commit()
|
db_conn.commit()
|
||||||
|
@ -233,6 +237,18 @@ class PostgresEngine(
|
||||||
isolation_level = self.isolation_level_map[isolation_level]
|
isolation_level = self.isolation_level_map[isolation_level]
|
||||||
return conn.set_isolation_level(isolation_level)
|
return conn.set_isolation_level(isolation_level)
|
||||||
|
|
||||||
|
def attempt_to_set_statement_timeout(
|
||||||
|
self,
|
||||||
|
cursor: psycopg2.extensions.cursor,
|
||||||
|
statement_timeout: int,
|
||||||
|
for_transaction: bool,
|
||||||
|
) -> None:
|
||||||
|
if for_transaction:
|
||||||
|
sql = "SET LOCAL statement_timeout TO ?"
|
||||||
|
else:
|
||||||
|
sql = "SET statement_timeout TO ?"
|
||||||
|
cursor.execute(sql, (statement_timeout,))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
|
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
|
||||||
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
||||||
|
|
|
@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
|
||||||
# All transactions are SERIALIZABLE by default in sqlite
|
# All transactions are SERIALIZABLE by default in sqlite
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def attempt_to_set_statement_timeout(
|
||||||
|
self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool
|
||||||
|
) -> None:
|
||||||
|
# Not supported.
|
||||||
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
|
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
|
||||||
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
||||||
|
|
Loading…
Reference in New Issue