Remove `worker_replication_*` settings (#15491)
* Add master to the instance_map as part of Complement, have ReplicationEndpoint look at instance_map for master. * Fix typo in drive by. * Remove unnecessary worker_replication_* bits from unit tests and add master to instance_map(hopefully in the right place) * Several updates: 1. Switch from master to main for naming the main process in the instance_map. Add useful constants for easier adjustment of names in the future. 2. Add backwards compatibility for worker_replication_* to allow time to transition to new style. Make sure to prioritize declaring main directly on the instance_map. 3. Clean up old comments/commented out code. 4. Adjust unit tests to match with new code. 5. Adjust Complement setup infrastructure to only add main to the instance_map if workers are used and remove now unused options from the worker.yaml template. * Initial Docs upload * Changelog * Missed some commented out code that can go now * Remove TODO comment that no longer holds true. * Fix links in docs * More docs * Remove debug logging * Apply suggestions from code review Co-authored-by: reivilibre <olivier@librepush.net> * Apply suggestions from code review Co-authored-by: reivilibre <olivier@librepush.net> * Update version to latest, include completeish before/after examples in upgrade notes. * Fix up and docs too --------- Co-authored-by: reivilibre <olivier@librepush.net>
This commit is contained in:
parent
722ccc30b5
commit
e4f545c452
|
@ -0,0 +1 @@
|
||||||
|
Remove need for `worker_replication_*` based settings in worker configuration yaml by placing this data directly on the `instance_map` instead.
|
|
@ -6,10 +6,6 @@
|
||||||
worker_app: "{{ app }}"
|
worker_app: "{{ app }}"
|
||||||
worker_name: "{{ name }}"
|
worker_name: "{{ name }}"
|
||||||
|
|
||||||
# The replication listener on the main synapse process.
|
|
||||||
worker_replication_host: 127.0.0.1
|
|
||||||
worker_replication_http_port: 9093
|
|
||||||
|
|
||||||
worker_listeners:
|
worker_listeners:
|
||||||
- type: http
|
- type: http
|
||||||
port: {{ port }}
|
port: {{ port }}
|
||||||
|
|
|
@ -69,6 +69,9 @@ import yaml
|
||||||
from jinja2 import Environment, FileSystemLoader
|
from jinja2 import Environment, FileSystemLoader
|
||||||
|
|
||||||
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
||||||
|
MAIN_PROCESS_INSTANCE_NAME = "main"
|
||||||
|
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
|
||||||
|
MAIN_PROCESS_REPLICATION_PORT = 9093
|
||||||
|
|
||||||
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
|
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
|
||||||
# during processing with the name of the worker.
|
# during processing with the name of the worker.
|
||||||
|
@ -719,8 +722,8 @@ def generate_worker_files(
|
||||||
# shared config file.
|
# shared config file.
|
||||||
listeners = [
|
listeners = [
|
||||||
{
|
{
|
||||||
"port": 9093,
|
"port": MAIN_PROCESS_REPLICATION_PORT,
|
||||||
"bind_address": "127.0.0.1",
|
"bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
|
||||||
"type": "http",
|
"type": "http",
|
||||||
"resources": [{"names": ["replication"]}],
|
"resources": [{"names": ["replication"]}],
|
||||||
}
|
}
|
||||||
|
@ -870,6 +873,14 @@ def generate_worker_files(
|
||||||
|
|
||||||
workers_in_use = len(requested_worker_types) > 0
|
workers_in_use = len(requested_worker_types) > 0
|
||||||
|
|
||||||
|
# If there are workers, add the main process to the instance_map too.
|
||||||
|
if workers_in_use:
|
||||||
|
instance_map = shared_config.setdefault("instance_map", {})
|
||||||
|
instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
|
||||||
|
"host": MAIN_PROCESS_LOCALHOST_ADDRESS,
|
||||||
|
"port": MAIN_PROCESS_REPLICATION_PORT,
|
||||||
|
}
|
||||||
|
|
||||||
# Shared homeserver config
|
# Shared homeserver config
|
||||||
convert(
|
convert(
|
||||||
"/conf/shared.yaml.j2",
|
"/conf/shared.yaml.j2",
|
||||||
|
|
|
@ -1,10 +1,6 @@
|
||||||
worker_app: synapse.app.generic_worker
|
worker_app: synapse.app.generic_worker
|
||||||
worker_name: generic_worker1
|
worker_name: generic_worker1
|
||||||
|
|
||||||
# The replication listener on the main synapse process.
|
|
||||||
worker_replication_host: 127.0.0.1
|
|
||||||
worker_replication_http_port: 9093
|
|
||||||
|
|
||||||
worker_listeners:
|
worker_listeners:
|
||||||
- type: http
|
- type: http
|
||||||
port: 8083
|
port: 8083
|
||||||
|
|
|
@ -88,6 +88,84 @@ process, for example:
|
||||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# Upgrading to v1.84.0
|
||||||
|
|
||||||
|
## Deprecation of `worker_replication_*` configuration settings
|
||||||
|
|
||||||
|
When using workers,
|
||||||
|
* `worker_replication_host`
|
||||||
|
* `worker_replication_http_port`
|
||||||
|
* `worker_replication_http_tls`
|
||||||
|
|
||||||
|
can now be removed from individual worker YAML configuration ***if*** you add the main process to the `instance_map` in the shared YAML configuration,
|
||||||
|
using the name `main`.
|
||||||
|
|
||||||
|
### Before:
|
||||||
|
Shared YAML
|
||||||
|
```yaml
|
||||||
|
instance_map:
|
||||||
|
generic_worker1:
|
||||||
|
host: localhost
|
||||||
|
port: 5678
|
||||||
|
tls: false
|
||||||
|
```
|
||||||
|
Worker YAML
|
||||||
|
```yaml
|
||||||
|
worker_app: synapse.app.generic_worker
|
||||||
|
worker_name: generic_worker1
|
||||||
|
|
||||||
|
worker_replication_host: localhost
|
||||||
|
worker_replication_http_port: 3456
|
||||||
|
worker_replication_http_tls: false
|
||||||
|
|
||||||
|
worker_listeners:
|
||||||
|
- type: http
|
||||||
|
port: 1234
|
||||||
|
resources:
|
||||||
|
- names: [client, federation]
|
||||||
|
- type: http
|
||||||
|
port: 5678
|
||||||
|
resources:
|
||||||
|
- names: [replication]
|
||||||
|
|
||||||
|
worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml
|
||||||
|
```
|
||||||
|
### After:
|
||||||
|
Shared YAML
|
||||||
|
```yaml
|
||||||
|
instance_map:
|
||||||
|
main:
|
||||||
|
host: localhost
|
||||||
|
port: 3456
|
||||||
|
tls: false
|
||||||
|
generic_worker1:
|
||||||
|
host: localhost
|
||||||
|
port: 5678
|
||||||
|
tls: false
|
||||||
|
```
|
||||||
|
Worker YAML
|
||||||
|
```yaml
|
||||||
|
worker_app: synapse.app.generic_worker
|
||||||
|
worker_name: generic_worker1
|
||||||
|
|
||||||
|
worker_listeners:
|
||||||
|
- type: http
|
||||||
|
port: 1234
|
||||||
|
resources:
|
||||||
|
- names: [client, federation]
|
||||||
|
- type: http
|
||||||
|
port: 5678
|
||||||
|
resources:
|
||||||
|
- names: [replication]
|
||||||
|
|
||||||
|
worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml
|
||||||
|
|
||||||
|
```
|
||||||
|
Notes:
|
||||||
|
* `tls` is optional but mirrors the functionality of `worker_replication_http_tls`
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Upgrading to v1.81.0
|
# Upgrading to v1.81.0
|
||||||
|
|
||||||
## Application service path & authentication deprecations
|
## Application service path & authentication deprecations
|
||||||
|
|
|
@ -3884,15 +3884,20 @@ federation_sender_instances:
|
||||||
### `instance_map`
|
### `instance_map`
|
||||||
|
|
||||||
When using workers this should be a map from [`worker_name`](#worker_name) to the
|
When using workers this should be a map from [`worker_name`](#worker_name) to the
|
||||||
HTTP replication listener of the worker, if configured.
|
HTTP replication listener of the worker, if configured, and to the main process.
|
||||||
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
|
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
|
||||||
a HTTP replication listener, and that listener should be included in the `instance_map`.
|
a HTTP replication listener, and that listener should be included in the `instance_map`.
|
||||||
(The main process also needs an HTTP replication listener, but it should not be
|
The main process also needs an entry on the `instance_map`, and it should be listed under
|
||||||
listed in the `instance_map`.)
|
`main` **if even one other worker exists**. Ensure the port matches with what is declared
|
||||||
|
inside the `listener` block for a `replication` listener.
|
||||||
|
|
||||||
|
|
||||||
Example configuration:
|
Example configuration:
|
||||||
```yaml
|
```yaml
|
||||||
instance_map:
|
instance_map:
|
||||||
|
main:
|
||||||
|
host: localhost
|
||||||
|
port: 8030
|
||||||
worker1:
|
worker1:
|
||||||
host: localhost
|
host: localhost
|
||||||
port: 8034
|
port: 8034
|
||||||
|
@ -4024,6 +4029,7 @@ worker_name: generic_worker1
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
### `worker_replication_host`
|
### `worker_replication_host`
|
||||||
|
*Deprecated as of version 1.84.0. Place `host` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*
|
||||||
|
|
||||||
The HTTP replication endpoint that it should talk to on the main Synapse process.
|
The HTTP replication endpoint that it should talk to on the main Synapse process.
|
||||||
The main Synapse process defines this with a `replication` resource in
|
The main Synapse process defines this with a `replication` resource in
|
||||||
|
@ -4035,6 +4041,7 @@ worker_replication_host: 127.0.0.1
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
### `worker_replication_http_port`
|
### `worker_replication_http_port`
|
||||||
|
*Deprecated as of version 1.84.0. Place `port` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*
|
||||||
|
|
||||||
The HTTP replication port that it should talk to on the main Synapse process.
|
The HTTP replication port that it should talk to on the main Synapse process.
|
||||||
The main Synapse process defines this with a `replication` resource in
|
The main Synapse process defines this with a `replication` resource in
|
||||||
|
@ -4046,6 +4053,7 @@ worker_replication_http_port: 9093
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
### `worker_replication_http_tls`
|
### `worker_replication_http_tls`
|
||||||
|
*Deprecated as of version 1.84.0. Place `tls` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*
|
||||||
|
|
||||||
Whether TLS should be used for talking to the HTTP replication port on the main
|
Whether TLS should be used for talking to the HTTP replication port on the main
|
||||||
Synapse process.
|
Synapse process.
|
||||||
|
@ -4071,9 +4079,9 @@ A worker can handle HTTP requests. To do so, a `worker_listeners` option
|
||||||
must be declared, in the same way as the [`listeners` option](#listeners)
|
must be declared, in the same way as the [`listeners` option](#listeners)
|
||||||
in the shared config.
|
in the shared config.
|
||||||
|
|
||||||
Workers declared in [`stream_writers`](#stream_writers) will need to include a
|
Workers declared in [`stream_writers`](#stream_writers) and [`instance_map`](#instance_map)
|
||||||
`replication` listener here, in order to accept internal HTTP requests from
|
will need to include a `replication` listener here, in order to accept internal HTTP
|
||||||
other workers.
|
requests from other workers.
|
||||||
|
|
||||||
Example configuration:
|
Example configuration:
|
||||||
```yaml
|
```yaml
|
||||||
|
|
|
@ -87,12 +87,18 @@ shared configuration file.
|
||||||
|
|
||||||
### Shared configuration
|
### Shared configuration
|
||||||
|
|
||||||
Normally, only a couple of changes are needed to make an existing configuration
|
Normally, only a few changes are needed to make an existing configuration
|
||||||
file suitable for use with workers. First, you need to enable an
|
file suitable for use with workers:
|
||||||
|
* First, you need to enable an
|
||||||
["HTTP replication listener"](usage/configuration/config_documentation.md#listeners)
|
["HTTP replication listener"](usage/configuration/config_documentation.md#listeners)
|
||||||
for the main process; and secondly, you need to enable
|
for the main process
|
||||||
[redis-based replication](usage/configuration/config_documentation.md#redis).
|
* Secondly, you need to enable
|
||||||
Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
|
[redis-based replication](usage/configuration/config_documentation.md#redis)
|
||||||
|
* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map)
|
||||||
|
with the `main` process defined, as well as the relevant connection information from
|
||||||
|
it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined
|
||||||
|
is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to.
|
||||||
|
* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
|
||||||
can be used to authenticate HTTP traffic between workers. For example:
|
can be used to authenticate HTTP traffic between workers. For example:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
|
@ -111,6 +117,11 @@ worker_replication_secret: ""
|
||||||
|
|
||||||
redis:
|
redis:
|
||||||
enabled: true
|
enabled: true
|
||||||
|
|
||||||
|
instance_map:
|
||||||
|
main:
|
||||||
|
host: 'localhost'
|
||||||
|
port: 9093
|
||||||
```
|
```
|
||||||
|
|
||||||
See the [configuration manual](usage/configuration/config_documentation.md)
|
See the [configuration manual](usage/configuration/config_documentation.md)
|
||||||
|
@ -130,13 +141,13 @@ In the config file for each worker, you must specify:
|
||||||
* The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)).
|
* The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)).
|
||||||
The currently available worker applications are listed [below](#available-worker-applications).
|
The currently available worker applications are listed [below](#available-worker-applications).
|
||||||
* A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)).
|
* A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)).
|
||||||
* The HTTP replication endpoint that it should talk to on the main synapse process
|
|
||||||
([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
|
|
||||||
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
|
|
||||||
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
|
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
|
||||||
with an `http` listener.
|
with an `http` listener.
|
||||||
* **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
|
* **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
|
||||||
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer.
|
the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer.
|
||||||
|
* **Synapse 1.83 and older:** The HTTP replication endpoint that the worker should talk to on the main synapse process
|
||||||
|
([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
|
||||||
|
[`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). If using Synapse 1.84 and newer, these are not needed if `main` is defined on the [shared configuration](#shared-configuration) `instance_map`
|
||||||
|
|
||||||
For example:
|
For example:
|
||||||
|
|
||||||
|
@ -417,11 +428,14 @@ effects of bursts of events from that bridge on events sent by normal users.
|
||||||
Additionally, the writing of specific streams (such as events) can be moved off
|
Additionally, the writing of specific streams (such as events) can be moved off
|
||||||
of the main process to a particular worker.
|
of the main process to a particular worker.
|
||||||
|
|
||||||
To enable this, the worker must have a
|
To enable this, the worker must have:
|
||||||
[HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
|
* An [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
|
||||||
have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
|
* Have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
|
||||||
and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map)
|
and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map)
|
||||||
config. The same worker can handle multiple streams, but unless otherwise documented,
|
config.
|
||||||
|
* Have the main process declared on the [`instance_map`](usage/configuration/config_documentation.md#instance_map) as well.
|
||||||
|
|
||||||
|
Note: The same worker can handle multiple streams, but unless otherwise documented,
|
||||||
each stream can only have a single writer.
|
each stream can only have a single writer.
|
||||||
|
|
||||||
For example, to move event persistence off to a dedicated worker, the shared
|
For example, to move event persistence off to a dedicated worker, the shared
|
||||||
|
@ -429,6 +443,9 @@ configuration would include:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
instance_map:
|
instance_map:
|
||||||
|
main:
|
||||||
|
host: localhost
|
||||||
|
port: 8030
|
||||||
event_persister1:
|
event_persister1:
|
||||||
host: localhost
|
host: localhost
|
||||||
port: 8034
|
port: 8034
|
||||||
|
|
|
@ -39,6 +39,19 @@ The '%s' configuration option is deprecated and will be removed in a future
|
||||||
Synapse version. Please use ``%s: name_of_worker`` instead.
|
Synapse version. Please use ``%s: name_of_worker`` instead.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """
|
||||||
|
Missing data for a worker to connect to main process. Please include '%s' in the
|
||||||
|
`instance_map` declared in your shared yaml configuration, or optionally(as a deprecated
|
||||||
|
solution) in every worker's yaml as various `worker_replication_*` settings as defined
|
||||||
|
in workers documentation here:
|
||||||
|
`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration`
|
||||||
|
"""
|
||||||
|
# This allows for a handy knob when it's time to change from 'master' to
|
||||||
|
# something with less 'history'
|
||||||
|
MAIN_PROCESS_INSTANCE_NAME = "master"
|
||||||
|
# Use this to adjust what the main process is known as in the yaml instance_map
|
||||||
|
MAIN_PROCESS_INSTANCE_MAP_NAME = "main"
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -161,27 +174,15 @@ class WorkerConfig(Config):
|
||||||
raise ConfigError("worker_log_config must be a string")
|
raise ConfigError("worker_log_config must be a string")
|
||||||
self.worker_log_config = worker_log_config
|
self.worker_log_config = worker_log_config
|
||||||
|
|
||||||
# The host used to connect to the main synapse
|
|
||||||
self.worker_replication_host = config.get("worker_replication_host", None)
|
|
||||||
|
|
||||||
# The port on the main synapse for TCP replication
|
# The port on the main synapse for TCP replication
|
||||||
if "worker_replication_port" in config:
|
if "worker_replication_port" in config:
|
||||||
raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))
|
raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))
|
||||||
|
|
||||||
# The port on the main synapse for HTTP replication endpoint
|
|
||||||
self.worker_replication_http_port = config.get("worker_replication_http_port")
|
|
||||||
|
|
||||||
# The tls mode on the main synapse for HTTP replication endpoint.
|
|
||||||
# For backward compatibility this defaults to False.
|
|
||||||
self.worker_replication_http_tls = config.get(
|
|
||||||
"worker_replication_http_tls", False
|
|
||||||
)
|
|
||||||
|
|
||||||
# The shared secret used for authentication when connecting to the main synapse.
|
# The shared secret used for authentication when connecting to the main synapse.
|
||||||
self.worker_replication_secret = config.get("worker_replication_secret", None)
|
self.worker_replication_secret = config.get("worker_replication_secret", None)
|
||||||
|
|
||||||
self.worker_name = config.get("worker_name", self.worker_app)
|
self.worker_name = config.get("worker_name", self.worker_app)
|
||||||
self.instance_name = self.worker_name or "master"
|
self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME
|
||||||
|
|
||||||
# FIXME: Remove this check after a suitable amount of time.
|
# FIXME: Remove this check after a suitable amount of time.
|
||||||
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
|
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
|
||||||
|
@ -215,12 +216,55 @@ class WorkerConfig(Config):
|
||||||
)
|
)
|
||||||
|
|
||||||
# A map from instance name to host/port of their HTTP replication endpoint.
|
# A map from instance name to host/port of their HTTP replication endpoint.
|
||||||
|
# Check if the main process is declared. Inject it into the map if it's not,
|
||||||
|
# based first on if a 'main' block is declared then on 'worker_replication_*'
|
||||||
|
# data. If both are available, default to instance_map. The main process
|
||||||
|
# itself doesn't need this data as it would never have to talk to itself.
|
||||||
|
instance_map: Dict[str, Any] = config.get("instance_map", {})
|
||||||
|
|
||||||
|
if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME:
|
||||||
|
# The host used to connect to the main synapse
|
||||||
|
main_host = config.get("worker_replication_host", None)
|
||||||
|
|
||||||
|
# The port on the main synapse for HTTP replication endpoint
|
||||||
|
main_port = config.get("worker_replication_http_port")
|
||||||
|
|
||||||
|
# The tls mode on the main synapse for HTTP replication endpoint.
|
||||||
|
# For backward compatibility this defaults to False.
|
||||||
|
main_tls = config.get("worker_replication_http_tls", False)
|
||||||
|
|
||||||
|
# For now, accept 'main' in the instance_map, but the replication system
|
||||||
|
# expects 'master', force that into being until it's changed later.
|
||||||
|
if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map:
|
||||||
|
instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[
|
||||||
|
MAIN_PROCESS_INSTANCE_MAP_NAME
|
||||||
|
]
|
||||||
|
del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]
|
||||||
|
|
||||||
|
# This is the backwards compatibility bit that handles the
|
||||||
|
# worker_replication_* bits using setdefault() to not overwrite anything.
|
||||||
|
elif main_host is not None and main_port is not None:
|
||||||
|
instance_map.setdefault(
|
||||||
|
MAIN_PROCESS_INSTANCE_NAME,
|
||||||
|
{
|
||||||
|
"host": main_host,
|
||||||
|
"port": main_port,
|
||||||
|
"tls": main_tls,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# If we've gotten here, it means that the main process is not on the
|
||||||
|
# instance_map and that not enough worker_replication_* variables
|
||||||
|
# were declared in the worker's yaml.
|
||||||
|
raise ConfigError(
|
||||||
|
_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA
|
||||||
|
% MAIN_PROCESS_INSTANCE_MAP_NAME
|
||||||
|
)
|
||||||
|
|
||||||
self.instance_map: Dict[
|
self.instance_map: Dict[
|
||||||
str, InstanceLocationConfig
|
str, InstanceLocationConfig
|
||||||
] = parse_and_validate_mapping(
|
] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
|
||||||
config.get("instance_map", {}),
|
|
||||||
InstanceLocationConfig,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Map from type of streams to source, c.f. WriterLocations.
|
# Map from type of streams to source, c.f. WriterLocations.
|
||||||
writers = config.get("stream_writers") or {}
|
writers = config.get("stream_writers") or {}
|
||||||
|
|
|
@ -25,6 +25,7 @@ from twisted.internet.error import ConnectError, DNSLookupError
|
||||||
from twisted.web.server import Request
|
from twisted.web.server import Request
|
||||||
|
|
||||||
from synapse.api.errors import HttpResponseException, SynapseError
|
from synapse.api.errors import HttpResponseException, SynapseError
|
||||||
|
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
||||||
from synapse.http import RequestTimedOutError
|
from synapse.http import RequestTimedOutError
|
||||||
from synapse.http.server import HttpServer
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import parse_json_object_from_request
|
from synapse.http.servlet import parse_json_object_from_request
|
||||||
|
@ -197,11 +198,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||||
client = hs.get_replication_client()
|
client = hs.get_replication_client()
|
||||||
local_instance_name = hs.get_instance_name()
|
local_instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
# The value of these option should match the replication listener settings
|
|
||||||
master_host = hs.config.worker.worker_replication_host
|
|
||||||
master_port = hs.config.worker.worker_replication_http_port
|
|
||||||
master_tls = hs.config.worker.worker_replication_http_tls
|
|
||||||
|
|
||||||
instance_map = hs.config.worker.instance_map
|
instance_map = hs.config.worker.instance_map
|
||||||
|
|
||||||
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
|
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
|
||||||
|
@ -213,7 +209,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||||
)
|
)
|
||||||
|
|
||||||
@trace_with_opname("outgoing_replication_request")
|
@trace_with_opname("outgoing_replication_request")
|
||||||
async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
|
async def send_request(
|
||||||
|
*, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any
|
||||||
|
) -> Any:
|
||||||
# We have to pull these out here to avoid circular dependencies...
|
# We have to pull these out here to avoid circular dependencies...
|
||||||
streams = hs.get_replication_command_handler().get_streams_to_replicate()
|
streams = hs.get_replication_command_handler().get_streams_to_replicate()
|
||||||
replication = hs.get_replication_data_handler()
|
replication = hs.get_replication_data_handler()
|
||||||
|
@ -221,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||||
with outgoing_gauge.track_inprogress():
|
with outgoing_gauge.track_inprogress():
|
||||||
if instance_name == local_instance_name:
|
if instance_name == local_instance_name:
|
||||||
raise Exception("Trying to send HTTP request to self")
|
raise Exception("Trying to send HTTP request to self")
|
||||||
if instance_name == "master":
|
if instance_name in instance_map:
|
||||||
host = master_host
|
|
||||||
port = master_port
|
|
||||||
tls = master_tls
|
|
||||||
elif instance_name in instance_map:
|
|
||||||
host = instance_map[instance_name].host
|
host = instance_map[instance_name].host
|
||||||
port = instance_map[instance_name].port
|
port = instance_map[instance_name].port
|
||||||
tls = instance_map[instance_name].tls
|
tls = instance_map[instance_name].tls
|
||||||
|
|
|
@ -837,6 +837,7 @@ class ModuleApiWorkerTestCase(BaseModuleApiTestCase, BaseMultiWorkerStreamTestCa
|
||||||
conf = super().default_config()
|
conf = super().default_config()
|
||||||
conf["stream_writers"] = {"presence": ["presence_writer"]}
|
conf["stream_writers"] = {"presence": ["presence_writer"]}
|
||||||
conf["instance_map"] = {
|
conf["instance_map"] = {
|
||||||
|
"main": {"host": "testserv", "port": 8765},
|
||||||
"presence_writer": {"host": "testserv", "port": 1001},
|
"presence_writer": {"host": "testserv", "port": 1001},
|
||||||
}
|
}
|
||||||
return conf
|
return conf
|
||||||
|
|
|
@ -110,8 +110,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
|
||||||
def _get_worker_hs_config(self) -> dict:
|
def _get_worker_hs_config(self) -> dict:
|
||||||
config = self.default_config()
|
config = self.default_config()
|
||||||
config["worker_app"] = "synapse.app.generic_worker"
|
config["worker_app"] = "synapse.app.generic_worker"
|
||||||
config["worker_replication_host"] = "testserv"
|
config["instance_map"] = {"main": {"host": "testserv", "port": 8765}}
|
||||||
config["worker_replication_http_port"] = "8765"
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
def _build_replication_data_handler(self) -> "TestReplicationDataHandler":
|
def _build_replication_data_handler(self) -> "TestReplicationDataHandler":
|
||||||
|
@ -249,6 +248,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
base = super().default_config()
|
base = super().default_config()
|
||||||
base["redis"] = {"enabled": True}
|
base["redis"] = {"enabled": True}
|
||||||
|
base["instance_map"] = {"main": {"host": "testserv", "port": 8765}}
|
||||||
return base
|
return base
|
||||||
|
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
|
@ -310,7 +310,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
||||||
def make_worker_hs(
|
def make_worker_hs(
|
||||||
self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any
|
self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any
|
||||||
) -> HomeServer:
|
) -> HomeServer:
|
||||||
"""Make a new worker HS instance, correctly connecting replcation
|
"""Make a new worker HS instance, correctly connecting replication
|
||||||
stream to the master HS.
|
stream to the master HS.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -388,8 +388,6 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
def _get_worker_hs_config(self) -> dict:
|
def _get_worker_hs_config(self) -> dict:
|
||||||
config = self.default_config()
|
config = self.default_config()
|
||||||
config["worker_replication_host"] = "testserv"
|
|
||||||
config["worker_replication_http_port"] = "8765"
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
def replicate(self) -> None:
|
def replicate(self) -> None:
|
||||||
|
|
|
@ -43,9 +43,6 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase):
|
||||||
def _get_worker_hs_config(self) -> dict:
|
def _get_worker_hs_config(self) -> dict:
|
||||||
config = self.default_config()
|
config = self.default_config()
|
||||||
config["worker_app"] = "synapse.app.generic_worker"
|
config["worker_app"] = "synapse.app.generic_worker"
|
||||||
config["worker_replication_host"] = "testserv"
|
|
||||||
config["worker_replication_http_port"] = "8765"
|
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
def _test_register(self) -> FakeChannel:
|
def _test_register(self) -> FakeChannel:
|
||||||
|
|
|
@ -29,8 +29,6 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
|
||||||
def _get_worker_hs_config(self) -> dict:
|
def _get_worker_hs_config(self) -> dict:
|
||||||
config = self.default_config()
|
config = self.default_config()
|
||||||
config["worker_app"] = "synapse.app.generic_worker"
|
config["worker_app"] = "synapse.app.generic_worker"
|
||||||
config["worker_replication_host"] = "testserv"
|
|
||||||
config["worker_replication_http_port"] = "8765"
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
def test_register_single_worker(self) -> None:
|
def test_register_single_worker(self) -> None:
|
||||||
|
|
|
@ -50,6 +50,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
|
||||||
conf = super().default_config()
|
conf = super().default_config()
|
||||||
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
|
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
|
||||||
conf["instance_map"] = {
|
conf["instance_map"] = {
|
||||||
|
"main": {"host": "testserv", "port": 8765},
|
||||||
"worker1": {"host": "testserv", "port": 1001},
|
"worker1": {"host": "testserv", "port": 1001},
|
||||||
"worker2": {"host": "testserv", "port": 1002},
|
"worker2": {"host": "testserv", "port": 1002},
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue