Add `sharding_allowed` to the WorkerTemplate rather than having a separate function for that

This commit is contained in:
Olivier Wilkinson (reivilibre) 2023-11-16 15:52:52 +00:00
parent 7d8824e2dc
commit f49dbc7ba7
1 changed files with 10 additions and 18 deletions

View File

@ -90,6 +90,9 @@ class WorkerTemplate:
shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
worker_extra_conf: str = "" worker_extra_conf: str = ""
# True if and only if multiple of this worker type are allowed.
sharding_allowed: bool = True
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener # Watching /_matrix/client needs a "client" listener
@ -218,6 +221,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
# This worker cannot be sharded. Therefore, there should only ever be one # This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database. # background worker. This is enforced for the safety of your database.
shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
sharding_allowed=False,
), ),
"event_creator": WorkerTemplate( "event_creator": WorkerTemplate(
listener_resources={"client"}, listener_resources={"client"},
@ -243,6 +247,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
shared_extra_conf=lambda worker_name: { shared_extra_conf=lambda worker_name: {
"stream_writers": {"account_data": [worker_name]} "stream_writers": {"account_data": [worker_name]}
}, },
sharding_allowed=False,
), ),
"presence": WorkerTemplate( "presence": WorkerTemplate(
listener_resources={"client", "replication"}, listener_resources={"client", "replication"},
@ -250,6 +255,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
shared_extra_conf=lambda worker_name: { shared_extra_conf=lambda worker_name: {
"stream_writers": {"presence": [worker_name]} "stream_writers": {"presence": [worker_name]}
}, },
sharding_allowed=False,
), ),
"receipts": WorkerTemplate( "receipts": WorkerTemplate(
listener_resources={"client", "replication"}, listener_resources={"client", "replication"},
@ -260,6 +266,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
shared_extra_conf=lambda worker_name: { shared_extra_conf=lambda worker_name: {
"stream_writers": {"receipts": [worker_name]} "stream_writers": {"receipts": [worker_name]}
}, },
sharding_allowed=False,
), ),
"to_device": WorkerTemplate( "to_device": WorkerTemplate(
listener_resources={"client", "replication"}, listener_resources={"client", "replication"},
@ -267,6 +274,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
shared_extra_conf=lambda worker_name: { shared_extra_conf=lambda worker_name: {
"stream_writers": {"to_device": [worker_name]} "stream_writers": {"to_device": [worker_name]}
}, },
sharding_allowed=False,
), ),
"typing": WorkerTemplate( "typing": WorkerTemplate(
listener_resources={"client", "replication"}, listener_resources={"client", "replication"},
@ -274,6 +282,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
shared_extra_conf=lambda worker_name: { shared_extra_conf=lambda worker_name: {
"stream_writers": {"typing": [worker_name]} "stream_writers": {"typing": [worker_name]}
}, },
sharding_allowed=False,
), ),
} }
@ -495,23 +504,6 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
return new_worker_types return new_worker_types
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"""Helper to check to make sure worker types that cannot have multiples do not.
Args:
worker_type: The type of worker to check against.
Returns: True if allowed, False if not
"""
return worker_type not in [
"background_worker",
"account_data",
"presence",
"receipts",
"typing",
"to_device",
]
def split_and_strip_string( def split_and_strip_string(
given_string: str, split_char: str, max_split: SupportsIndex = -1 given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]: ) -> List[str]:
@ -637,7 +629,7 @@ def parse_worker_types(
) )
if worker_type in worker_type_shard_counter: if worker_type in worker_type_shard_counter:
if not is_sharding_allowed_for_worker_type(worker_type): if not WORKERS_CONFIG[worker_type].sharding_allowed:
error( error(
f"There can be only a single worker with {worker_type} " f"There can be only a single worker with {worker_type} "
"type. Please recount and remove." "type. Please recount and remove."