Convert listener_resources and endpoint_patterns to Set[str]

Olivier Wilkinson (reivilibre) 2023-11-16 15:12:18 +00:00
parent 67d4fc8b99
commit 94a85b36f7
1 changed file with 44 additions and 46 deletions

@@ -90,8 +90,8 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
 # have to attach by instance_map to the master process and have client endpoints.
 @dataclass
 class WorkerTemplate:
-    listener_resources: List[str] = field(default_factory=list)
-    endpoint_patterns: List[str] = field(default_factory=list)
+    listener_resources: Set[str] = field(default_factory=set)
+    endpoint_patterns: Set[str] = field(default_factory=set)
     # (worker_name) -> {}
     shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
     worker_extra_conf: str = ""
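
Note that `field(default_factory=set)`, rather than a bare `= set()` default, is required here: dataclasses reject mutable default values because a single shared object would leak state between instances. A minimal sketch of the rule (the `Example` class and `tags` field are illustrative, not part of this commit):

    from dataclasses import dataclass, field
    from typing import Set

    @dataclass
    class Example:
        # A bare `tags: Set[str] = set()` would raise at class-definition time:
        #   ValueError: mutable default <class 'set'> for field tags is not
        #   allowed: use default_factory
        tags: Set[str] = field(default_factory=set)  # fresh set per instance

    a, b = Example(), Example()
    a.tags.add("x")
    assert b.tags == set()  # defaults are not shared between instances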
@@ -100,24 +100,24 @@ class WorkerTemplate:
 WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
     "pusher": WorkerTemplate(),
     "user_dir": WorkerTemplate(
-        listener_resources=["client"],
-        endpoint_patterns=[
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
-        ],
+        },
         shared_extra_conf=lambda worker_name: {
             "update_user_directory_from_worker": worker_name
         },
     ),
     "media_repository": WorkerTemplate(
-        listener_resources=["media"],
-        endpoint_patterns=[
+        listener_resources={"media"},
+        endpoint_patterns={
             "^/_matrix/media/",
             "^/_synapse/admin/v1/purge_media_cache$",
             "^/_synapse/admin/v1/room/.*/media.*$",
             "^/_synapse/admin/v1/user/.*/media.*$",
             "^/_synapse/admin/v1/media/.*$",
             "^/_synapse/admin/v1/quarantine_media/.*$",
-        ],
+        },
         # The first configured media worker will run the media background jobs
         shared_extra_conf=lambda worker_name: {
             "enable_media_repo": False,
@@ -132,17 +132,17 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
     ),
     "federation_sender": WorkerTemplate(),
     "synchrotron": WorkerTemplate(
-        listener_resources=["client"],
-        endpoint_patterns=[
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
             "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
             "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
             "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
-        ],
+        },
     ),
     "client_reader": WorkerTemplate(
-        listener_resources=["client"],
-        endpoint_patterns=[
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
@@ -170,11 +170,11 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
             "^/_matrix/client/(r0|v3|unstable)/capabilities$",
             "^/_matrix/client/(r0|v3|unstable)/notifications$",
-        ],
+        },
     ),
     "federation_reader": WorkerTemplate(
-        listener_resources=["federation"],
-        endpoint_patterns=[
+        listener_resources={"federation"},
+        endpoint_patterns={
             "^/_matrix/federation/(v1|v2)/event/",
             "^/_matrix/federation/(v1|v2)/state/",
             "^/_matrix/federation/(v1|v2)/state_ids/",
@@ -194,14 +194,14 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/federation/(v1|v2)/user/devices/",
             "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
             "^/_matrix/key/v2/query",
-        ],
+        },
     ),
     "federation_inbound": WorkerTemplate(
-        listener_resources=["federation"],
-        endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"],
+        listener_resources={"federation"},
+        endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"},
     ),
     "event_persister": WorkerTemplate(
-        listener_resources=["replication"],
+        listener_resources={"replication"},
     ),
     "background_worker": WorkerTemplate(
         # This worker cannot be sharded. Therefore, there should only ever be one
@@ -209,45 +209,45 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
         shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
     ),
     "event_creator": WorkerTemplate(
-        listener_resources=["client"],
-        endpoint_patterns=[
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
-        ],
+        },
     ),
     "frontend_proxy": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
+        listener_resources={"client", "replication"},
+        endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"},
     ),
     "account_data": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=[
+        listener_resources={"client", "replication"},
+        endpoint_patterns={
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
-        ],
+        },
     ),
     "presence": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+        listener_resources={"client", "replication"},
+        endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
     ),
     "receipts": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=[
+        listener_resources={"client", "replication"},
+        endpoint_patterns={
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
-        ],
+        },
     ),
     "to_device": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+        listener_resources={"client", "replication"},
+        endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
     ),
     "typing": WorkerTemplate(
-        listener_resources=["client", "replication"],
-        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"],
+        listener_resources={"client", "replication"},
+        endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
     ),
 }
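
Throughout WORKERS_CONFIG, the list literals `[...]` become set literals `{...}`. Nothing here relies on ordering or duplicate entries, so a set is the more honest type. A quick sketch of the difference (values are illustrative):

    # A list literal keeps order and duplicates; a set literal collapses them.
    as_list = ["client", "client"]
    as_set = {"client", "client"}
    assert len(as_list) == 2
    assert as_set == {"client"}  # duplicate entry collapsed

    # Membership checks, the main operation on these fields, are O(1) on a set.
    assert "client" in as_set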
@@ -406,15 +406,11 @@ def merge_worker_template_configs(
     # copy existing_template without any replacements
     new_template: WorkerTemplate = dataclasses.replace(existing_template)

-    # merge the two lists, remove duplicates
-    new_template.listener_resources = list(
-        set(new_template.listener_resources + to_be_merged_template.listener_resources)
-    )
+    # add listener resources from the other set
+    new_template.listener_resources |= to_be_merged_template.listener_resources

-    # merge the two lists, remove duplicates
-    new_template.endpoint_patterns = list(
-        set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns)
-    )
+    # add endpoint patterns from the other set
+    new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns

     # merge dictionaries; the worker name will be replaced later
     new_template.shared_extra_conf = lambda worker_name: {
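
The merge logic simplifies from a list -> set -> list round-trip to an in-place union. A minimal sketch of the semantics (note that `|=` mutates the left-hand set, and `dataclasses.replace` only makes a shallow copy, so this assumes the merged template's sets are not shared with a template reused elsewhere):

    # Old shape: deduplicate by round-tripping list -> set -> list,
    # which also scrambled ordering.
    old_merged = list(set(["client"] + ["client", "replication"]))

    # New shape: an in-place set union does the deduplication directly.
    resources = {"client"}
    resources |= {"client", "replication"}

    assert resources == {"client", "replication"}
    assert set(old_merged) == resources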
@@ -444,6 +440,8 @@ def insert_worker_name_for_worker_config(
     """
     dict_to_edit = dataclasses.asdict(existing_template)
     dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name)
+    dict_to_edit["endpoint_patterns"] = sorted(existing_template.endpoint_patterns)
+    dict_to_edit["listener_resources"] = sorted(existing_template.listener_resources)
     return dict_to_edit
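
Because sets have no guaranteed iteration order (string hashing is randomized between interpreter runs), anything destined for generated config is converted back to a sorted list before serialization. A small sketch of why this keeps the output reproducible (values are illustrative):

    resources = {"replication", "client", "media"}
    # Set iteration order can differ from run to run, but sorting yields
    # the same list every time, so the generated config is stable.
    assert sorted(resources) == ["client", "media", "replication"]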
@@ -760,7 +758,7 @@ def generate_worker_files(
     # Map locations to upstreams (corresponding to worker types) in Nginx
     # but only if we use the appropriate worker type
     for worker_type in all_worker_types_in_use:
-        for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns:
+        for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns):
             nginx_locations[endpoint_pattern] = f"http://{worker_type}"

     # For each worker type specified by the user, create config values and write it's
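
The same determinism concern applies to the Nginx location map: iterating the pattern set directly would emit location blocks in an arbitrary order. A sketch of the wrapped loop in isolation (the single worker type and patterns are illustrative stand-ins):

    nginx_locations = {}
    endpoint_patterns = {"^/_matrix/media/", "^/_synapse/admin/v1/media/.*$"}
    for endpoint_pattern in sorted(endpoint_patterns):
        # Sorted iteration keeps the emitted location blocks, and therefore
        # the generated nginx config, identical across runs of the script.
        nginx_locations[endpoint_pattern] = "http://media_repository"
    assert list(nginx_locations) == sorted(endpoint_patterns)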