Db txn set isolation level (#11799)
Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
This commit is contained in:
parent
fc8598bc87
commit
b59d285f7c
|
@ -0,0 +1 @@
|
||||||
|
Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper.
|
|
@ -702,6 +702,7 @@ class DatabasePool:
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
*args: Any,
|
*args: Any,
|
||||||
db_autocommit: bool = False,
|
db_autocommit: bool = False,
|
||||||
|
isolation_level: Optional[int] = None,
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> R:
|
) -> R:
|
||||||
"""Starts a transaction on the database and runs a given function
|
"""Starts a transaction on the database and runs a given function
|
||||||
|
@ -724,6 +725,7 @@ class DatabasePool:
|
||||||
called multiple times if the transaction is retried, so must
|
called multiple times if the transaction is retried, so must
|
||||||
correctly handle that case.
|
correctly handle that case.
|
||||||
|
|
||||||
|
isolation_level: Set the server isolation level for this transaction.
|
||||||
args: positional args to pass to `func`
|
args: positional args to pass to `func`
|
||||||
kwargs: named args to pass to `func`
|
kwargs: named args to pass to `func`
|
||||||
|
|
||||||
|
@ -763,6 +765,7 @@ class DatabasePool:
|
||||||
func: Callable[..., R],
|
func: Callable[..., R],
|
||||||
*args: Any,
|
*args: Any,
|
||||||
db_autocommit: bool = False,
|
db_autocommit: bool = False,
|
||||||
|
isolation_level: Optional[int] = None,
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> R:
|
) -> R:
|
||||||
"""Wraps the .runWithConnection() method on the underlying db_pool.
|
"""Wraps the .runWithConnection() method on the underlying db_pool.
|
||||||
|
@ -775,6 +778,7 @@ class DatabasePool:
|
||||||
db_autocommit: Whether to run the function in "autocommit" mode,
|
db_autocommit: Whether to run the function in "autocommit" mode,
|
||||||
i.e. outside of a transaction. This is useful for transaction
|
i.e. outside of a transaction. This is useful for transaction
|
||||||
that are only a single query. Currently only affects postgres.
|
that are only a single query. Currently only affects postgres.
|
||||||
|
isolation_level: Set the server isolation level for this transaction.
|
||||||
kwargs: named args to pass to `func`
|
kwargs: named args to pass to `func`
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -834,6 +838,10 @@ class DatabasePool:
|
||||||
try:
|
try:
|
||||||
if db_autocommit:
|
if db_autocommit:
|
||||||
self.engine.attempt_to_set_autocommit(conn, True)
|
self.engine.attempt_to_set_autocommit(conn, True)
|
||||||
|
if isolation_level is not None:
|
||||||
|
self.engine.attempt_to_set_isolation_level(
|
||||||
|
conn, isolation_level
|
||||||
|
)
|
||||||
|
|
||||||
db_conn = LoggingDatabaseConnection(
|
db_conn = LoggingDatabaseConnection(
|
||||||
conn, self.engine, "runWithConnection"
|
conn, self.engine, "runWithConnection"
|
||||||
|
@ -842,6 +850,8 @@ class DatabasePool:
|
||||||
finally:
|
finally:
|
||||||
if db_autocommit:
|
if db_autocommit:
|
||||||
self.engine.attempt_to_set_autocommit(conn, False)
|
self.engine.attempt_to_set_autocommit(conn, False)
|
||||||
|
if isolation_level:
|
||||||
|
self.engine.attempt_to_set_isolation_level(conn, None)
|
||||||
|
|
||||||
return await make_deferred_yieldable(
|
return await make_deferred_yieldable(
|
||||||
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
||||||
|
|
|
@ -12,11 +12,18 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import abc
|
import abc
|
||||||
from typing import Generic, TypeVar
|
from enum import IntEnum
|
||||||
|
from typing import Generic, Optional, TypeVar
|
||||||
|
|
||||||
from synapse.storage.types import Connection
|
from synapse.storage.types import Connection
|
||||||
|
|
||||||
|
|
||||||
|
class IsolationLevel(IntEnum):
|
||||||
|
READ_COMMITTED: int = 1
|
||||||
|
REPEATABLE_READ: int = 2
|
||||||
|
SERIALIZABLE: int = 3
|
||||||
|
|
||||||
|
|
||||||
class IncorrectDatabaseSetup(RuntimeError):
|
class IncorrectDatabaseSetup(RuntimeError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -109,3 +116,13 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
|
||||||
commit/rollback the connections.
|
commit/rollback the connections.
|
||||||
"""
|
"""
|
||||||
...
|
...
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def attempt_to_set_isolation_level(
|
||||||
|
self, conn: Connection, isolation_level: Optional[int]
|
||||||
|
):
|
||||||
|
"""Attempt to set the connections isolation level.
|
||||||
|
|
||||||
|
Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
|
|
@ -13,8 +13,13 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Mapping, Optional
|
||||||
|
|
||||||
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
|
from synapse.storage.engines._base import (
|
||||||
|
BaseDatabaseEngine,
|
||||||
|
IncorrectDatabaseSetup,
|
||||||
|
IsolationLevel,
|
||||||
|
)
|
||||||
from synapse.storage.types import Connection
|
from synapse.storage.types import Connection
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -34,6 +39,15 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||||
self.synchronous_commit = database_config.get("synchronous_commit", True)
|
self.synchronous_commit = database_config.get("synchronous_commit", True)
|
||||||
self._version = None # unknown as yet
|
self._version = None # unknown as yet
|
||||||
|
|
||||||
|
self.isolation_level_map: Mapping[int, int] = {
|
||||||
|
IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED,
|
||||||
|
IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
|
||||||
|
IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE,
|
||||||
|
}
|
||||||
|
self.default_isolation_level = (
|
||||||
|
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def single_threaded(self) -> bool:
|
def single_threaded(self) -> bool:
|
||||||
return False
|
return False
|
||||||
|
@ -104,9 +118,7 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||||
return sql.replace("?", "%s")
|
return sql.replace("?", "%s")
|
||||||
|
|
||||||
def on_new_connection(self, db_conn):
|
def on_new_connection(self, db_conn):
|
||||||
db_conn.set_isolation_level(
|
db_conn.set_isolation_level(self.default_isolation_level)
|
||||||
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
|
||||||
)
|
|
||||||
|
|
||||||
# Set the bytea output to escape, vs the default of hex
|
# Set the bytea output to escape, vs the default of hex
|
||||||
cursor = db_conn.cursor()
|
cursor = db_conn.cursor()
|
||||||
|
@ -175,3 +187,12 @@ class PostgresEngine(BaseDatabaseEngine):
|
||||||
|
|
||||||
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
|
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
|
||||||
return conn.set_session(autocommit=autocommit) # type: ignore
|
return conn.set_session(autocommit=autocommit) # type: ignore
|
||||||
|
|
||||||
|
def attempt_to_set_isolation_level(
|
||||||
|
self, conn: Connection, isolation_level: Optional[int]
|
||||||
|
):
|
||||||
|
if isolation_level is None:
|
||||||
|
isolation_level = self.default_isolation_level
|
||||||
|
else:
|
||||||
|
isolation_level = self.isolation_level_map[isolation_level]
|
||||||
|
return conn.set_isolation_level(isolation_level) # type: ignore
|
||||||
|
|
|
@ -15,6 +15,7 @@ import platform
|
||||||
import struct
|
import struct
|
||||||
import threading
|
import threading
|
||||||
import typing
|
import typing
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from synapse.storage.engines import BaseDatabaseEngine
|
from synapse.storage.engines import BaseDatabaseEngine
|
||||||
from synapse.storage.types import Connection
|
from synapse.storage.types import Connection
|
||||||
|
@ -122,6 +123,12 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
|
||||||
# set the connection to autocommit mode.
|
# set the connection to autocommit mode.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def attempt_to_set_isolation_level(
|
||||||
|
self, conn: Connection, isolation_level: Optional[int]
|
||||||
|
):
|
||||||
|
# All transactions are SERIALIZABLE by default in sqllite
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
# Following functions taken from: https://github.com/coleifer/peewee
|
# Following functions taken from: https://github.com/coleifer/peewee
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue