Merge branch 'develop' into matthew/search-all-local-users
This commit is contained in:
commit
f397153dfc
157
docs/workers.rst
157
docs/workers.rst
|
@ -1,11 +1,15 @@
|
|||
Scaling synapse via workers
|
||||
---------------------------
|
||||
===========================
|
||||
|
||||
Synapse has experimental support for splitting out functionality into
|
||||
multiple separate python processes, helping greatly with scalability. These
|
||||
processes are called 'workers', and are (eventually) intended to scale
|
||||
horizontally independently.
|
||||
|
||||
All of the below is highly experimental and subject to change as Synapse evolves,
|
||||
but documenting it here to help folks needing highly scalable Synapses similar
|
||||
to the one running matrix.org!
|
||||
|
||||
All processes continue to share the same database instance, and as such, workers
|
||||
only work with postgres based synapse deployments (sharing a single sqlite
|
||||
across multiple processes is a recipe for disaster, plus you should be using
|
||||
|
@ -16,6 +20,16 @@ TCP protocol called 'replication' - analogous to MySQL or Postgres style
|
|||
database replication; feeding a stream of relevant data to the workers so they
|
||||
can be kept in sync with the main synapse process and database state.
|
||||
|
||||
Configuration
|
||||
-------------
|
||||
|
||||
To make effective use of the workers, you will need to configure an HTTP
|
||||
reverse-proxy such as nginx or haproxy, which will direct incoming requests to
|
||||
the correct worker, or to the main synapse instance. Note that this includes
|
||||
requests made to the federation port. The caveats regarding running a
|
||||
reverse-proxy on the federation port still apply (see
|
||||
https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port).
|
||||
|
||||
To enable workers, you need to add a replication listener to the master synapse, e.g.::
|
||||
|
||||
listeners:
|
||||
|
@ -27,26 +41,19 @@ Under **no circumstances** should this replication API listener be exposed to th
|
|||
public internet; it currently implements no authentication whatsoever and is
|
||||
unencrypted.
|
||||
|
||||
You then create a set of configs for the various worker processes. These should be
|
||||
worker configuration files should be stored in a dedicated subdirectory, to allow
|
||||
synctl to manipulate them.
|
||||
|
||||
The current available worker applications are:
|
||||
* synapse.app.pusher - handles sending push notifications to sygnal and email
|
||||
* synapse.app.synchrotron - handles /sync endpoints. can scales horizontally through multiple instances.
|
||||
* synapse.app.appservice - handles output traffic to Application Services
|
||||
* synapse.app.federation_reader - handles receiving federation traffic (including public_rooms API)
|
||||
* synapse.app.media_repository - handles the media repository.
|
||||
* synapse.app.client_reader - handles client API endpoints like /publicRooms
|
||||
You then create a set of configs for the various worker processes. These
|
||||
should be worker configuration files, and should be stored in a dedicated
|
||||
subdirectory, to allow synctl to manipulate them.
|
||||
|
||||
Each worker configuration file inherits the configuration of the main homeserver
|
||||
configuration file. You can then override configuration specific to that worker,
|
||||
e.g. the HTTP listener that it provides (if any); logging configuration; etc.
|
||||
You should minimise the number of overrides though to maintain a usable config.
|
||||
|
||||
You must specify the type of worker application (worker_app) and the replication
|
||||
endpoint that it's talking to on the main synapse process (worker_replication_host
|
||||
and worker_replication_port).
|
||||
You must specify the type of worker application (``worker_app``). The currently
|
||||
available worker applications are listed below. You must also specify the
|
||||
replication endpoint that it's talking to on the main synapse process
|
||||
(``worker_replication_host`` and ``worker_replication_port``).
|
||||
|
||||
For instance::
|
||||
|
||||
|
@ -68,11 +75,11 @@ For instance::
|
|||
worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml
|
||||
|
||||
...is a full configuration for a synchrotron worker instance, which will expose a
|
||||
plain HTTP /sync endpoint on port 8083 separately from the /sync endpoint provided
|
||||
plain HTTP ``/sync`` endpoint on port 8083 separately from the ``/sync`` endpoint provided
|
||||
by the main synapse.
|
||||
|
||||
Obviously you should configure your loadbalancer to route the /sync endpoint to
|
||||
the synchrotron instance(s) in this instance.
|
||||
Obviously you should configure your reverse-proxy to route the relevant
|
||||
endpoints to the worker (``localhost:8083`` in the above example).
|
||||
|
||||
Finally, to actually run your worker-based synapse, you must pass synctl the -a
|
||||
commandline option to tell it to operate on all the worker configurations found
|
||||
|
@ -89,6 +96,114 @@ To manipulate a specific worker, you pass the -w option to synctl::
|
|||
|
||||
synctl -w $CONFIG/workers/synchrotron.yaml restart
|
||||
|
||||
All of the above is highly experimental and subject to change as Synapse evolves,
|
||||
but documenting it here to help folks needing highly scalable Synapses similar
|
||||
to the one running matrix.org!
|
||||
|
||||
Available worker applications
|
||||
-----------------------------
|
||||
|
||||
``synapse.app.pusher``
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles sending push notifications to sygnal and email. Doesn't handle any
|
||||
REST endpoints itself, but you should set ``start_pushers: False`` in the
|
||||
shared configuration file to stop the main synapse sending these notifications.
|
||||
|
||||
Note this worker cannot be load-balanced: only one instance should be active.
|
||||
|
||||
``synapse.app.synchrotron``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The synchrotron handles ``sync`` requests from clients. In particular, it can
|
||||
handle REST endpoints matching the following regular expressions::
|
||||
|
||||
^/_matrix/client/(v2_alpha|r0)/sync$
|
||||
^/_matrix/client/(api/v1|v2_alpha|r0)/events$
|
||||
^/_matrix/client/(api/v1|r0)/initialSync$
|
||||
^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$
|
||||
|
||||
The above endpoints should all be routed to the synchrotron worker by the
|
||||
reverse-proxy configuration.
|
||||
|
||||
It is possible to run multiple instances of the synchrotron to scale
|
||||
horizontally. In this case the reverse-proxy should be configured to
|
||||
load-balance across the instances, though it will be more efficient if all
|
||||
requests from a particular user are routed to a single instance. Extracting
|
||||
a userid from the access token is currently left as an exercise for the reader.
|
||||
|
||||
``synapse.app.appservice``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles sending output traffic to Application Services. Doesn't handle any
|
||||
REST endpoints itself, but you should set ``notify_appservices: False`` in the
|
||||
shared configuration file to stop the main synapse sending these notifications.
|
||||
|
||||
Note this worker cannot be load-balanced: only one instance should be active.
|
||||
|
||||
``synapse.app.federation_reader``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles a subset of federation endpoints. In particular, it can handle REST
|
||||
endpoints matching the following regular expressions::
|
||||
|
||||
^/_matrix/federation/v1/event/
|
||||
^/_matrix/federation/v1/state/
|
||||
^/_matrix/federation/v1/state_ids/
|
||||
^/_matrix/federation/v1/backfill/
|
||||
^/_matrix/federation/v1/get_missing_events/
|
||||
^/_matrix/federation/v1/publicRooms
|
||||
|
||||
The above endpoints should all be routed to the federation_reader worker by the
|
||||
reverse-proxy configuration.
|
||||
|
||||
``synapse.app.federation_sender``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles sending federation traffic to other servers. Doesn't handle any
|
||||
REST endpoints itself, but you should set ``send_federation: False`` in the
|
||||
shared configuration file to stop the main synapse sending this traffic.
|
||||
|
||||
Note this worker cannot be load-balanced: only one instance should be active.
|
||||
|
||||
``synapse.app.media_repository``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles the media repository. It can handle all endpoints starting with::
|
||||
|
||||
/_matrix/media/
|
||||
|
||||
You should also set ``enable_media_repo: False`` in the shared configuration
|
||||
file to stop the main synapse running background jobs related to managing the
|
||||
media repository.
|
||||
|
||||
Note this worker cannot be load-balanced: only one instance should be active.
|
||||
|
||||
``synapse.app.client_reader``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles client API endpoints. It can handle REST endpoints matching the
|
||||
following regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
|
||||
|
||||
``synapse.app.user_dir``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Handles searches in the user directory. It can handle REST endpoints matching
|
||||
the following regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$
|
||||
|
||||
``synapse.app.frontend_proxy``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Proxies some frequently-requested client endpoints to add caching and remove
|
||||
load from the main synapse. It can handle REST endpoints matching the following
|
||||
regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
|
||||
|
||||
It will proxy any requests it cannot handle to the main synapse instance. It
|
||||
must therefore be configured with the location of the main instance, via
|
||||
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
|
||||
file. For example::
|
||||
|
||||
worker_main_http_uri: http://127.0.0.1:8008
|
||||
|
|
|
@ -270,7 +270,11 @@ class Auth(object):
|
|||
rights (str): The operation being performed; the access token must
|
||||
allow this.
|
||||
Returns:
|
||||
dict : dict that includes the user and the ID of their access token.
|
||||
Deferred[dict]: dict that includes:
|
||||
`user` (UserID)
|
||||
`is_guest` (bool)
|
||||
`token_id` (int|None): access token id. May be None if guest
|
||||
`device_id` (str|None): device corresponding to access token
|
||||
Raises:
|
||||
AuthError if no user by that token exists or the token is invalid.
|
||||
"""
|
||||
|
|
|
@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
|
|||
from synapse.rest.key.v1.server_key_resource import LocalKey
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage import are_all_users_on_domain
|
||||
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
||||
|
@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
|
|||
})
|
||||
|
||||
if name in ["media", "federation", "client"]:
|
||||
media_repo = MediaRepositoryResource(self)
|
||||
resources.update({
|
||||
MEDIA_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
CONTENT_REPO_PREFIX: ContentRepoResource(
|
||||
self, self.config.uploads_path
|
||||
),
|
||||
})
|
||||
if self.get_config().enable_media_repo:
|
||||
media_repo = self.get_media_repository_resource()
|
||||
resources.update({
|
||||
MEDIA_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
CONTENT_REPO_PREFIX: ContentRepoResource(
|
||||
self, self.config.uploads_path
|
||||
),
|
||||
})
|
||||
elif name == "media":
|
||||
raise ConfigError(
|
||||
"'media' resource conflicts with enable_media_repo=False",
|
||||
)
|
||||
|
||||
if name in ["keys", "federation"]:
|
||||
resources.update({
|
||||
|
|
|
@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
|
|||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.media_repository import MediaRepositoryStore
|
||||
|
@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
|
|||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
elif name == "media":
|
||||
media_repo = MediaRepositoryResource(self)
|
||||
media_repo = self.get_media_repository_resource()
|
||||
resources.update({
|
||||
MEDIA_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
|
@ -151,6 +150,13 @@ def start(config_options):
|
|||
|
||||
assert config.worker_app == "synapse.app.media_repository"
|
||||
|
||||
if config.enable_media_repo:
|
||||
_base.quit_with_error(
|
||||
"enable_media_repo must be disabled in the main synapse process\n"
|
||||
"before the media repo can be run in a separate worker.\n"
|
||||
"Please add ``enable_media_repo: false`` to the main config\n"
|
||||
)
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
|
|
@ -340,11 +340,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||
|
||||
self.store = hs.get_datastore()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
# NB this is a SynchrotronPresence, not a normal PresenceHandler
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
self.presence_handler.sync_callback = self.send_user_sync
|
||||
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
|
|
|
@ -41,6 +41,12 @@ class ServerConfig(Config):
|
|||
# false only if we are updating the user directory in a worker
|
||||
self.update_user_directory = config.get("update_user_directory", True)
|
||||
|
||||
# whether to enable the media repository endpoints. This should be set
|
||||
# to false if the media repository is running as a separate endpoint;
|
||||
# doing so ensures that we will not run cache cleanup jobs on the
|
||||
# master, potentially causing inconsistency.
|
||||
self.enable_media_repo = config.get("enable_media_repo", True)
|
||||
|
||||
self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
|
||||
|
||||
# Whether we should block invites sent to users on this server
|
||||
|
|
|
@ -32,15 +32,22 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
|
|||
"""Check whether the hash for this PDU matches the contents"""
|
||||
name, expected_hash = compute_content_hash(event, hash_algorithm)
|
||||
logger.debug("Expecting hash: %s", encode_base64(expected_hash))
|
||||
if name not in event.hashes:
|
||||
|
||||
# some malformed events lack a 'hashes'. Protect against it being missing
|
||||
# or a weird type by basically treating it the same as an unhashed event.
|
||||
hashes = event.get("hashes")
|
||||
if not isinstance(hashes, dict):
|
||||
raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
|
||||
|
||||
if name not in hashes:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Algorithm %s not in hashes %s" % (
|
||||
name, list(event.hashes),
|
||||
name, list(hashes),
|
||||
),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
message_hash_base64 = event.hashes[name]
|
||||
message_hash_base64 = hashes[name]
|
||||
try:
|
||||
message_hash_bytes = decode_base64(message_hash_base64)
|
||||
except Exception:
|
||||
|
|
|
@ -20,7 +20,7 @@ from .persistence import TransactionActions
|
|||
from .units import Transaction, Edu
|
||||
|
||||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.util import logcontext
|
||||
from synapse.util import logcontext, PreserveLoggingContext
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
from synapse.util.metrics import measure_func
|
||||
|
@ -146,7 +146,6 @@ class TransactionQueue(object):
|
|||
else:
|
||||
return not destination.startswith("localhost")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify_new_events(self, current_id):
|
||||
"""This gets called when we have some new events we might want to
|
||||
send out to other servers.
|
||||
|
@ -156,6 +155,13 @@ class TransactionQueue(object):
|
|||
if self._is_processing:
|
||||
return
|
||||
|
||||
# fire off a processing loop in the background. It's likely it will
|
||||
# outlast the current request, so run it in the sentinel logcontext.
|
||||
with PreserveLoggingContext():
|
||||
self._process_event_queue_loop()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process_event_queue_loop(self):
|
||||
try:
|
||||
self._is_processing = True
|
||||
while True:
|
||||
|
|
|
@ -649,41 +649,6 @@ class AuthHandler(BaseHandler):
|
|||
except Exception:
|
||||
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_password(self, user_id, newpassword, requester=None):
|
||||
password_hash = self.hash(newpassword)
|
||||
|
||||
except_access_token_id = requester.access_token_id if requester else None
|
||||
|
||||
try:
|
||||
yield self.store.user_set_password_hash(user_id, password_hash)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
|
||||
raise e
|
||||
yield self.delete_access_tokens_for_user(
|
||||
user_id, except_token_id=except_access_token_id,
|
||||
)
|
||||
yield self.hs.get_pusherpool().remove_pushers_by_user(
|
||||
user_id, except_access_token_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def deactivate_account(self, user_id):
|
||||
"""Deactivate a user's account
|
||||
|
||||
Args:
|
||||
user_id (str): ID of user to be deactivated
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
yield self.delete_access_tokens_for_user(user_id)
|
||||
yield self.store.user_delete_threepids(user_id)
|
||||
yield self.store.user_set_password_hash(user_id, None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_access_token(self, access_token):
|
||||
"""Invalidate a single access token
|
||||
|
@ -706,6 +671,12 @@ class AuthHandler(BaseHandler):
|
|||
access_token=access_token,
|
||||
)
|
||||
|
||||
# delete pushers associated with this access token
|
||||
if user_info["token_id"] is not None:
|
||||
yield self.hs.get_pusherpool().remove_pushers_by_access_token(
|
||||
str(user_info["user"]), (user_info["token_id"], )
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_access_tokens_for_user(self, user_id, except_token_id=None,
|
||||
device_id=None):
|
||||
|
@ -728,13 +699,18 @@ class AuthHandler(BaseHandler):
|
|||
# see if any of our auth providers want to know about this
|
||||
for provider in self.password_providers:
|
||||
if hasattr(provider, "on_logged_out"):
|
||||
for token, device_id in tokens_and_devices:
|
||||
for token, token_id, device_id in tokens_and_devices:
|
||||
yield provider.on_logged_out(
|
||||
user_id=user_id,
|
||||
device_id=device_id,
|
||||
access_token=token,
|
||||
)
|
||||
|
||||
# delete pushers associated with the access tokens
|
||||
yield self.hs.get_pusherpool().remove_pushers_by_access_token(
|
||||
user_id, (token_id for _, token_id, _ in tokens_and_devices),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_threepid(self, user_id, medium, address, validated_at):
|
||||
# 'Canonicalise' email addresses down to lower case.
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from twisted.internet import defer
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeactivateAccountHandler(BaseHandler):
|
||||
"""Handler which deals with deactivating user accounts."""
|
||||
def __init__(self, hs):
|
||||
super(DeactivateAccountHandler, self).__init__(hs)
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def deactivate_account(self, user_id):
|
||||
"""Deactivate a user's account
|
||||
|
||||
Args:
|
||||
user_id (str): ID of user to be deactivated
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
|
||||
# first delete any devices belonging to the user, which will also
|
||||
# delete corresponding access tokens.
|
||||
yield self._device_handler.delete_all_devices_for_user(user_id)
|
||||
# then delete any remaining access tokens which weren't associated with
|
||||
# a device.
|
||||
yield self._auth_handler.delete_access_tokens_for_user(user_id)
|
||||
|
||||
yield self.store.user_delete_threepids(user_id)
|
||||
yield self.store.user_set_password_hash(user_id, None)
|
|
@ -170,13 +170,31 @@ class DeviceHandler(BaseHandler):
|
|||
|
||||
yield self.notify_device_update(user_id, [device_id])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_all_devices_for_user(self, user_id, except_device_id=None):
|
||||
"""Delete all of the user's devices
|
||||
|
||||
Args:
|
||||
user_id (str):
|
||||
except_device_id (str|None): optional device id which should not
|
||||
be deleted
|
||||
|
||||
Returns:
|
||||
defer.Deferred:
|
||||
"""
|
||||
device_map = yield self.store.get_devices_by_user(user_id)
|
||||
device_ids = device_map.keys()
|
||||
if except_device_id is not None:
|
||||
device_ids = [d for d in device_ids if d != except_device_id]
|
||||
yield self.delete_devices(user_id, device_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_devices(self, user_id, device_ids):
|
||||
""" Delete several devices
|
||||
|
||||
Args:
|
||||
user_id (str):
|
||||
device_ids (str): The list of device IDs to delete
|
||||
device_ids (List[str]): The list of device IDs to delete
|
||||
|
||||
Returns:
|
||||
defer.Deferred:
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import Codes, StoreError, SynapseError
|
||||
from ._base import BaseHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SetPasswordHandler(BaseHandler):
|
||||
"""Handler which deals with changing user account passwords"""
|
||||
def __init__(self, hs):
|
||||
super(SetPasswordHandler, self).__init__(hs)
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_password(self, user_id, newpassword, requester=None):
|
||||
password_hash = self._auth_handler.hash(newpassword)
|
||||
|
||||
except_device_id = requester.device_id if requester else None
|
||||
except_access_token_id = requester.access_token_id if requester else None
|
||||
|
||||
try:
|
||||
yield self.store.user_set_password_hash(user_id, password_hash)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
|
||||
raise e
|
||||
|
||||
# we want to log out all of the user's other sessions. First delete
|
||||
# all his other devices.
|
||||
yield self._device_handler.delete_all_devices_for_user(
|
||||
user_id, except_device_id=except_device_id,
|
||||
)
|
||||
|
||||
# and now delete any access tokens which weren't associated with
|
||||
# devices (or were associated with this device).
|
||||
yield self._auth_handler.delete_access_tokens_for_user(
|
||||
user_id, except_token_id=except_access_token_id,
|
||||
)
|
|
@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
|
|||
return res
|
||||
|
||||
# no logcontexts here, so we can safely fire these off and gatherResults
|
||||
d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
|
||||
d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
|
||||
d1 = dns_client.lookupAddress(host).addCallbacks(
|
||||
cb, eb, errbackArgs=("A", ))
|
||||
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
|
||||
cb, eb, errbackArgs=("AAAA", ))
|
||||
results = yield defer.DeferredList(
|
||||
[d1, d2], consumeErrors=True)
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ from canonicaljson import (
|
|||
)
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python import failure
|
||||
from twisted.web import server, resource
|
||||
from twisted.web.server import NOT_DONE_YET
|
||||
from twisted.web.util import redirectTo
|
||||
|
@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
|
|||
version_string=self.version_string,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed handle request %s.%s on %r: %r",
|
||||
# failure.Failure() fishes the original Failure out
|
||||
# of our stack, and thus gives us a sensible stack
|
||||
# trace.
|
||||
f = failure.Failure()
|
||||
logger.error(
|
||||
"Failed handle request %s.%s on %r: %r: %s",
|
||||
request_handler.__module__,
|
||||
request_handler.__name__,
|
||||
self,
|
||||
request
|
||||
request,
|
||||
f.getTraceback().rstrip(),
|
||||
)
|
||||
respond_with_json(
|
||||
request,
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.types import UserID
|
||||
|
||||
|
@ -81,6 +82,7 @@ class ModuleApi(object):
|
|||
reg = self.hs.get_handlers().registration_handler
|
||||
return reg.register(localpart=localpart)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def invalidate_access_token(self, access_token):
|
||||
"""Invalidate an access token for a user
|
||||
|
||||
|
@ -94,8 +96,16 @@ class ModuleApi(object):
|
|||
Raises:
|
||||
synapse.api.errors.AuthError: the access token is invalid
|
||||
"""
|
||||
|
||||
return self._auth_handler.delete_access_token(access_token)
|
||||
# see if the access token corresponds to a device
|
||||
user_info = yield self._auth.get_user_by_access_token(access_token)
|
||||
device_id = user_info.get("device_id")
|
||||
user_id = user_info["user"].to_string()
|
||||
if device_id:
|
||||
# delete the device, which will also delete its access tokens
|
||||
yield self.hs.get_device_handler().delete_device(user_id, device_id)
|
||||
else:
|
||||
# no associated device. Just delete the access token.
|
||||
yield self._auth_handler.delete_access_token(access_token)
|
||||
|
||||
def run_db_interaction(self, desc, func, *args, **kwargs):
|
||||
"""Run a function with a database connection
|
||||
|
|
|
@ -255,9 +255,7 @@ class Notifier(object):
|
|||
)
|
||||
|
||||
if self.federation_sender:
|
||||
preserve_fn(self.federation_sender.notify_new_events)(
|
||||
room_stream_id
|
||||
)
|
||||
self.federation_sender.notify_new_events(room_stream_id)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
self._user_joined_room(event.state_key, event.room_id)
|
||||
|
@ -297,8 +295,7 @@ class Notifier(object):
|
|||
def on_new_replication_data(self):
|
||||
"""Used to inform replication listeners that something has happend
|
||||
without waking up any of the normal user event streams"""
|
||||
with PreserveLoggingContext():
|
||||
self.notify_replication()
|
||||
self.notify_replication()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
|
||||
|
@ -516,8 +513,14 @@ class Notifier(object):
|
|||
self.replication_deferred = ObservableDeferred(defer.Deferred())
|
||||
deferred.callback(None)
|
||||
|
||||
for cb in self.replication_callbacks:
|
||||
preserve_fn(cb)()
|
||||
# the callbacks may well outlast the current request, so we run
|
||||
# them in the sentinel logcontext.
|
||||
#
|
||||
# (ideally it would be up to the callbacks to know if they were
|
||||
# starting off background processes and drop the logcontext
|
||||
# accordingly, but that requires more changes)
|
||||
for cb in self.replication_callbacks:
|
||||
cb()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_replication(self, callback, timeout):
|
||||
|
|
|
@ -103,19 +103,25 @@ class PusherPool:
|
|||
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_pushers_by_user(self, user_id, except_access_token_id=None):
|
||||
all = yield self.store.get_all_pushers()
|
||||
logger.info(
|
||||
"Removing all pushers for user %s except access tokens id %r",
|
||||
user_id, except_access_token_id
|
||||
)
|
||||
for p in all:
|
||||
if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
|
||||
def remove_pushers_by_access_token(self, user_id, access_tokens):
|
||||
"""Remove the pushers for a given user corresponding to a set of
|
||||
access_tokens.
|
||||
|
||||
Args:
|
||||
user_id (str): user to remove pushers for
|
||||
access_tokens (Iterable[int]): access token *ids* to remove pushers
|
||||
for
|
||||
"""
|
||||
tokens = set(access_tokens)
|
||||
for p in (yield self.store.get_pushers_by_user_id(user_id)):
|
||||
if p['access_token'] in tokens:
|
||||
logger.info(
|
||||
"Removing pusher for app id %s, pushkey %s, user %s",
|
||||
p['app_id'], p['pushkey'], p['user_name']
|
||||
)
|
||||
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
|
||||
yield self.remove_pusher(
|
||||
p['app_id'], p['pushkey'], p['user_name'],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
|
|
|
@ -216,11 +216,12 @@ class ReplicationStreamer(object):
|
|||
self.federation_sender.federation_ack(token)
|
||||
|
||||
@measure_func("repl.on_user_sync")
|
||||
@defer.inlineCallbacks
|
||||
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
|
||||
"""A client has started/stopped syncing on a worker.
|
||||
"""
|
||||
user_sync_counter.inc()
|
||||
self.presence_handler.update_external_syncs_row(
|
||||
yield self.presence_handler.update_external_syncs_row(
|
||||
conn_id, user_id, is_syncing, last_sync_ms,
|
||||
)
|
||||
|
||||
|
@ -244,11 +245,12 @@ class ReplicationStreamer(object):
|
|||
getattr(self.store, cache_func).invalidate(tuple(keys))
|
||||
|
||||
@measure_func("repl.on_user_ip")
|
||||
@defer.inlineCallbacks
|
||||
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
|
||||
"""The client saw a user request
|
||||
"""
|
||||
user_ip_cache_counter.inc()
|
||||
self.store.insert_client_ip(
|
||||
yield self.store.insert_client_ip(
|
||||
user_id, access_token, ip, user_agent, device_id, last_seen,
|
||||
)
|
||||
|
||||
|
|
|
@ -137,8 +137,8 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
|
|||
PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")
|
||||
|
||||
def __init__(self, hs):
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
super(DeactivateAccountRestServlet, self).__init__(hs)
|
||||
self._deactivate_account_handler = hs.get_deactivate_account_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, target_user_id):
|
||||
|
@ -149,7 +149,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
|
|||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
yield self._auth_handler.deactivate_account(target_user_id)
|
||||
yield self._deactivate_account_handler.deactivate_account(target_user_id)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
|
@ -309,7 +309,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
|
|||
super(ResetPasswordRestServlet, self).__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.auth_handler = hs.get_auth_handler()
|
||||
self._set_password_handler = hs.get_set_password_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, target_user_id):
|
||||
|
@ -330,7 +330,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
|
|||
|
||||
logger.info("new_password: %r", new_password)
|
||||
|
||||
yield self.auth_handler.set_password(
|
||||
yield self._set_password_handler.set_password(
|
||||
target_user_id, new_password, requester
|
||||
)
|
||||
defer.returnValue((200, {}))
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.auth import get_access_token_from_request
|
||||
from synapse.api.errors import AuthError
|
||||
|
||||
from .base import ClientV1RestServlet, client_path_patterns
|
||||
|
||||
|
@ -30,15 +31,30 @@ class LogoutRestServlet(ClientV1RestServlet):
|
|||
|
||||
def __init__(self, hs):
|
||||
super(LogoutRestServlet, self).__init__(hs)
|
||||
self._auth = hs.get_auth()
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
|
||||
def on_OPTIONS(self, request):
|
||||
return (200, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
access_token = get_access_token_from_request(request)
|
||||
yield self._auth_handler.delete_access_token(access_token)
|
||||
try:
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
except AuthError:
|
||||
# this implies the access token has already been deleted.
|
||||
pass
|
||||
else:
|
||||
if requester.device_id is None:
|
||||
# the acccess token wasn't associated with a device.
|
||||
# Just delete the access token
|
||||
access_token = get_access_token_from_request(request)
|
||||
yield self._auth_handler.delete_access_token(access_token)
|
||||
else:
|
||||
yield self._device_handler.delete_device(
|
||||
requester.user.to_string(), requester.device_id)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
|
@ -49,6 +65,7 @@ class LogoutAllRestServlet(ClientV1RestServlet):
|
|||
super(LogoutAllRestServlet, self).__init__(hs)
|
||||
self.auth = hs.get_auth()
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
|
||||
def on_OPTIONS(self, request):
|
||||
return (200, {})
|
||||
|
@ -57,6 +74,12 @@ class LogoutAllRestServlet(ClientV1RestServlet):
|
|||
def on_POST(self, request):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
# first delete all of the user's devices
|
||||
yield self._device_handler.delete_all_devices_for_user(user_id)
|
||||
|
||||
# .. and then delete any access tokens which weren't associated with
|
||||
# devices.
|
||||
yield self._auth_handler.delete_access_tokens_for_user(user_id)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
|
|
@ -98,6 +98,7 @@ class PasswordRestServlet(RestServlet):
|
|||
self.auth = hs.get_auth()
|
||||
self.auth_handler = hs.get_auth_handler()
|
||||
self.datastore = self.hs.get_datastore()
|
||||
self._set_password_handler = hs.get_set_password_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
|
@ -147,7 +148,7 @@ class PasswordRestServlet(RestServlet):
|
|||
raise SynapseError(400, "", Codes.MISSING_PARAM)
|
||||
new_password = params['new_password']
|
||||
|
||||
yield self.auth_handler.set_password(
|
||||
yield self._set_password_handler.set_password(
|
||||
user_id, new_password, requester
|
||||
)
|
||||
|
||||
|
@ -161,10 +162,11 @@ class DeactivateAccountRestServlet(RestServlet):
|
|||
PATTERNS = client_v2_patterns("/account/deactivate$")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(DeactivateAccountRestServlet, self).__init__()
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.auth_handler = hs.get_auth_handler()
|
||||
super(DeactivateAccountRestServlet, self).__init__()
|
||||
self._deactivate_account_handler = hs.get_deactivate_account_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
|
@ -179,7 +181,7 @@ class DeactivateAccountRestServlet(RestServlet):
|
|||
|
||||
# allow ASes to dectivate their own users
|
||||
if requester and requester.app_service:
|
||||
yield self.auth_handler.deactivate_account(
|
||||
yield self._deactivate_account_handler.deactivate_account(
|
||||
requester.user.to_string()
|
||||
)
|
||||
defer.returnValue((200, {}))
|
||||
|
@ -206,7 +208,7 @@ class DeactivateAccountRestServlet(RestServlet):
|
|||
logger.error("Auth succeeded but no known type!", result.keys())
|
||||
raise SynapseError(500, "", Codes.UNKNOWN)
|
||||
|
||||
yield self.auth_handler.deactivate_account(user_id)
|
||||
yield self._deactivate_account_handler.deactivate_account(user_id)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ class GroupServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
group_description = yield self.groups_handler.get_group_profile(
|
||||
|
@ -74,7 +74,7 @@ class GroupSummaryServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
get_group_summary = yield self.groups_handler.get_group_summary(
|
||||
|
@ -148,7 +148,7 @@ class GroupCategoryServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id, category_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
category = yield self.groups_handler.get_group_category(
|
||||
|
@ -200,7 +200,7 @@ class GroupCategoriesServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
category = yield self.groups_handler.get_group_categories(
|
||||
|
@ -225,7 +225,7 @@ class GroupRoleServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id, role_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
category = yield self.groups_handler.get_group_role(
|
||||
|
@ -277,7 +277,7 @@ class GroupRolesServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
category = yield self.groups_handler.get_group_roles(
|
||||
|
@ -348,7 +348,7 @@ class GroupRoomServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
result = yield self.groups_handler.get_rooms_in_group(group_id, requester_user_id)
|
||||
|
@ -369,7 +369,7 @@ class GroupUsersServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, group_id):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
result = yield self.groups_handler.get_users_in_group(group_id, requester_user_id)
|
||||
|
@ -672,7 +672,7 @@ class PublicisedGroupsForUserServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
result = yield self.groups_handler.get_publicised_groups_for_user(
|
||||
user_id
|
||||
|
@ -697,7 +697,7 @@ class PublicisedGroupsForUsersServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
user_ids = content["user_ids"]
|
||||
|
@ -724,7 +724,7 @@ class GroupsForUserServlet(RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
result = yield self.groups_handler.get_joined_groups(requester_user_id)
|
||||
|
|
|
@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string
|
|||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.http.client import SpiderHttpClient
|
||||
from synapse.http.server import (
|
||||
request_handler, respond_with_json_bytes
|
||||
request_handler, respond_with_json_bytes,
|
||||
respond_with_json,
|
||||
)
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.stringutils import is_ascii
|
||||
|
@ -78,6 +79,9 @@ class PreviewUrlResource(Resource):
|
|||
self._expire_url_cache_data, 10 * 1000
|
||||
)
|
||||
|
||||
def render_OPTIONS(self, request):
|
||||
return respond_with_json(request, 200, {}, send_cors=True)
|
||||
|
||||
def render_GET(self, request):
|
||||
self._async_render_GET(request)
|
||||
return NOT_DONE_YET
|
||||
|
@ -348,11 +352,16 @@ class PreviewUrlResource(Resource):
|
|||
def _expire_url_cache_data(self):
|
||||
"""Clean up expired url cache content, media and thumbnails.
|
||||
"""
|
||||
|
||||
# TODO: Delete from backup media store
|
||||
|
||||
now = self.clock.time_msec()
|
||||
|
||||
logger.info("Running url preview cache expiry")
|
||||
|
||||
if not (yield self.store.has_completed_background_updates()):
|
||||
logger.info("Still running DB updates; skipping expiry")
|
||||
return
|
||||
|
||||
# First we delete expired url cache entries
|
||||
media_ids = yield self.store.get_expired_url_cache(now)
|
||||
|
||||
|
@ -426,8 +435,7 @@ class PreviewUrlResource(Resource):
|
|||
|
||||
yield self.store.delete_url_cache_media(removed_media)
|
||||
|
||||
if removed_media:
|
||||
logger.info("Deleted %d media from url cache", len(removed_media))
|
||||
logger.info("Deleted %d media from url cache", len(removed_media))
|
||||
|
||||
|
||||
def decode_and_calc_og(body, media_uri, request_encoding=None):
|
||||
|
|
|
@ -39,11 +39,13 @@ from synapse.federation.transaction_queue import TransactionQueue
|
|||
from synapse.handlers import Handlers
|
||||
from synapse.handlers.appservice import ApplicationServicesHandler
|
||||
from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
|
||||
from synapse.handlers.deactivate_account import DeactivateAccountHandler
|
||||
from synapse.handlers.devicemessage import DeviceMessageHandler
|
||||
from synapse.handlers.device import DeviceHandler
|
||||
from synapse.handlers.e2e_keys import E2eKeysHandler
|
||||
from synapse.handlers.presence import PresenceHandler
|
||||
from synapse.handlers.room_list import RoomListHandler
|
||||
from synapse.handlers.set_password import SetPasswordHandler
|
||||
from synapse.handlers.sync import SyncHandler
|
||||
from synapse.handlers.typing import TypingHandler
|
||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||
|
@ -60,7 +62,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
|||
from synapse.notifier import Notifier
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.push.pusherpool import PusherPool
|
||||
from synapse.rest.media.v1.media_repository import MediaRepository
|
||||
from synapse.rest.media.v1.media_repository import (
|
||||
MediaRepository,
|
||||
MediaRepositoryResource,
|
||||
)
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage import DataStore
|
||||
from synapse.streams.events import EventSources
|
||||
|
@ -90,17 +95,12 @@ class HomeServer(object):
|
|||
"""
|
||||
|
||||
DEPENDENCIES = [
|
||||
'config',
|
||||
'clock',
|
||||
'http_client',
|
||||
'db_pool',
|
||||
'persistence_service',
|
||||
'replication_layer',
|
||||
'datastore',
|
||||
'handlers',
|
||||
'v1auth',
|
||||
'auth',
|
||||
'rest_servlet_factory',
|
||||
'state_handler',
|
||||
'presence_handler',
|
||||
'sync_handler',
|
||||
|
@ -117,19 +117,10 @@ class HomeServer(object):
|
|||
'application_service_handler',
|
||||
'device_message_handler',
|
||||
'profile_handler',
|
||||
'deactivate_account_handler',
|
||||
'set_password_handler',
|
||||
'notifier',
|
||||
'distributor',
|
||||
'client_resource',
|
||||
'resource_for_federation',
|
||||
'resource_for_static_content',
|
||||
'resource_for_web_client',
|
||||
'resource_for_content_repo',
|
||||
'resource_for_server_key',
|
||||
'resource_for_server_key_v2',
|
||||
'resource_for_media_repository',
|
||||
'resource_for_metrics',
|
||||
'event_sources',
|
||||
'ratelimiter',
|
||||
'keyring',
|
||||
'pusherpool',
|
||||
'event_builder_factory',
|
||||
|
@ -137,6 +128,7 @@ class HomeServer(object):
|
|||
'http_client_context_factory',
|
||||
'simple_http_client',
|
||||
'media_repository',
|
||||
'media_repository_resource',
|
||||
'federation_transport_client',
|
||||
'federation_sender',
|
||||
'receipts_handler',
|
||||
|
@ -183,6 +175,21 @@ class HomeServer(object):
|
|||
def is_mine_id(self, string):
|
||||
return string.split(":", 1)[1] == self.hostname
|
||||
|
||||
def get_clock(self):
|
||||
return self.clock
|
||||
|
||||
def get_datastore(self):
|
||||
return self.datastore
|
||||
|
||||
def get_config(self):
|
||||
return self.config
|
||||
|
||||
def get_distributor(self):
|
||||
return self.distributor
|
||||
|
||||
def get_ratelimiter(self):
|
||||
return self.ratelimiter
|
||||
|
||||
def build_replication_layer(self):
|
||||
return initialize_http_replication(self)
|
||||
|
||||
|
@ -265,6 +272,12 @@ class HomeServer(object):
|
|||
def build_profile_handler(self):
|
||||
return ProfileHandler(self)
|
||||
|
||||
def build_deactivate_account_handler(self):
|
||||
return DeactivateAccountHandler(self)
|
||||
|
||||
def build_set_password_handler(self):
|
||||
return SetPasswordHandler(self)
|
||||
|
||||
def build_event_sources(self):
|
||||
return EventSources(self)
|
||||
|
||||
|
@ -294,6 +307,11 @@ class HomeServer(object):
|
|||
**self.db_config.get("args", {})
|
||||
)
|
||||
|
||||
def build_media_repository_resource(self):
|
||||
# build the media repo resource. This indirects through the HomeServer
|
||||
# to ensure that we only have a single instance of
|
||||
return MediaRepositoryResource(self)
|
||||
|
||||
def build_media_repository(self):
|
||||
return MediaRepository(self)
|
||||
|
||||
|
|
|
@ -3,10 +3,14 @@ import synapse.federation.transaction_queue
|
|||
import synapse.federation.transport.client
|
||||
import synapse.handlers
|
||||
import synapse.handlers.auth
|
||||
import synapse.handlers.deactivate_account
|
||||
import synapse.handlers.device
|
||||
import synapse.handlers.e2e_keys
|
||||
import synapse.storage
|
||||
import synapse.handlers.set_password
|
||||
import synapse.rest.media.v1.media_repository
|
||||
import synapse.state
|
||||
import synapse.storage
|
||||
|
||||
|
||||
class HomeServer(object):
|
||||
def get_auth(self) -> synapse.api.auth.Auth:
|
||||
|
@ -30,8 +34,20 @@ class HomeServer(object):
|
|||
def get_state_handler(self) -> synapse.state.StateHandler:
|
||||
pass
|
||||
|
||||
def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
|
||||
pass
|
||||
|
||||
def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
|
||||
pass
|
||||
|
||||
def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
|
||||
pass
|
||||
|
||||
def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
|
||||
pass
|
||||
|
||||
def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
|
||||
pass
|
||||
|
||||
def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
|
||||
pass
|
||||
|
|
|
@ -495,6 +495,7 @@ class SQLBaseStore(object):
|
|||
Deferred(bool): True if a new entry was created, False if an
|
||||
existing one was updated.
|
||||
"""
|
||||
attempts = 0
|
||||
while True:
|
||||
try:
|
||||
result = yield self.runInteraction(
|
||||
|
@ -504,6 +505,12 @@ class SQLBaseStore(object):
|
|||
)
|
||||
defer.returnValue(result)
|
||||
except self.database_engine.module.IntegrityError as e:
|
||||
attempts += 1
|
||||
if attempts >= 5:
|
||||
# don't retry forever, because things other than races
|
||||
# can cause IntegrityErrors
|
||||
raise
|
||||
|
||||
# presumably we raced with another transaction: let's retry.
|
||||
logger.warn(
|
||||
"IntegrityError when upserting into %s; retrying: %s",
|
||||
|
@ -600,20 +607,18 @@ class SQLBaseStore(object):
|
|||
|
||||
@staticmethod
|
||||
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
||||
if keyvalues:
|
||||
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
|
||||
else:
|
||||
where = ""
|
||||
|
||||
sql = (
|
||||
"SELECT %(retcol)s FROM %(table)s %(where)s"
|
||||
"SELECT %(retcol)s FROM %(table)s"
|
||||
) % {
|
||||
"retcol": retcol,
|
||||
"table": table,
|
||||
"where": where,
|
||||
}
|
||||
|
||||
txn.execute(sql, keyvalues.values())
|
||||
if keyvalues:
|
||||
sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
|
||||
txn.execute(sql, keyvalues.values())
|
||||
else:
|
||||
txn.execute(sql)
|
||||
|
||||
return [r[0] for r in txn]
|
||||
|
||||
|
@ -624,7 +629,7 @@ class SQLBaseStore(object):
|
|||
|
||||
Args:
|
||||
table (str): table name
|
||||
keyvalues (dict): column names and values to select the rows with
|
||||
keyvalues (dict|None): column names and values to select the rows with
|
||||
retcol (str): column whos value we wish to retrieve.
|
||||
|
||||
Returns:
|
||||
|
|
|
@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
|
|||
"""
|
||||
content_json = json.dumps(content)
|
||||
|
||||
def add_account_data_txn(txn, next_id):
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
# no need to lock here as room_account_data has a unique constraint
|
||||
# on (user_id, room_id, account_data_type) so _simple_upsert will
|
||||
# retry if there is a conflict.
|
||||
yield self._simple_upsert(
|
||||
desc="add_room_account_data",
|
||||
table="room_account_data",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
|
@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
|
|||
values={
|
||||
"stream_id": next_id,
|
||||
"content": content_json,
|
||||
}
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
txn.call_after(
|
||||
self._account_data_stream_cache.entity_has_changed,
|
||||
user_id, next_id,
|
||||
)
|
||||
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
|
||||
self._update_max_stream_id(txn, next_id)
|
||||
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
yield self.runInteraction(
|
||||
"add_room_account_data", add_account_data_txn, next_id
|
||||
)
|
||||
# it's theoretically possible for the above to succeed and the
|
||||
# below to fail - in which case we might reuse a stream id on
|
||||
# restart, and the above update might not get propagated. That
|
||||
# doesn't sound any worse than the whole update getting lost,
|
||||
# which is what would happen if we combined the two into one
|
||||
# transaction.
|
||||
yield self._update_max_stream_id(next_id)
|
||||
|
||||
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
|
||||
self.get_account_data_for_user.invalidate((user_id,))
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
|
@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
|
|||
"""
|
||||
content_json = json.dumps(content)
|
||||
|
||||
def add_account_data_txn(txn, next_id):
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
# no need to lock here as account_data has a unique constraint on
|
||||
# (user_id, account_data_type) so _simple_upsert will retry if
|
||||
# there is a conflict.
|
||||
yield self._simple_upsert(
|
||||
desc="add_user_account_data",
|
||||
table="account_data",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
|
@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
|
|||
values={
|
||||
"stream_id": next_id,
|
||||
"content": content_json,
|
||||
}
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
txn.call_after(
|
||||
self._account_data_stream_cache.entity_has_changed,
|
||||
|
||||
# it's theoretically possible for the above to succeed and the
|
||||
# below to fail - in which case we might reuse a stream id on
|
||||
# restart, and the above update might not get propagated. That
|
||||
# doesn't sound any worse than the whole update getting lost,
|
||||
# which is what would happen if we combined the two into one
|
||||
# transaction.
|
||||
yield self._update_max_stream_id(next_id)
|
||||
|
||||
self._account_data_stream_cache.entity_has_changed(
|
||||
user_id, next_id,
|
||||
)
|
||||
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
|
||||
txn.call_after(
|
||||
self.get_global_account_data_by_type_for_user.invalidate,
|
||||
self.get_account_data_for_user.invalidate((user_id,))
|
||||
self.get_global_account_data_by_type_for_user.invalidate(
|
||||
(account_data_type, user_id,)
|
||||
)
|
||||
self._update_max_stream_id(txn, next_id)
|
||||
|
||||
with self._account_data_id_gen.get_next() as next_id:
|
||||
yield self.runInteraction(
|
||||
"add_user_account_data", add_account_data_txn, next_id
|
||||
)
|
||||
|
||||
result = self._account_data_id_gen.get_current_token()
|
||||
defer.returnValue(result)
|
||||
|
||||
def _update_max_stream_id(self, txn, next_id):
|
||||
def _update_max_stream_id(self, next_id):
|
||||
"""Update the max stream_id
|
||||
|
||||
Args:
|
||||
txn: The database cursor
|
||||
next_id(int): The the revision to advance to.
|
||||
"""
|
||||
update_max_id_sql = (
|
||||
"UPDATE account_data_max_stream_id"
|
||||
" SET stream_id = ?"
|
||||
" WHERE stream_id < ?"
|
||||
def _update(txn):
|
||||
update_max_id_sql = (
|
||||
"UPDATE account_data_max_stream_id"
|
||||
" SET stream_id = ?"
|
||||
" WHERE stream_id < ?"
|
||||
)
|
||||
txn.execute(update_max_id_sql, (next_id, next_id))
|
||||
return self.runInteraction(
|
||||
"update_account_data_max_stream_id",
|
||||
_update,
|
||||
)
|
||||
txn.execute(update_max_id_sql, (next_id, next_id))
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
|
||||
def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
|
||||
|
|
|
@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
self._background_update_performance = {}
|
||||
self._background_update_queue = []
|
||||
self._background_update_handlers = {}
|
||||
self._all_done = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start_doing_background_updates(self):
|
||||
|
@ -106,8 +107,40 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
"No more background updates to do."
|
||||
" Unscheduling background update task."
|
||||
)
|
||||
self._all_done = True
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def has_completed_background_updates(self):
|
||||
"""Check if all the background updates have completed
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: True if all background updates have completed
|
||||
"""
|
||||
# if we've previously determined that there is nothing left to do, that
|
||||
# is easy
|
||||
if self._all_done:
|
||||
defer.returnValue(True)
|
||||
|
||||
# obviously, if we have things in our queue, we're not done.
|
||||
if self._background_update_queue:
|
||||
defer.returnValue(False)
|
||||
|
||||
# otherwise, check if there are updates to be run. This is important,
|
||||
# as we may be running on a worker which doesn't perform the bg updates
|
||||
# itself, but still wants to wait for them to happen.
|
||||
updates = yield self._simple_select_onecol(
|
||||
"background_updates",
|
||||
keyvalues=None,
|
||||
retcol="1",
|
||||
desc="check_background_updates",
|
||||
)
|
||||
if not updates:
|
||||
self._all_done = True
|
||||
defer.returnValue(True)
|
||||
|
||||
defer.returnValue(False)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do_next_background_update(self, desired_duration_ms):
|
||||
"""Does some amount of work on the next queued background update
|
||||
|
@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
# Sqlite doesn't support concurrent creation of indexes.
|
||||
#
|
||||
# We don't use partial indices on SQLite as it wasn't introduced
|
||||
# until 3.8, and wheezy has 3.7
|
||||
# until 3.8, and wheezy and CentOS 7 have 3.7
|
||||
#
|
||||
# We assume that sqlite doesn't give us invalid indices; however
|
||||
# we may still end up with the index existing but the
|
||||
|
|
|
@ -12,13 +12,23 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
||||
|
||||
class MediaRepositoryStore(SQLBaseStore):
|
||||
class MediaRepositoryStore(BackgroundUpdateStore):
|
||||
"""Persistence for attachments and avatars"""
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(MediaRepositoryStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
update_name='local_media_repository_url_idx',
|
||||
index_name='local_media_repository_url_idx',
|
||||
table='local_media_repository',
|
||||
columns=['created_ts'],
|
||||
where_clause='url_cache IS NOT NULL',
|
||||
)
|
||||
|
||||
def get_default_thumbnails(self, top_level_type, sub_type):
|
||||
return []
|
||||
|
||||
|
|
|
@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
|
|||
If None, tokens associated with any device (or no device) will
|
||||
be deleted
|
||||
Returns:
|
||||
defer.Deferred[list[str, str|None]]: a list of the deleted tokens
|
||||
and device IDs
|
||||
defer.Deferred[list[str, int, str|None, int]]: a list of
|
||||
(token, token id, device id) for each of the deleted tokens
|
||||
"""
|
||||
def f(txn):
|
||||
keyvalues = {
|
||||
|
@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
|
|||
values.append(except_token_id)
|
||||
|
||||
txn.execute(
|
||||
"SELECT token, device_id FROM access_tokens WHERE %s" % where_clause,
|
||||
"SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
|
||||
values
|
||||
)
|
||||
tokens_and_devices = [(r[0], r[1]) for r in txn]
|
||||
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
|
||||
|
||||
for token, _ in tokens_and_devices:
|
||||
for token, _, _ in tokens_and_devices:
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_by_access_token, (token,)
|
||||
)
|
||||
|
|
|
@ -13,7 +13,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
|
||||
-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
|
||||
-- removed and replaced with 46/local_media_repository_url_idx.sql.
|
||||
--
|
||||
-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
|
||||
|
||||
-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
|
||||
-- indices on expressions until 3.9.
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/* Copyright 2017 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- register a background update which will recreate the
|
||||
-- local_media_repository_url_idx index.
|
||||
--
|
||||
-- We do this as a bg update not because it is a particularly onerous
|
||||
-- operation, but because we'd like it to be a partial index if possible, and
|
||||
-- the background_index_update code will understand whether we are on
|
||||
-- postgres or sqlite and behave accordingly.
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('local_media_repository_url_idx', '{}');
|
Loading…
Reference in New Issue