Clean up exception handling in the startup code (#9059)
Factor the exception handling in the startup code out into a utility function, and fix some logging and the exit code reported when startup fails.
parent 4e04435bda
commit 671138f658
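For orientation, the core of the change is a new `register_start` helper in the app startup code: startup work is registered as a coroutine, and any exception it raises is logged, printed to the unredirected stderr, and terminates the process with a non-zero exit code. Below is a minimal, self-contained sketch of that pattern — a simplified re-implementation for illustration only (the real helper is in the diff that follows; `start_example_worker` and the behaviour of stopping the reactor at the end are invented for the demo).

import os
import sys
import traceback

from twisted.internet import defer, reactor


def register_start(cb, *args, **kwargs) -> None:
    """Run an async startup callback once the reactor is running; exit on failure."""

    async def wrapper():
        try:
            await cb(*args, **kwargs)
        except Exception:
            # Report the failure, then exit hard: sys.exit() would only raise
            # SystemExit inside the reactor, which swallows it and keeps running.
            print("Error during startup:", file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)
            os._exit(1)

    # Queue the wrapped coroutine to run once the reactor has started.
    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


async def start_example_worker() -> None:
    # Stand-in for the real asynchronous startup work (listeners, ACME, OIDC, ...).
    print("example worker started")
    reactor.stop()  # a real server would keep the reactor running


if __name__ == "__main__":
    register_start(start_example_worker)
    reactor.run()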
@@ -0,0 +1 @@
+Fix incorrect exit code when there is an error at startup.
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@ import signal
 import socket
 import sys
 import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable

 from typing_extensions import NoReturn

@@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn:
     sys.exit(1)


+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+    """Register a callback with the reactor, to be called once it is running
+
+    This can be used to initialise parts of the system which require an asynchronous
+    setup.
+
+    Any exception raised by the callback will be printed and logged, and the process
+    will exit.
+    """
+
+    async def wrapper():
+        try:
+            await cb(*args, **kwargs)
+        except Exception:
+            # previously, we used Failure().printTraceback() here, in the hope that
+            # would give better tracebacks than traceback.print_exc(). However, that
+            # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+            # and I *think* the need for Failure() is reduced now that we mostly use
+            # async/await.
+
+            # Write the exception to both the logs *and* the unredirected stderr,
+            # because people tend to get confused if it only goes to one or the other.
+            #
+            # One problem with this is that if people are using a logging config that
+            # logs to the console (as is common eg under docker), they will get two
+            # copies of the exception. We could maybe try to detect that, but it's
+            # probably a cost we can bear.
+            logger.fatal("Error during startup", exc_info=True)
+            print("Error during startup:", file=sys.__stderr__)
+            traceback.print_exc(file=sys.__stderr__)
+
+            # it's no use calling sys.exit here, since that just raises a SystemExit
+            # exception which is then caught by the reactor, and everything carries
+            # on as normal.
+            os._exit(1)
+
+    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
 def listen_metrics(bind_addresses, port):
     """
     Start Prometheus metrics server.
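With this helper in place, the per-app entrypoints no longer need their own try/except or reactor plumbing. Roughly, the call sites change as follows (paraphrased from the generic worker and homeserver hunks further down):

# Before: each entrypoint wired its startup coroutine into the reactor itself, e.g.
#     reactor.addSystemEventTrigger("before", "startup", _base.start, hs, config.worker_listeners)
# or
#     reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
# After: it hands the coroutine to the shared helper, which supplies the error handling.
register_start(_base.start, hs, config.worker_listeners)  # generic worker
register_start(start)                                     # homeserver setup()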
@@ -227,7 +267,7 @@ def refresh_certificate(hs):
     logger.info("Context factories updated.")


-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.

@@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
         hs: homeserver instance
         listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
-    try:
-        # Set up the SIGHUP machinery.
-        if hasattr(signal, "SIGHUP"):
-
-            reactor = hs.get_reactor()
-
-            @wrap_as_background_process("sighup")
-            def handle_sighup(*args, **kwargs):
-                # Tell systemd our state, if we're using it. This will silently fail if
-                # we're not using systemd.
-                sdnotify(b"RELOADING=1")
-
-                for i, args, kwargs in _sighup_callbacks:
-                    i(*args, **kwargs)
-
-                sdnotify(b"READY=1")
-
-            # We defer running the sighup handlers until next reactor tick. This
-            # is so that we're in a sane state, e.g. flushing the logs may fail
-            # if the sighup happens in the middle of writing a log entry.
-            def run_sighup(*args, **kwargs):
-                # `callFromThread` should be "signal safe" as well as thread
-                # safe.
-                reactor.callFromThread(handle_sighup, *args, **kwargs)
-
-            signal.signal(signal.SIGHUP, run_sighup)
-
-        register_sighup(refresh_certificate, hs)
-
-        # Load the certificate from disk.
-        refresh_certificate(hs)
-
-        # Start the tracer
-        synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
-            hs
-        )
-
-        # It is now safe to start your Synapse.
-        hs.start_listening(listeners)
-        hs.get_datastore().db_pool.start_profiling()
-        hs.get_pusherpool().start()
-
-        # Log when we start the shut down process.
-        hs.get_reactor().addSystemEventTrigger(
-            "before", "shutdown", logger.info, "Shutting down..."
-        )
-
-        setup_sentry(hs)
-        setup_sdnotify(hs)
-
-        # If background tasks are running on the main process, start collecting the
-        # phone home stats.
-        if hs.config.run_background_tasks:
-            start_phone_stats_home(hs)
-
-        # We now freeze all allocated objects in the hopes that (almost)
-        # everything currently allocated are things that will be used for the
-        # rest of time. Doing so means less work each GC (hopefully).
-        #
-        # This only works on Python 3.7
-        if sys.version_info >= (3, 7):
-            gc.collect()
-            gc.freeze()
-    except Exception:
-        traceback.print_exc(file=sys.stderr)
-        reactor = hs.get_reactor()
-        if reactor.running:
-            reactor.stop()
-        sys.exit(1)
+    # Set up the SIGHUP machinery.
+    if hasattr(signal, "SIGHUP"):
+        reactor = hs.get_reactor()
+
+        @wrap_as_background_process("sighup")
+        def handle_sighup(*args, **kwargs):
+            # Tell systemd our state, if we're using it. This will silently fail if
+            # we're not using systemd.
+            sdnotify(b"RELOADING=1")
+
+            for i, args, kwargs in _sighup_callbacks:
+                i(*args, **kwargs)
+
+            sdnotify(b"READY=1")
+
+        # We defer running the sighup handlers until next reactor tick. This
+        # is so that we're in a sane state, e.g. flushing the logs may fail
+        # if the sighup happens in the middle of writing a log entry.
+        def run_sighup(*args, **kwargs):
+            # `callFromThread` should be "signal safe" as well as thread
+            # safe.
+            reactor.callFromThread(handle_sighup, *args, **kwargs)
+
+        signal.signal(signal.SIGHUP, run_sighup)
+
+    register_sighup(refresh_certificate, hs)
+
+    # Load the certificate from disk.
+    refresh_certificate(hs)
+
+    # Start the tracer
+    synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
+        hs
+    )
+
+    # It is now safe to start your Synapse.
+    hs.start_listening(listeners)
+    hs.get_datastore().db_pool.start_profiling()
+    hs.get_pusherpool().start()
+
+    # Log when we start the shut down process.
+    hs.get_reactor().addSystemEventTrigger(
+        "before", "shutdown", logger.info, "Shutting down..."
+    )
+
+    setup_sentry(hs)
+    setup_sdnotify(hs)
+
+    # If background tasks are running on the main process, start collecting the
+    # phone home stats.
+    if hs.config.run_background_tasks:
+        start_phone_stats_home(hs)
+
+    # We now freeze all allocated objects in the hopes that (almost)
+    # everything currently allocated are things that will be used for the
+    # rest of time. Doing so means less work each GC (hopefully).
+    #
+    # This only works on Python 3.7
+    if sys.version_info >= (3, 7):
+        gc.collect()
+        gc.freeze()


 def setup_sentry(hs):
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set

 from typing_extensions import ContextManager

-from twisted.internet import address, reactor
+from twisted.internet import address

 import synapse
 import synapse.events

@@ -34,6 +34,7 @@ from synapse.api.urls import (
     SERVER_KEY_V2_PREFIX,
 )
 from synapse.app import _base
+from synapse.app._base import register_start
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging

@@ -960,9 +961,7 @@ def start(config_options):
     # streams. Will no-op if no streams can be written to by this worker.
     hs.get_replication_streamer()

-    reactor.addSystemEventTrigger(
-        "before", "startup", _base.start, hs, config.worker_listeners
-    )
+    register_start(_base.start, hs, config.worker_listeners)

     _base.start_worker_reactor("synapse-generic-worker", config)
@@ -20,8 +20,7 @@ import os
 import sys
 from typing import Iterable, Iterator

-from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
+from twisted.internet import reactor
 from twisted.web.resource import EncodingResourceWrapper, IResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File

@@ -38,7 +37,7 @@ from synapse.api.urls import (
     WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
 from synapse.config._base import ConfigError
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.config.homeserver import HomeServerConfig

@@ -414,40 +413,29 @@ def setup(config_options):
     _base.refresh_certificate(hs)

     async def start():
-        try:
-            # Run the ACME provisioning code, if it's enabled.
-            if hs.config.acme_enabled:
-                acme = hs.get_acme_handler()
-                # Start up the webservices which we will respond to ACME
-                # challenges with, and then provision.
-                await acme.start_listening()
-                await do_acme()
-
-                # Check if it needs to be reprovisioned every day.
-                hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
-
-            # Load the OIDC provider metadatas, if OIDC is enabled.
-            if hs.config.oidc_enabled:
-                oidc = hs.get_oidc_handler()
-                # Loading the provider metadata also ensures the provider config is valid.
-                await oidc.load_metadata()
-                await oidc.load_jwks()
-
-            _base.start(hs, config.listeners)
-
-            hs.get_datastore().db_pool.updates.start_doing_background_updates()
-        except Exception:
-            # Print the exception and bail out.
-            print("Error during startup:", file=sys.stderr)
-
-            # this gives better tracebacks than traceback.print_exc()
-            Failure().printTraceback(file=sys.stderr)
-
-            if reactor.running:
-                reactor.stop()
-            sys.exit(1)
-
-    reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
+        # Run the ACME provisioning code, if it's enabled.
+        if hs.config.acme_enabled:
+            acme = hs.get_acme_handler()
+            # Start up the webservices which we will respond to ACME
+            # challenges with, and then provision.
+            await acme.start_listening()
+            await do_acme()
+
+            # Check if it needs to be reprovisioned every day.
+            hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
+
+        # Load the OIDC provider metadatas, if OIDC is enabled.
+        if hs.config.oidc_enabled:
+            oidc = hs.get_oidc_handler()
+            # Loading the provider metadata also ensures the provider config is valid.
+            await oidc.load_metadata()
+            await oidc.load_jwks()
+
+        await _base.start(hs, config.listeners)
+
+        hs.get_datastore().db_pool.updates.start_doing_background_updates()
+
+    register_start(start)

     return hs