From d85cba1aa0a2f6dbb988b81d331b5ba9487fe1ac Mon Sep 17 00:00:00 2001 From: realtyem Date: Tue, 8 Nov 2022 07:14:00 -0600 Subject: [PATCH] Add all Stream Writer worker types to configure_workers_and_start.py (#14197) Co-authored-by: reivilibre --- changelog.d/14197.docker | 1 + docker/configure_workers_and_start.py | 76 ++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 changelog.d/14197.docker diff --git a/changelog.d/14197.docker b/changelog.d/14197.docker new file mode 100644 index 0000000000..529ccd99c5 --- /dev/null +++ b/changelog.d/14197.docker @@ -0,0 +1 @@ +Add all Stream Writer worker types to configure_workers_and_start.py. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 1ea456b2f8..da259129d1 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -50,7 +50,12 @@ from jinja2 import Environment, FileSystemLoader MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 - +# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources +# Watching /_matrix/client needs a "client" listener +# Watching /_matrix/federation needs a "federation" listener +# Watching /_matrix/media and related needs a "media" listener +# Stream Writers require "client" and "replication" listeners because they +# have to attach by instance_map to the master process and have client endpoints. WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "pusher": { "app": "synapse.app.pusher", @@ -209,6 +214,49 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { % (MAIN_PROCESS_HTTP_LISTENER_PORT,) ), }, + "account_data": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": [ + "^/_matrix/client/(r0|v3|unstable)/.*/tags", + "^/_matrix/client/(r0|v3|unstable)/.*/account_data", + ], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, + "presence": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, + "receipts": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": [ + "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", + "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", + ], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, + "to_device": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, + "typing": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": [ + "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" + ], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, } # Templates for sections that may be inserted multiple times in config files @@ -271,7 +319,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None: outfile.write(rendered) -def add_sharding_to_shared_config( +def add_worker_roles_to_shared_config( shared_config: dict, worker_type: str, worker_name: str, @@ -309,6 +357,20 @@ def add_sharding_to_shared_config( "port": worker_port, } + elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]: + # Update the list of stream writers + # It's convienent that the name of the worker type is the same as the event stream + shared_config.setdefault("stream_writers", {}).setdefault( + worker_type, [] + ).append(worker_name) + + # Map of stream writer instance names to host/ports combos + # For now, all stream writers need http replication ports + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } + elif worker_type == "media_repository": # The first configured media worker will run the media background jobs shared_config.setdefault("media_instance_running_background_jobs", worker_name) @@ -441,11 +503,11 @@ def generate_worker_files( # Check if more than one instance of this worker type has been specified worker_type_total_count = worker_types.count(worker_type) - if worker_type_total_count > 1: - # Update the shared config with sharding-related options if necessary - add_sharding_to_shared_config( - shared_config, worker_type, worker_name, worker_port - ) + + # Update the shared config with sharding-related options if necessary + add_worker_roles_to_shared_config( + shared_config, worker_type, worker_name, worker_port + ) # Enable the worker in supervisord worker_descriptors.append(worker_config)