Make sure that we close db connections opened during init
We should explicitly close any db connections we open, because failing to do so can block other transactions as per https://github.com/matrix-org/synapse/issues/3682. Let's also try to factor out some of the boilerplate by having server classes define their datastore class rather than duplicating the whole of `setup`.
This commit is contained in:
parent
9fbaed325f
commit
0b07f02e19
|
@ -51,10 +51,7 @@ class AppserviceSlaveStore(
|
||||||
|
|
||||||
|
|
||||||
class AppserviceServer(HomeServer):
|
class AppserviceServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = AppserviceSlaveStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -74,10 +74,7 @@ class ClientReaderSlavedStore(
|
||||||
|
|
||||||
|
|
||||||
class ClientReaderServer(HomeServer):
|
class ClientReaderServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = ClientReaderSlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -90,10 +90,7 @@ class EventCreatorSlavedStore(
|
||||||
|
|
||||||
|
|
||||||
class EventCreatorServer(HomeServer):
|
class EventCreatorServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = EventCreatorSlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -72,10 +72,7 @@ class FederationReaderSlavedStore(
|
||||||
|
|
||||||
|
|
||||||
class FederationReaderServer(HomeServer):
|
class FederationReaderServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = FederationReaderSlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -78,10 +78,7 @@ class FederationSenderSlaveStore(
|
||||||
|
|
||||||
|
|
||||||
class FederationSenderServer(HomeServer):
|
class FederationSenderServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = FederationSenderSlaveStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -148,10 +148,7 @@ class FrontendProxySlavedStore(
|
||||||
|
|
||||||
|
|
||||||
class FrontendProxyServer(HomeServer):
|
class FrontendProxyServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = FrontendProxySlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
|
||||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage import are_all_users_on_domain
|
from synapse.storage import DataStore, are_all_users_on_domain
|
||||||
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
||||||
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
@ -111,6 +111,8 @@ def build_resource_for_web_client(hs):
|
||||||
|
|
||||||
|
|
||||||
class SynapseHomeServer(HomeServer):
|
class SynapseHomeServer(HomeServer):
|
||||||
|
DATASTORE_CLASS = DataStore
|
||||||
|
|
||||||
def _listener_http(self, config, listener_config):
|
def _listener_http(self, config, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
bind_addresses = listener_config["bind_addresses"]
|
bind_addresses = listener_config["bind_addresses"]
|
||||||
|
@ -356,13 +358,13 @@ def setup(config_options):
|
||||||
logger.info("Preparing database: %s...", config.database_config['name'])
|
logger.info("Preparing database: %s...", config.database_config['name'])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
db_conn = hs.get_db_conn(run_new_connection=False)
|
with hs.get_db_conn(run_new_connection=False) as db_conn:
|
||||||
prepare_database(db_conn, database_engine, config=config)
|
prepare_database(db_conn, database_engine, config=config)
|
||||||
database_engine.on_new_connection(db_conn)
|
database_engine.on_new_connection(db_conn)
|
||||||
|
|
||||||
hs.run_startup_checks(db_conn, database_engine)
|
hs.run_startup_checks(db_conn, database_engine)
|
||||||
|
|
||||||
db_conn.commit()
|
db_conn.commit()
|
||||||
except UpgradeDatabaseException:
|
except UpgradeDatabaseException:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
"\nFailed to upgrade database.\n"
|
"\nFailed to upgrade database.\n"
|
||||||
|
|
|
@ -60,10 +60,7 @@ class MediaRepositorySlavedStore(
|
||||||
|
|
||||||
|
|
||||||
class MediaRepositoryServer(HomeServer):
|
class MediaRepositoryServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = MediaRepositorySlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -78,10 +78,7 @@ class PusherSlaveStore(
|
||||||
|
|
||||||
|
|
||||||
class PusherServer(HomeServer):
|
class PusherServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = PusherSlaveStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def remove_pusher(self, app_id, push_key, user_id):
|
def remove_pusher(self, app_id, push_key, user_id):
|
||||||
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
||||||
|
|
|
@ -249,10 +249,7 @@ class SynchrotronApplicationService(object):
|
||||||
|
|
||||||
|
|
||||||
class SynchrotronServer(HomeServer):
|
class SynchrotronServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = SynchrotronSlavedStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -94,10 +94,7 @@ class UserDirectorySlaveStore(
|
||||||
|
|
||||||
|
|
||||||
class UserDirectoryServer(HomeServer):
|
class UserDirectoryServer(HomeServer):
|
||||||
def setup(self):
|
DATASTORE_CLASS = UserDirectorySlaveStore
|
||||||
logger.info("Setting up.")
|
|
||||||
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
|
|
||||||
logger.info("Finished setting up.")
|
|
||||||
|
|
||||||
def _listen_http(self, listener_config):
|
def _listen_http(self, listener_config):
|
||||||
port = listener_config["port"]
|
port = listener_config["port"]
|
||||||
|
|
|
@ -81,7 +81,6 @@ from synapse.server_notices.server_notices_manager import ServerNoticesManager
|
||||||
from synapse.server_notices.server_notices_sender import ServerNoticesSender
|
from synapse.server_notices.server_notices_sender import ServerNoticesSender
|
||||||
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
|
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
|
||||||
from synapse.state import StateHandler, StateResolutionHandler
|
from synapse.state import StateHandler, StateResolutionHandler
|
||||||
from synapse.storage import DataStore
|
|
||||||
from synapse.streams.events import EventSources
|
from synapse.streams.events import EventSources
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
from synapse.util.distributor import Distributor
|
from synapse.util.distributor import Distributor
|
||||||
|
@ -172,6 +171,11 @@ class HomeServer(object):
|
||||||
'room_context_handler',
|
'room_context_handler',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# This is overridden in derived application classes
|
||||||
|
# (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
|
||||||
|
# instantiated during setup() for future return by get_datastore()
|
||||||
|
DATASTORE_CLASS = None
|
||||||
|
|
||||||
def __init__(self, hostname, reactor=None, **kwargs):
|
def __init__(self, hostname, reactor=None, **kwargs):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
|
@ -188,13 +192,20 @@ class HomeServer(object):
|
||||||
self.distributor = Distributor()
|
self.distributor = Distributor()
|
||||||
self.ratelimiter = Ratelimiter()
|
self.ratelimiter = Ratelimiter()
|
||||||
|
|
||||||
|
self.datastore = None
|
||||||
|
|
||||||
# Other kwargs are explicit dependencies
|
# Other kwargs are explicit dependencies
|
||||||
for depname in kwargs:
|
for depname in kwargs:
|
||||||
setattr(self, depname, kwargs[depname])
|
setattr(self, depname, kwargs[depname])
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
logger.info("Setting up.")
|
logger.info("Setting up.")
|
||||||
self.datastore = DataStore(self.get_db_conn(), self)
|
if self.DATASTORE_CLASS is None:
|
||||||
|
raise RuntimeError("%s does not define a DATASTORE_CLASS" % (
|
||||||
|
self.__class__.__name__,
|
||||||
|
))
|
||||||
|
with self.get_db_conn() as conn:
|
||||||
|
self.datastore = self.DATASTORE_CLASS(conn, self)
|
||||||
logger.info("Finished setting up.")
|
logger.info("Finished setting up.")
|
||||||
|
|
||||||
def get_reactor(self):
|
def get_reactor(self):
|
||||||
|
|
Loading…
Reference in New Issue