Complement image: propagate SIGTERM to all workers (#13914)

This should mean that logs from worker processes are flushed before shutdown.

When a test completes, Complement stops the docker container, which means that
synapse will receive a SIGTERM. Currently, the `complement_fork_starter` exits
immediately (without notifying the worker processes), which means that the
workers never get a chance to flush their logs before the whole container is
vaped. We can fix this by propagating the SIGTERM to the children.
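For illustration, here is a minimal, self-contained sketch of the pattern the commit applies; the names (worker, forward_signal, _original_handlers) are illustrative only, not Synapse's. A parent installs SIGINT/SIGTERM handlers that forward the signal to each multiprocessing child, and each child restores the original handlers after the fork:

# Minimal sketch of the propagation pattern, assuming Unix fork() semantics.
# Illustrative names only; this is not the Synapse code itself.
import multiprocessing
import os
import signal
import sys
import time
from types import FrameType
from typing import Any, Dict, List, Optional

# Handlers as they were before we installed forwarding ones; children
# restore these so they start from a clean slate.
_original_handlers: Dict[int, Any] = {}


def worker() -> None:
    # Children inherit the parent's forwarding handlers across fork(), so
    # restore the originals before doing any real work.
    for sig, handler in _original_handlers.items():
        signal.signal(sig, handler)
    time.sleep(60)  # stand-in for a worker's real main loop


def main() -> None:
    multiprocessing.set_start_method("fork")  # the pattern relies on fork()
    processes: List[multiprocessing.Process] = []

    def forward_signal(signum: int, frame: Optional[FrameType]) -> None:
        # Pass the signal on to every child so each can flush and exit.
        print(f"parent: forwarding signal {signum}", file=sys.stderr)
        for p in processes:
            if p.pid:
                os.kill(p.pid, signum)

    # Install the forwarding handlers *before* forking, so we must record the
    # originals for the children to restore.
    for sig in (signal.SIGINT, signal.SIGTERM):
        _original_handlers[sig] = signal.signal(sig, forward_signal)

    for _ in range(2):
        p = multiprocessing.Process(target=worker)
        p.start()
        processes.append(p)

    # join() keeps the parent alive until the children have shut down.
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

Note that the forwarding handler deliberately does not exit: the parent keeps running and the join() calls return once the children have shut down, which is the "inhibits our own exit" behaviour described in the diff below.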
Richard van der Hoff, 2022-09-26 23:07:02 +01:00, committed by GitHub
parent 2fae1a3f78
commit d6b85a2a7d
2 changed files with 31 additions and 2 deletions

changelog.d/13914.misc (new file)

@@ -0,0 +1 @@
+Complement image: propagate SIGTERM to all workers.

synapse/app/complement_fork_starter.py
@@ -51,11 +51,18 @@ import argparse
 import importlib
 import itertools
 import multiprocessing
+import os
+import signal
 import sys
-from typing import Any, Callable, List
+from types import FrameType
+from typing import Any, Callable, List, Optional
 
 from twisted.internet.main import installReactor
 
+# a list of the original signal handlers, before we installed our custom ones.
+# We restore these in our child processes.
+_original_signal_handlers: dict[int, Any] = {}
+
 
 class ProxiedReactor:
     """
@@ -105,6 +112,11 @@ def _worker_entrypoint(
 
     sys.argv = args
 
+    # reset the custom signal handlers that we installed, so that the children start
+    # from a clean slate.
+    for sig, handler in _original_signal_handlers.items():
+        signal.signal(sig, handler)
+
     from twisted.internet.epollreactor import EPollReactor
 
     proxy_reactor._install_real_reactor(EPollReactor())
@@ -167,13 +179,29 @@
     update_proc.join()
     print("===== PREPARED DATABASE =====", file=sys.stderr)
 
+    processes: List[multiprocessing.Process] = []
+
+    # Install signal handlers to propagate signals to all our children, so that they
+    # shut down cleanly. This also inhibits our own exit, but that's good: we want to
+    # wait until the children have exited.
+    def handle_signal(signum: int, frame: Optional[FrameType]) -> None:
+        print(
+            f"complement_fork_starter: Caught signal {signum}. Stopping children.",
+            file=sys.stderr,
+        )
+        for p in processes:
+            if p.pid:
+                os.kill(p.pid, signum)
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        _original_signal_handlers[sig] = signal.signal(sig, handle_signal)
+
     # At this point, we've imported all the main entrypoints for all the workers.
     # Now we basically just fork() out to create the workers we need.
     # Because we're using fork(), all the workers get a clone of this launcher's
     # memory space and don't need to repeat the work of loading the code!
     # Instead of using fork() directly, we use the multiprocessing library,
     # which uses fork() on Unix platforms.
-    processes = []
     for (func, worker_args) in zip(worker_functions, args_by_worker):
         process = multiprocessing.Process(
             target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)