Merge branch 'neilj/room_capabilities' of github.com:matrix-org/synapse into neilj/room_capabilities

This commit is contained in:
Neil Johnson 2019-01-30 10:56:47 +00:00
commit b37e8c9572
123 changed files with 2447 additions and 665 deletions

View File

@ -15,6 +15,7 @@ recursive-include docs *
recursive-include scripts *
recursive-include scripts-dev *
recursive-include synapse *.pyi
recursive-include tests *.pem
recursive-include tests *.py
recursive-include synapse/res *

View File

@ -18,7 +18,7 @@ instructions that may be required are listed later in this document.
.. code:: bash
pip install --upgrade --process-dependency-links matrix-synapse
pip install --upgrade matrix-synapse
# restart synapse
synctl restart

1
changelog.d/4405.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug when rejecting remote invites

1
changelog.d/4408.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Refactor 'sign_request' as 'build_auth_headers'

1
changelog.d/4409.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Remove redundant federation connection wrapping code

1
changelog.d/4412.bugfix Normal file
View File

@ -0,0 +1 @@
Copy over whether a room is a direct message and any associated room tags on room upgrade.

1
changelog.d/4415.feature Normal file
View File

@ -0,0 +1 @@
Search now includes results from predecessor rooms after a room upgrade.

1
changelog.d/4426.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Remove redundant SynapseKeyClientProtocol magic

1
changelog.d/4427.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Refactor and cleanup for SRV record lookup

1
changelog.d/4428.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Move SRV logic into the Agent layer

1
changelog.d/4437.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4447.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4448.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4464.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

View File

@ -1 +0,0 @@
Move SRV logic into the Agent layer

1
changelog.d/4466.misc Normal file
View File

@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.

1
changelog.d/4468.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4470.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4471.misc Normal file
View File

@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.

1
changelog.d/4476.misc Normal file
View File

@ -0,0 +1 @@
Fix quoting for allowed_local_3pids example config

1
changelog.d/4477.misc Normal file
View File

@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.

1
changelog.d/4481.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4482.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4483.feature Normal file
View File

@ -0,0 +1 @@
Add support for room version 3

1
changelog.d/4485.misc Normal file
View File

@ -0,0 +1 @@
Remove deprecated --process-dependency-links option from UPGRADE.rst

1
changelog.d/4486.bugfix Normal file
View File

@ -0,0 +1 @@
Workaround for login error when using both LDAP and internal authentication.

1
changelog.d/4487.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4488.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4489.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4492.feature Normal file
View File

@ -0,0 +1 @@
Synapse can now automatically provision TLS certificates via ACME (the protocol used by CAs like Let's Encrypt).

1
changelog.d/4493.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4494.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4496.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4497.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4498.misc Normal file
View File

@ -0,0 +1 @@
Clarify documentation for the `public_baseurl` config param

1
changelog.d/4499.feature Normal file
View File

@ -0,0 +1 @@
Add support for room version 3

1
changelog.d/4505.misc Normal file
View File

@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.

1
changelog.d/4506.misc Normal file
View File

@ -0,0 +1 @@
Make it possible to set the log level for tests via an environment variable

1
changelog.d/4507.misc Normal file
View File

@ -0,0 +1 @@
Reduce the log level of linearizer lock acquirement to DEBUG.

1
changelog.d/4509.removal Normal file
View File

@ -0,0 +1 @@
Synapse no longer generates self-signed TLS certificates when generating a configuration file.

1
changelog.d/4510.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4511.feature Normal file
View File

@ -0,0 +1 @@
Implement MSC1708 (.well-known routing for server-server federation)

1
changelog.d/4512.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug where setting a relative consent directory path would cause a crash.

1
changelog.d/4514.misc Normal file
View File

@ -0,0 +1 @@
Add infrastructure to support different event formats

1
changelog.d/4515.feature Normal file
View File

@ -0,0 +1 @@
Add support for room version 3

View File

@ -65,7 +65,7 @@ class Auth(object):
register_cache("cache", "token_cache", self.token_cache)
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
def check_from_context(self, room_version, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
event, prev_state_ids, for_verification=True,
@ -74,12 +74,16 @@ class Auth(object):
auth_events = {
(e.type, e.state_key): e for e in itervalues(auth_events)
}
self.check(event, auth_events=auth_events, do_sig_check=do_sig_check)
self.check(
room_version, event,
auth_events=auth_events, do_sig_check=do_sig_check,
)
def check(self, event, auth_events, do_sig_check=True):
def check(self, room_version, event, auth_events, do_sig_check=True):
""" Checks if this event is correctly authed.
Args:
room_version (str): version of the room
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
@ -88,7 +92,9 @@ class Auth(object):
True if the auth checks pass.
"""
with Measure(self.clock, "auth.check"):
event_auth.check(event, auth_events, do_sig_check=do_sig_check)
event_auth.check(
room_version, event, auth_events, do_sig_check=do_sig_check
)
@defer.inlineCallbacks
def check_joined_room(self, room_id, user_id, current_state=None):
@ -544,17 +550,6 @@ class Auth(object):
"""
return self.store.is_server_admin(user)
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
)
builder.auth_events = auth_events_entries
@defer.inlineCallbacks
def compute_auth_events(self, event, current_state_ids, for_verification=False):
if event.type == EventTypes.Create:
@ -571,7 +566,7 @@ class Auth(object):
key = (EventTypes.JoinRules, "", )
join_rule_event_id = current_state_ids.get(key)
key = (EventTypes.Member, event.user_id, )
key = (EventTypes.Member, event.sender, )
member_event_id = current_state_ids.get(key)
key = (EventTypes.Create, "", )
@ -621,7 +616,7 @@ class Auth(object):
defer.returnValue(auth_ids)
def check_redaction(self, event, auth_events):
def check_redaction(self, room_version, event, auth_events):
"""Check whether the event sender is allowed to redact the target event.
Returns:
@ -634,7 +629,7 @@ class Auth(object):
AuthError if the event sender is definitely not allowed to redact
the target event.
"""
return event_auth.check_redaction(event, auth_events)
return event_auth.check_redaction(room_version, event, auth_events)
@defer.inlineCallbacks
def check_can_change_room_list(self, room_id, user):

View File

@ -105,7 +105,6 @@ class RoomVersions(object):
V1 = "1"
V2 = "2"
V3 = "3"
VDH_TEST = "vdh-test-version"
STATE_V2_TEST = "state-v2-test"
@ -122,11 +121,26 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1
KNOWN_ROOM_VERSIONS = {
RoomVersions.V1,
RoomVersions.V2,
RoomVersions.VDH_TEST,
RoomVersions.V3,
RoomVersions.STATE_V2_TEST,
RoomVersions.V3,
}
class EventFormatVersions(object):
"""This is an internal enum for tracking the version of the event format,
independently from the room version.
"""
V1 = 1
V2 = 2
KNOWN_EVENT_FORMAT_VERSIONS = {
EventFormatVersions.V1,
EventFormatVersions.V2,
}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"

View File

@ -444,6 +444,20 @@ class Filter(object):
def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)
def with_room_ids(self, room_ids):
"""Returns a new filter with the given room IDs appended.
Args:
room_ids (iterable[unicode]): The room_ids to add
Returns:
filter: A new filter including the given rooms and the old
filter's rooms.
"""
newFilter = Filter(self.filter_json)
newFilter.rooms += room_ids
return newFilter
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):

View File

@ -164,23 +164,23 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.start_listening(config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -185,23 +185,23 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.start_listening(config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -151,23 +151,23 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.start_listening(config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -183,24 +183,24 @@ def start(config_options):
# Force the pushers to start since they will be disabled in the main config
config.send_federation = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = FederationSenderServer(
ss = FederationSenderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ps.setup()
ps.start_listening(config.worker_listeners)
ss.setup()
def start():
ps.get_datastore().start_profiling()
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-federation-sender", config)

View File

@ -241,23 +241,23 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.start_listening(config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -151,23 +151,23 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ss.setup()
ss.start_listening(config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -211,24 +211,24 @@ def start(config_options):
# Force the pushers to start since they will be disabled in the main config
config.update_user_directory = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = UserDirectoryServer(
ss = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ps.setup()
ps.start_listening(config.worker_listeners)
ss.setup()
def start():
ps.get_datastore().start_profiling()
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)

View File

@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from synapse.config import ConfigError
from ._base import Config
DEFAULT_CONFIG = """\
@ -85,7 +89,15 @@ class ConsentConfig(Config):
if consent_config is None:
return
self.user_consent_version = str(consent_config["version"])
self.user_consent_template_dir = consent_config["template_dir"]
self.user_consent_template_dir = self.abspath(
consent_config["template_dir"]
)
if not path.isdir(self.user_consent_template_dir):
raise ConfigError(
"Could not find template directory '%s'" % (
self.user_consent_template_dir,
),
)
self.user_consent_server_notice_content = consent_config.get(
"server_notice_content",
)

View File

@ -84,11 +84,11 @@ class RegistrationConfig(Config):
#
# allowed_local_3pids:
# - medium: email
# pattern: ".*@matrix\\.org"
# pattern: '.*@matrix\\.org'
# - medium: email
# pattern: ".*@vector\\.im"
# pattern: '.*@vector\\.im'
# - medium: msisdn
# pattern: "\\+44"
# pattern: '\\+44'
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.

View File

@ -261,7 +261,7 @@ class ServerConfig(Config):
# enter into the 'custom HS URL' field on their client. If you
# use synapse with a reverse proxy, this should be the URL to reach
# synapse via the proxy.
# public_baseurl: https://example.com:8448/
# public_baseurl: https://example.com/
# Set the soft limit on the number of file descriptors synapse can use
# Zero is used to indicate synapse should set the soft limit to the

View File

@ -15,6 +15,7 @@
import logging
import os
import warnings
from datetime import datetime
from hashlib import sha256
@ -39,8 +40,8 @@ class TlsConfig(Config):
self.acme_bind_addresses = acme_config.get("bind_addresses", ["127.0.0.1"])
self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
self.tls_certificate_file = os.path.abspath(config.get("tls_certificate_path"))
self.tls_private_key_file = os.path.abspath(config.get("tls_private_key_path"))
self.tls_certificate_file = self.abspath(config.get("tls_certificate_path"))
self.tls_private_key_file = self.abspath(config.get("tls_private_key_path"))
self._original_tls_fingerprints = config["tls_fingerprints"]
self.tls_fingerprints = list(self._original_tls_fingerprints)
self.no_tls = config.get("no_tls", False)
@ -94,6 +95,16 @@ class TlsConfig(Config):
"""
self.tls_certificate = self.read_tls_certificate(self.tls_certificate_file)
# Check if it is self-signed, and issue a warning if so.
if self.tls_certificate.get_issuer() == self.tls_certificate.get_subject():
warnings.warn(
(
"Self-signed TLS certificates will not be accepted by Synapse 1.0. "
"Please either provide a valid certificate, or use Synapse's ACME "
"support to provision one."
)
)
if not self.no_tls:
self.tls_private_key = self.read_tls_private_key(self.tls_private_key_file)
@ -118,10 +129,11 @@ class TlsConfig(Config):
return (
"""\
# PEM encoded X509 certificate for TLS.
# You can replace the self-signed certificate that synapse
# autogenerates on launch with your own SSL certificate + key pair
# if you like. Any required intermediary certificates can be
# appended after the primary certificate in hierarchical order.
# This certificate, as of Synapse 1.0, will need to be a valid
# and verifiable certificate, with a root that is available in
# the root store of other servers you wish to federate to. Any
# required intermediary certificates can be appended after the
# primary certificate in hierarchical order.
tls_certificate_path: "%(tls_certificate_path)s"
# PEM encoded private key for TLS
@ -183,40 +195,3 @@ class TlsConfig(Config):
def read_tls_private_key(self, private_key_path):
private_key_pem = self.read_file(private_key_path, "tls_private_key")
return crypto.load_privatekey(crypto.FILETYPE_PEM, private_key_pem)
def generate_files(self, config):
tls_certificate_path = config["tls_certificate_path"]
tls_private_key_path = config["tls_private_key_path"]
if not self.path_exists(tls_private_key_path):
with open(tls_private_key_path, "wb") as private_key_file:
tls_private_key = crypto.PKey()
tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
private_key_pem = crypto.dump_privatekey(
crypto.FILETYPE_PEM, tls_private_key
)
private_key_file.write(private_key_pem)
else:
with open(tls_private_key_path) as private_key_file:
private_key_pem = private_key_file.read()
tls_private_key = crypto.load_privatekey(
crypto.FILETYPE_PEM, private_key_pem
)
if not self.path_exists(tls_certificate_path):
with open(tls_certificate_path, "wb") as certificate_file:
cert = crypto.X509()
subject = cert.get_subject()
subject.CN = config["server_name"]
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(tls_private_key)
cert.sign(tls_private_key, 'sha256')
cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
certificate_file.write(cert_pem)

View File

@ -23,14 +23,14 @@ from signedjson.sign import sign_json
from unpaddedbase64 import decode_base64, encode_base64
from synapse.api.errors import Codes, SynapseError
from synapse.events.utils import prune_event
from synapse.events.utils import prune_event, prune_event_dict
logger = logging.getLogger(__name__)
def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event, hash_algorithm)
name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
logger.debug("Expecting hash: %s", encode_base64(expected_hash))
# some malformed events lack a 'hashes'. Protect against it being missing
@ -59,35 +59,70 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
return message_hash_bytes == expected_hash
def compute_content_hash(event, hash_algorithm):
event_json = event.get_pdu_json()
event_json.pop("age_ts", None)
event_json.pop("unsigned", None)
event_json.pop("signatures", None)
event_json.pop("hashes", None)
event_json.pop("outlier", None)
event_json.pop("destinations", None)
def compute_content_hash(event_dict, hash_algorithm):
"""Compute the content hash of an event, which is the hash of the
unredacted event.
event_json_bytes = encode_canonical_json(event_json)
Args:
event_dict (dict): The unredacted event as a dict
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
Returns:
tuple[str, bytes]: A tuple of the name of hash and the hash as raw
bytes.
"""
event_dict = dict(event_dict)
event_dict.pop("age_ts", None)
event_dict.pop("unsigned", None)
event_dict.pop("signatures", None)
event_dict.pop("hashes", None)
event_dict.pop("outlier", None)
event_dict.pop("destinations", None)
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return (hashed.name, hashed.digest())
def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
"""Computes the event reference hash. This is the hash of the redacted
event.
Args:
event (FrozenEvent)
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
Returns:
tuple[str, bytes]: A tuple of the name of hash and the hash as raw
bytes.
"""
tmp_event = prune_event(event)
event_json = tmp_event.get_pdu_json()
event_json.pop("signatures", None)
event_json.pop("age_ts", None)
event_json.pop("unsigned", None)
event_json_bytes = encode_canonical_json(event_json)
event_dict = tmp_event.get_pdu_json()
event_dict.pop("signatures", None)
event_dict.pop("age_ts", None)
event_dict.pop("unsigned", None)
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return (hashed.name, hashed.digest())
def compute_event_signature(event, signature_name, signing_key):
tmp_event = prune_event(event)
redact_json = tmp_event.get_pdu_json()
def compute_event_signature(event_dict, signature_name, signing_key):
"""Compute the signature of the event for the given name and key.
Args:
event_dict (dict): The event as a dict
signature_name (str): The name of the entity signing the event
(typically the server's hostname).
signing_key (syutil.crypto.SigningKey): The key to sign with
Returns:
dict[str, dict[str, str]]: Returns a dictionary in the same format of
an event's signatures field.
"""
redact_json = prune_event_dict(event_dict)
redact_json.pop("age_ts", None)
redact_json.pop("unsigned", None)
logger.debug("Signing event: %s", encode_canonical_json(redact_json))
@ -96,25 +131,25 @@ def compute_event_signature(event, signature_name, signing_key):
return redact_json["signatures"]
def add_hashes_and_signatures(event, signature_name, signing_key,
def add_hashes_and_signatures(event_dict, signature_name, signing_key,
hash_algorithm=hashlib.sha256):
# if hasattr(event, "old_state_events"):
# state_json_bytes = encode_canonical_json(
# [e.event_id for e in event.old_state_events.values()]
# )
# hashed = hash_algorithm(state_json_bytes)
# event.state_hash = {
# hashed.name: encode_base64(hashed.digest())
# }
"""Add content hash and sign the event
name, digest = compute_content_hash(event, hash_algorithm=hash_algorithm)
Args:
event_dict (dict): The event to add hashes to and sign
signature_name (str): The name of the entity signing the event
(typically the server's hostname).
signing_key (syutil.crypto.SigningKey): The key to sign with
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
"""
if not hasattr(event, "hashes"):
event.hashes = {}
event.hashes[name] = encode_base64(digest)
name, digest = compute_content_hash(event_dict, hash_algorithm=hash_algorithm)
event.signatures = compute_event_signature(
event,
event_dict.setdefault("hashes", {})[name] = encode_base64(digest)
event_dict["signatures"] = compute_event_signature(
event_dict,
signature_name=signature_name,
signing_key=signing_key,
)

View File

@ -20,17 +20,25 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
EventTypes,
JoinRules,
Membership,
RoomVersions,
)
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.types import UserID, get_domain_from_id
logger = logging.getLogger(__name__)
def check(event, auth_events, do_sig_check=True, do_size_check=True):
def check(room_version, event, auth_events, do_sig_check=True, do_size_check=True):
""" Checks if this event is correctly authed.
Args:
room_version (str): the version of the room
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
@ -48,7 +56,6 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
if do_sig_check:
sender_domain = get_domain_from_id(event.sender)
event_id_domain = get_domain_from_id(event.event_id)
is_invite_via_3pid = (
event.type == EventTypes.Member
@ -65,9 +72,13 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
if not is_invite_via_3pid:
raise AuthError(403, "Event not signed by sender's server")
# Check the event_id's domain has signed the event
if not event.signatures.get(event_id_domain):
raise AuthError(403, "Event not signed by sending server")
if event.format_version in (EventFormatVersions.V1,):
# Only older room versions have event IDs to check.
event_id_domain = get_domain_from_id(event.event_id)
# Check the origin domain has signed the event
if not event.signatures.get(event_id_domain):
raise AuthError(403, "Event not signed by sending server")
if auth_events is None:
# Oh, we don't know what the state of the room was, so we
@ -167,7 +178,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
_check_power_levels(event, auth_events)
if event.type == EventTypes.Redaction:
check_redaction(event, auth_events)
check_redaction(room_version, event, auth_events)
logger.debug("Allowing! %s", event)
@ -421,7 +432,7 @@ def _can_send_event(event, auth_events):
return True
def check_redaction(event, auth_events):
def check_redaction(room_version, event, auth_events):
"""Check whether the event sender is allowed to redact the target event.
Returns:
@ -441,10 +452,16 @@ def check_redaction(event, auth_events):
if user_level >= redact_level:
return False
redacter_domain = get_domain_from_id(event.event_id)
redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
if room_version in (RoomVersions.V1, RoomVersions.V2,):
redacter_domain = get_domain_from_id(event.event_id)
redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
return True
elif room_version == RoomVersions.V3:
event.internal_metadata.recheck_redaction = True
return True
else:
raise RuntimeError("Unrecognized room version %r" % (room_version,))
raise AuthError(
403,

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -18,6 +19,9 @@ from distutils.util import strtobool
import six
from unpaddedbase64 import encode_base64
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions, RoomVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@ -41,8 +45,13 @@ class _EventInternalMetadata(object):
def is_outlier(self):
return getattr(self, "outlier", False)
def is_invite_from_remote(self):
return getattr(self, "invite_from_remote", False)
def is_out_of_band_membership(self):
"""Whether this is an out of band membership, like an invite or an invite
rejection. This is needed as those events are marked as outliers, but
they still need to be processed as if they're new events (e.g. updating
invite state in the database, relaying to clients, etc).
"""
return getattr(self, "out_of_band_membership", False)
def get_send_on_behalf_of(self):
"""Whether this server should send the event on behalf of another server.
@ -53,6 +62,21 @@ class _EventInternalMetadata(object):
"""
return getattr(self, "send_on_behalf_of", None)
def need_to_check_redaction(self):
"""Whether the redaction event needs to be rechecked when fetching
from the database.
Starting in room v3 redaction events are accepted up front, and later
checked to see if the redacter and redactee's domains match.
If the sender of the redaction event is allowed to redact any event
due to auth rules, then this will always return false.
Returns:
bool
"""
return getattr(self, "recheck_redaction", False)
def _event_dict_property(key):
# We want to be able to use hasattr with the event dict properties.
@ -179,6 +203,8 @@ class EventBase(object):
class FrozenEvent(EventBase):
format_version = EventFormatVersions.V1 # All events of this type are V1
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
event_dict = dict(event_dict)
@ -213,16 +239,6 @@ class FrozenEvent(EventBase):
rejected_reason=rejected_reason,
)
@staticmethod
def from_event(event):
e = FrozenEvent(
event.get_pdu_json()
)
e.internal_metadata = event.internal_metadata
return e
def __str__(self):
return self.__repr__()
@ -232,3 +248,127 @@ class FrozenEvent(EventBase):
self.get("type", None),
self.get("state_key", None),
)
class FrozenEventV2(EventBase):
format_version = EventFormatVersions.V2 # All events of this type are V2
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
event_dict = dict(event_dict)
# Signatures is a dict of dicts, and this is faster than doing a
# copy.deepcopy
signatures = {
name: {sig_id: sig for sig_id, sig in sigs.items()}
for name, sigs in event_dict.pop("signatures", {}).items()
}
assert "event_id" not in event_dict
unsigned = dict(event_dict.pop("unsigned", {}))
# We intern these strings because they turn up a lot (especially when
# caching).
event_dict = intern_dict(event_dict)
if USE_FROZEN_DICTS:
frozen_dict = freeze(event_dict)
else:
frozen_dict = event_dict
self._event_id = None
self.type = event_dict["type"]
if "state_key" in event_dict:
self.state_key = event_dict["state_key"]
super(FrozenEventV2, self).__init__(
frozen_dict,
signatures=signatures,
unsigned=unsigned,
internal_metadata_dict=internal_metadata_dict,
rejected_reason=rejected_reason,
)
@property
def event_id(self):
# We have to import this here as otherwise we get an import loop which
# is hard to break.
from synapse.crypto.event_signing import compute_event_reference_hash
if self._event_id:
return self._event_id
self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
return self._event_id
def prev_event_ids(self):
"""Returns the list of prev event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's prev_events
"""
return self.prev_events
def auth_event_ids(self):
"""Returns the list of auth event IDs. The order matches the order
specified in the event, though there is no meaning to it.
Returns:
list[str]: The list of event IDs of this event's auth_events
"""
return self.auth_events
def __str__(self):
return self.__repr__()
def __repr__(self):
return "<FrozenEventV2 event_id='%s', type='%s', state_key='%s'>" % (
self.event_id,
self.get("type", None),
self.get("state_key", None),
)
def room_version_to_event_format(room_version):
"""Converts a room version string to the event format
Args:
room_version (str)
Returns:
int
"""
if room_version not in KNOWN_ROOM_VERSIONS:
# We should have already checked version, so this should not happen
raise RuntimeError("Unrecognized room version %s" % (room_version,))
if room_version in (
RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
):
return EventFormatVersions.V1
elif room_version in (RoomVersions.V3,):
return EventFormatVersions.V2
else:
raise RuntimeError("Unrecognized room version %s" % (room_version,))
def event_type_from_format_version(format_version):
"""Returns the python type to use to construct an Event object for the
given event format version.
Args:
format_version (int): The event format version
Returns:
type: A type that can be initialized as per the initializer of
`FrozenEvent`
"""
if format_version == EventFormatVersions.V1:
return FrozenEvent
elif format_version == EventFormatVersions.V2:
return FrozenEventV2
else:
raise Exception(
"No event format %r" % (format_version,)
)

View File

@ -13,63 +13,270 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import attr
from twisted.internet import defer
from synapse.api.constants import (
KNOWN_EVENT_FORMAT_VERSIONS,
KNOWN_ROOM_VERSIONS,
MAX_DEPTH,
EventFormatVersions,
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.types import EventID
from synapse.util.stringutils import random_string
from . import EventBase, FrozenEvent, _event_dict_property
from . import (
_EventInternalMetadata,
event_type_from_format_version,
room_version_to_event_format,
)
class EventBuilder(EventBase):
def __init__(self, key_values={}, internal_metadata_dict={}):
signatures = copy.deepcopy(key_values.pop("signatures", {}))
unsigned = copy.deepcopy(key_values.pop("unsigned", {}))
@attr.s(slots=True, cmp=False, frozen=True)
class EventBuilder(object):
"""A format independent event builder used to build up the event content
before signing the event.
super(EventBuilder, self).__init__(
key_values,
signatures=signatures,
unsigned=unsigned,
internal_metadata_dict=internal_metadata_dict,
(Note that while objects of this class are frozen, the
content/unsigned/internal_metadata fields are still mutable)
Attributes:
format_version (int): Event format version
room_id (str)
type (str)
sender (str)
content (dict)
unsigned (dict)
internal_metadata (_EventInternalMetadata)
_state (StateHandler)
_auth (synapse.api.Auth)
_store (DataStore)
_clock (Clock)
_hostname (str): The hostname of the server creating the event
_signing_key: The signing key to use to sign the event as the server
"""
_state = attr.ib()
_auth = attr.ib()
_store = attr.ib()
_clock = attr.ib()
_hostname = attr.ib()
_signing_key = attr.ib()
format_version = attr.ib()
room_id = attr.ib()
type = attr.ib()
sender = attr.ib()
content = attr.ib(default=attr.Factory(dict))
unsigned = attr.ib(default=attr.Factory(dict))
# These only exist on a subset of events, so they raise AttributeError if
# someone tries to get them when they don't exist.
_state_key = attr.ib(default=None)
_redacts = attr.ib(default=None)
internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({})))
@property
def state_key(self):
if self._state_key is not None:
return self._state_key
raise AttributeError("state_key")
def is_state(self):
return self._state_key is not None
@defer.inlineCallbacks
def build(self, prev_event_ids):
"""Transform into a fully signed and hashed event
Args:
prev_event_ids (list[str]): The event IDs to use as the prev events
Returns:
Deferred[FrozenEvent]
"""
state_ids = yield self._state.get_current_state_ids(
self.room_id, prev_event_ids,
)
auth_ids = yield self._auth.compute_auth_events(
self, state_ids,
)
event_id = _event_dict_property("event_id")
state_key = _event_dict_property("state_key")
type = _event_dict_property("type")
if self.format_version == EventFormatVersions.V1:
auth_events = yield self._store.add_event_hashes(auth_ids)
prev_events = yield self._store.add_event_hashes(prev_event_ids)
else:
auth_events = auth_ids
prev_events = prev_event_ids
def build(self):
return FrozenEvent.from_event(self)
old_depth = yield self._store.get_max_depth_of(
prev_event_ids,
)
depth = old_depth + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
event_dict = {
"auth_events": auth_events,
"prev_events": prev_events,
"type": self.type,
"room_id": self.room_id,
"sender": self.sender,
"content": self.content,
"unsigned": self.unsigned,
"depth": depth,
"prev_state": [],
}
if self.is_state():
event_dict["state_key"] = self._state_key
if self._redacts is not None:
event_dict["redacts"] = self._redacts
defer.returnValue(
create_local_event_from_event_dict(
clock=self._clock,
hostname=self._hostname,
signing_key=self._signing_key,
format_version=self.format_version,
event_dict=event_dict,
internal_metadata_dict=self.internal_metadata.get_dict(),
)
)
class EventBuilderFactory(object):
def __init__(self, clock, hostname):
self.clock = clock
self.hostname = hostname
def __init__(self, hs):
self.clock = hs.get_clock()
self.hostname = hs.hostname
self.signing_key = hs.config.signing_key[0]
self.event_id_count = 0
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
def create_event_id(self):
i = str(self.event_id_count)
self.event_id_count += 1
def new(self, room_version, key_values):
"""Generate an event builder appropriate for the given room version
local_part = str(int(self.clock.time())) + i + random_string(5)
Args:
room_version (str): Version of the room that we're creating an
event builder for
key_values (dict): Fields used as the basis of the new event
e_id = EventID(local_part, self.hostname)
Returns:
EventBuilder
"""
return e_id.to_string()
# There's currently only the one event version defined
if room_version not in KNOWN_ROOM_VERSIONS:
raise Exception(
"No event format defined for version %r" % (room_version,)
)
def new(self, key_values={}):
key_values["event_id"] = self.create_event_id()
return EventBuilder(
store=self.store,
state=self.state,
auth=self.auth,
clock=self.clock,
hostname=self.hostname,
signing_key=self.signing_key,
format_version=room_version_to_event_format(room_version),
type=key_values["type"],
state_key=key_values.get("state_key"),
room_id=key_values["room_id"],
sender=key_values["sender"],
content=key_values.get("content", {}),
unsigned=key_values.get("unsigned", {}),
redacts=key_values.get("redacts", None),
)
time_now = int(self.clock.time_msec())
key_values.setdefault("origin", self.hostname)
key_values.setdefault("origin_server_ts", time_now)
def create_local_event_from_event_dict(clock, hostname, signing_key,
format_version, event_dict,
internal_metadata_dict=None):
"""Takes a fully formed event dict, ensuring that fields like `origin`
and `origin_server_ts` have correct values for a locally produced event,
then signs and hashes it.
key_values.setdefault("unsigned", {})
age = key_values["unsigned"].pop("age", 0)
key_values["unsigned"].setdefault("age_ts", time_now - age)
Args:
clock (Clock)
hostname (str)
signing_key
format_version (int)
event_dict (dict)
internal_metadata_dict (dict|None)
key_values["signatures"] = {}
Returns:
FrozenEvent
"""
return EventBuilder(key_values=key_values,)
# There's currently only the one event version defined
if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
raise Exception(
"No event format defined for version %r" % (format_version,)
)
if internal_metadata_dict is None:
internal_metadata_dict = {}
time_now = int(clock.time_msec())
if format_version == EventFormatVersions.V1:
event_dict["event_id"] = _create_event_id(clock, hostname)
event_dict["origin"] = hostname
event_dict["origin_server_ts"] = time_now
event_dict.setdefault("unsigned", {})
age = event_dict["unsigned"].pop("age", 0)
event_dict["unsigned"].setdefault("age_ts", time_now - age)
event_dict.setdefault("signatures", {})
add_hashes_and_signatures(
event_dict,
hostname,
signing_key,
)
return event_type_from_format_version(format_version)(
event_dict, internal_metadata_dict=internal_metadata_dict,
)
# A counter used when generating new event IDs
_event_id_counter = 0
def _create_event_id(clock, hostname):
"""Create a new event ID
Args:
clock (Clock)
hostname (str): The server name for the event ID
Returns:
str
"""
global _event_id_counter
i = str(_event_id_counter)
_event_id_counter += 1
local_part = str(int(clock.time())) + i + random_string(5)
e_id = EventID(local_part, hostname)
return e_id.to_string()

View File

@ -38,8 +38,31 @@ def prune_event(event):
This is used when we "redact" an event. We want to remove all fields that
the user has specified, but we do want to keep necessary information like
type, state_key etc.
Args:
event (FrozenEvent)
Returns:
FrozenEvent
"""
pruned_event_dict = prune_event_dict(event.get_dict())
from . import event_type_from_format_version
return event_type_from_format_version(event.format_version)(
pruned_event_dict, event.internal_metadata.get_dict()
)
def prune_event_dict(event_dict):
"""Redacts the event_dict in the same way as `prune_event`, except it
operates on dicts rather than event objects
Args:
event_dict (dict)
Returns:
dict: A copy of the pruned event dict
"""
event_type = event.type
allowed_keys = [
"event_id",
@ -59,13 +82,13 @@ def prune_event(event):
"membership",
]
event_dict = event.get_dict()
event_type = event_dict["type"]
new_content = {}
def add_fields(*fields):
for field in fields:
if field in event.content:
if field in event_dict["content"]:
new_content[field] = event_dict["content"][field]
if event_type == EventTypes.Member:
@ -98,17 +121,17 @@ def prune_event(event):
allowed_fields["content"] = new_content
allowed_fields["unsigned"] = {}
unsigned = {}
allowed_fields["unsigned"] = unsigned
if "age_ts" in event.unsigned:
allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"]
if "replaces_state" in event.unsigned:
allowed_fields["unsigned"]["replaces_state"] = event.unsigned["replaces_state"]
event_unsigned = event_dict.get("unsigned", {})
return type(event)(
allowed_fields,
internal_metadata_dict=event.internal_metadata.get_dict()
)
if "age_ts" in event_unsigned:
unsigned["age_ts"] = event_unsigned["age_ts"]
if "replaces_state" in event_unsigned:
unsigned["replaces_state"] = event_unsigned["replaces_state"]
return allowed_fields
def _copy_field(src, dst, field):
@ -244,6 +267,7 @@ def serialize_event(e, time_now_ms, as_client_event=True,
Returns:
dict
"""
# FIXME(erikj): To handle the case of presence events and the like
if not isinstance(e, EventBase):
return e
@ -253,6 +277,8 @@ def serialize_event(e, time_now_ms, as_client_event=True,
# Should this strip out None's?
d = {k: v for k, v in e.get_dict().items()}
d["event_id"] = e.event_id
if "age_ts" in d["unsigned"]:
d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
del d["unsigned"]["age_ts"]

View File

@ -15,23 +15,29 @@
from six import string_types
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import EventFormatVersions, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.types import EventID, RoomID, UserID
class EventValidator(object):
def validate_new(self, event):
"""Validates the event has roughly the right format
def validate(self, event):
EventID.from_string(event.event_id)
RoomID.from_string(event.room_id)
Args:
event (FrozenEvent)
"""
self.validate_builder(event)
if event.format_version == EventFormatVersions.V1:
EventID.from_string(event.event_id)
required = [
# "auth_events",
"auth_events",
"content",
# "hashes",
"hashes",
"origin",
# "prev_events",
"prev_events",
"sender",
"type",
]
@ -41,8 +47,25 @@ class EventValidator(object):
raise SynapseError(400, "Event does not have key %s" % (k,))
# Check that the following keys have string values
strings = [
event_strings = [
"origin",
]
for s in event_strings:
if not isinstance(getattr(event, s), string_types):
raise SynapseError(400, "'%s' not a string type" % (s,))
def validate_builder(self, event):
"""Validates that the builder/event has roughly the right format. Only
checks values that we expect a proto event to have, rather than all the
fields an event would have
Args:
event (EventBuilder|FrozenEvent)
"""
strings = [
"room_id",
"sender",
"type",
]
@ -54,22 +77,7 @@ class EventValidator(object):
if not isinstance(getattr(event, s), string_types):
raise SynapseError(400, "Not '%s' a string type" % (s,))
if event.type == EventTypes.Member:
if "membership" not in event.content:
raise SynapseError(400, "Content has not membership key")
if event.content["membership"] not in Membership.LIST:
raise SynapseError(400, "Invalid membership key")
# Check that the following keys have dictionary values
# TODO
# Check that the following keys have the correct format for DAGs
# TODO
def validate_new(self, event):
self.validate(event)
RoomID.from_string(event.room_id)
UserID.from_string(event.sender)
if event.type == EventTypes.Message:
@ -86,9 +94,16 @@ class EventValidator(object):
elif event.type == EventTypes.Name:
self._ensure_strings(event.content, ["name"])
elif event.type == EventTypes.Member:
if "membership" not in event.content:
raise SynapseError(400, "Content has not membership key")
if event.content["membership"] not in Membership.LIST:
raise SynapseError(400, "Invalid membership key")
def _ensure_strings(self, d, keys):
for s in keys:
if s not in d:
raise SynapseError(400, "'%s' not in content" % (s,))
if not isinstance(d[s], string_types):
raise SynapseError(400, "Not '%s' a string type" % (s,))
raise SynapseError(400, "'%s' not a string type" % (s,))

View File

@ -20,10 +20,10 @@ import six
from twisted.internet import defer
from twisted.internet.defer import DeferredList
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions
from synapse.api.errors import Codes, SynapseError
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events import event_type_from_format_version
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.types import get_domain_from_id
@ -43,8 +43,8 @@ class FederationBase(object):
self._clock = hs.get_clock()
@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
include_none=False):
def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
outlier=False, include_none=False):
"""Takes a list of PDUs and checks the signatures and hashs of each
one. If a PDU fails its signature check then we check if we have it in
the database and if not then request if from the originating server of
@ -56,13 +56,17 @@ class FederationBase(object):
a new list.
Args:
origin (str)
pdu (list)
outlier (bool)
room_version (str)
outlier (bool): Whether the events are outliers or not
include_none (str): Whether to include None in the returned list
for events that have failed their checks
Returns:
Deferred : A list of PDUs that have valid signatures and hashes.
"""
deferreds = self._check_sigs_and_hashes(pdus)
deferreds = self._check_sigs_and_hashes(room_version, pdus)
@defer.inlineCallbacks
def handle_check_result(pdu, deferred):
@ -84,6 +88,7 @@ class FederationBase(object):
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
room_version=room_version,
outlier=outlier,
timeout=10000,
)
@ -116,16 +121,17 @@ class FederationBase(object):
else:
defer.returnValue([p for p in valid_pdus if p])
def _check_sigs_and_hash(self, pdu):
def _check_sigs_and_hash(self, room_version, pdu):
return logcontext.make_deferred_yieldable(
self._check_sigs_and_hashes([pdu])[0],
self._check_sigs_and_hashes(room_version, [pdu])[0],
)
def _check_sigs_and_hashes(self, pdus):
def _check_sigs_and_hashes(self, room_version, pdus):
"""Checks that each of the received events is correctly signed by the
sending server.
Args:
room_version (str): The room version of the PDUs
pdus (list[FrozenEvent]): the events to be checked
Returns:
@ -136,7 +142,7 @@ class FederationBase(object):
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel logcontext.
"""
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
ctx = logcontext.LoggingContext.current_context()
@ -198,16 +204,17 @@ class FederationBase(object):
class PduToCheckSig(namedtuple("PduToCheckSig", [
"pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds",
"pdu", "redacted_pdu_json", "sender_domain", "deferreds",
])):
pass
def _check_sigs_on_pdus(keyring, pdus):
def _check_sigs_on_pdus(keyring, room_version, pdus):
"""Check that the given events are correctly signed
Args:
keyring (synapse.crypto.Keyring): keyring object to do the checks
room_version (str): the room version of the PDUs
pdus (Collection[EventBase]): the events to be checked
Returns:
@ -220,9 +227,7 @@ def _check_sigs_on_pdus(keyring, pdus):
# we want to check that the event is signed by:
#
# (a) the server which created the event_id
#
# (b) the sender's server.
# (a) the sender's server
#
# - except in the case of invites created from a 3pid invite, which are exempt
# from this check, because the sender has to match that of the original 3pid
@ -236,34 +241,26 @@ def _check_sigs_on_pdus(keyring, pdus):
# and signatures are *supposed* to be valid whether or not an event has been
# redacted. But this isn't the worst of the ways that 3pid invites are broken.
#
# (b) for V1 and V2 rooms, the server which created the event_id
#
# let's start by getting the domain for each pdu, and flattening the event back
# to JSON.
pdus_to_check = [
PduToCheckSig(
pdu=p,
redacted_pdu_json=prune_event(p).get_pdu_json(),
event_id_domain=get_domain_from_id(p.event_id),
sender_domain=get_domain_from_id(p.sender),
deferreds=[],
)
for p in pdus
]
# first make sure that the event is signed by the event_id's domain
deferreds = keyring.verify_json_objects_for_server([
(p.event_id_domain, p.redacted_pdu_json)
for p in pdus_to_check
])
for p, d in zip(pdus_to_check, deferreds):
p.deferreds.append(d)
# now let's look for events where the sender's domain is different to the
# event id's domain (normally only the case for joins/leaves), and add additional
# checks.
# First we check that the sender event is signed by the sender's domain
# (except if its a 3pid invite, in which case it may be sent by any server)
pdus_to_check_sender = [
p for p in pdus_to_check
if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu)
if not _is_invite_via_3pid(p.pdu)
]
more_deferreds = keyring.verify_json_objects_for_server([
@ -274,19 +271,43 @@ def _check_sigs_on_pdus(keyring, pdus):
for p, d in zip(pdus_to_check_sender, more_deferreds):
p.deferreds.append(d)
# now let's look for events where the sender's domain is different to the
# event id's domain (normally only the case for joins/leaves), and add additional
# checks. Only do this if the room version has a concept of event ID domain
if room_version in (
RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
):
pdus_to_check_event_id = [
p for p in pdus_to_check
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
]
more_deferreds = keyring.verify_json_objects_for_server([
(get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json)
for p in pdus_to_check_event_id
])
for p, d in zip(pdus_to_check_event_id, more_deferreds):
p.deferreds.append(d)
elif room_version in (RoomVersions.V3,):
pass # No further checks needed, as event IDs are hashes here
else:
raise RuntimeError("Unrecognized room version %s" % (room_version,))
# replace lists of deferreds with single Deferreds
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
def _flatten_deferred_list(deferreds):
"""Given a list of one or more deferreds, either return the single deferred, or
combine into a DeferredList.
"""Given a list of deferreds, either return the single deferred,
combine into a DeferredList, or return an already resolved deferred.
"""
if len(deferreds) > 1:
return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True)
else:
assert len(deferreds) == 1
elif len(deferreds) == 1:
return deferreds[0]
else:
return defer.succeed(None)
def _is_invite_via_3pid(event):
@ -297,11 +318,12 @@ def _is_invite_via_3pid(event):
)
def event_from_pdu_json(pdu_json, outlier=False):
def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
"""Construct a FrozenEvent from an event json received over federation
Args:
pdu_json (object): pdu as received over federation
event_format_version (int): The event format version
outlier (bool): True to mark this event as an outlier
Returns:
@ -313,7 +335,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_dict(pdu_json, ('event_id', 'type', 'depth'))
assert_params_in_dict(pdu_json, ('type', 'depth'))
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
@ -325,8 +347,8 @@ def event_from_pdu_json(pdu_json, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event = FrozenEvent(
pdu_json
event = event_type_from_format_version(event_format_version)(
pdu_json,
)
event.internal_metadata.outlier = outlier

View File

@ -25,14 +25,19 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RoomVersions,
)
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
HttpResponseException,
SynapseError,
)
from synapse.events import builder
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
@ -66,6 +71,9 @@ class FederationClient(FederationBase):
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
self.hostname = hs.hostname
self.signing_key = hs.config.signing_key[0]
self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
@ -162,13 +170,13 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def backfill(self, dest, context, limit, extremities):
def backfill(self, dest, room_id, limit, extremities):
"""Requests some more historic PDUs for the given context from the
given destination server.
Args:
dest (str): The remote home server to ask.
context (str): The context to backfill.
room_id (str): The room_id to backfill.
limit (int): The maximum number of PDUs to return.
extremities (list): List of PDU id and origins of the first pdus
we have seen from the context
@ -183,18 +191,21 @@ class FederationClient(FederationBase):
return
transaction_data = yield self.transport_layer.backfill(
dest, context, extremities, limit)
dest, room_id, extremities, limit)
logger.debug("backfill transaction_data=%s", repr(transaction_data))
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdus = [
event_from_pdu_json(p, outlier=False)
event_from_pdu_json(p, format_ver, outlier=False)
for p in transaction_data["pdus"]
]
# FIXME: We should handle signature failures more gracefully.
pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
self._check_sigs_and_hashes(pdus),
self._check_sigs_and_hashes(room_version, pdus),
consumeErrors=True,
).addErrback(unwrapFirstError))
@ -202,7 +213,8 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
def get_pdu(self, destinations, event_id, room_version, outlier=False,
timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.
@ -212,6 +224,7 @@ class FederationClient(FederationBase):
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
room_version (str): version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
@ -230,6 +243,8 @@ class FederationClient(FederationBase):
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
format_ver = room_version_to_event_format(room_version)
signed_pdu = None
for destination in destinations:
now = self._clock.time_msec()
@ -245,7 +260,7 @@ class FederationClient(FederationBase):
logger.debug("transaction_data %r", transaction_data)
pdu_list = [
event_from_pdu_json(p, outlier=outlier)
event_from_pdu_json(p, format_ver, outlier=outlier)
for p in transaction_data["pdus"]
]
@ -253,7 +268,7 @@ class FederationClient(FederationBase):
pdu = pdu_list[0]
# Check signatures are correct.
signed_pdu = yield self._check_sigs_and_hash(pdu)
signed_pdu = yield self._check_sigs_and_hash(room_version, pdu)
break
@ -339,12 +354,16 @@ class FederationClient(FederationBase):
destination, room_id, event_id=event_id,
)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdus = [
event_from_pdu_json(p, outlier=True) for p in result["pdus"]
event_from_pdu_json(p, format_ver, outlier=True)
for p in result["pdus"]
]
auth_chain = [
event_from_pdu_json(p, outlier=True)
event_from_pdu_json(p, format_ver, outlier=True)
for p in result.get("auth_chain", [])
]
@ -355,7 +374,8 @@ class FederationClient(FederationBase):
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in pdus if p.event_id not in seen_events],
outlier=True
outlier=True,
room_version=room_version,
)
signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
@ -364,7 +384,8 @@ class FederationClient(FederationBase):
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in auth_chain if p.event_id not in seen_events],
outlier=True
outlier=True,
room_version=room_version,
)
signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
@ -411,6 +432,8 @@ class FederationClient(FederationBase):
random.shuffle(srvs)
return srvs
room_version = yield self.store.get_room_version(room_id)
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
@ -421,6 +444,7 @@ class FederationClient(FederationBase):
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]
@ -445,13 +469,17 @@ class FederationClient(FederationBase):
destination, room_id, event_id,
)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
event_from_pdu_json(p, outlier=True)
event_from_pdu_json(p, format_ver, outlier=True)
for p in res["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True
destination, auth_chain,
outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@ -522,6 +550,8 @@ class FederationClient(FederationBase):
Does so by asking one of the already participating servers to create an
event with proper context.
Returns a fully signed and hashed event.
Note that this does not append any events to any graphs.
Args:
@ -536,8 +566,10 @@ class FederationClient(FederationBase):
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return:
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
`(origin, event, event_format)` where origin is the remote
homeserver which generated the event, and event_format is one of
`synapse.api.constants.EventFormatVersions`.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@ -557,6 +589,11 @@ class FederationClient(FederationBase):
destination, room_id, user_id, membership, params,
)
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
room_version = ret.get("room_version", RoomVersions.V1)
event_format = room_version_to_event_format(room_version)
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
@ -571,17 +608,20 @@ class FederationClient(FederationBase):
if "prev_state" not in pdu_dict:
pdu_dict["prev_state"] = []
ev = builder.EventBuilder(pdu_dict)
ev = builder.create_local_event_from_event_dict(
self._clock, self.hostname, self.signing_key,
format_version=event_format, event_dict=pdu_dict,
)
defer.returnValue(
(destination, ev)
(destination, ev, event_format)
)
return self._try_destination_list(
"make_" + membership, destinations, send_request,
)
def send_join(self, destinations, pdu):
def send_join(self, destinations, pdu, event_format_version):
"""Sends a join event to one of a list of homeservers.
Doing so will cause the remote server to add the event to the graph,
@ -591,6 +631,7 @@ class FederationClient(FederationBase):
destinations (str): Candidate homeservers which are probably
participating in the room.
pdu (BaseEvent): event to be sent
event_format_version (int): The event format version
Return:
Deferred: resolves to a dict with members ``origin`` (a string
@ -636,12 +677,12 @@ class FederationClient(FederationBase):
logger.debug("Got content: %s", content)
state = [
event_from_pdu_json(p, outlier=True)
event_from_pdu_json(p, event_format_version, outlier=True)
for p in content.get("state", [])
]
auth_chain = [
event_from_pdu_json(p, outlier=True)
event_from_pdu_json(p, event_format_version, outlier=True)
for p in content.get("auth_chain", [])
]
@ -650,9 +691,21 @@ class FederationClient(FederationBase):
for p in itertools.chain(state, auth_chain)
}
room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
room_version = e.content.get("room_version", RoomVersions.V1)
break
if room_version is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, list(pdus.values()),
outlier=True,
room_version=room_version,
)
valid_pdus_map = {
@ -690,32 +743,75 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
time_now = self._clock.time_msec()
try:
code, content = yield self.transport_layer.send_invite(
destination=destination,
room_id=room_id,
event_id=event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code == 403:
raise e.to_synapse_error()
raise
room_version = yield self.store.get_room_version(room_id)
content = yield self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]
logger.debug("Got response to send_invite: %s", pdu_dict)
pdu = event_from_pdu_json(pdu_dict)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(pdu_dict, format_ver)
# Check signatures are correct.
pdu = yield self._check_sigs_and_hash(pdu)
pdu = yield self._check_sigs_and_hash(room_version, pdu)
# FIXME: We should handle signature failures more gracefully.
defer.returnValue(pdu)
@defer.inlineCallbacks
def _do_send_invite(self, destination, pdu, room_version):
"""Actually sends the invite, first trying v2 API and falling back to
v1 API if necessary.
Args:
destination (str): Target server
pdu (FrozenEvent)
room_version (str)
Returns:
dict: The event as a dict as returned by the remote server
"""
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_invite_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
defer.returnValue(content)
except HttpResponseException as e:
if e.code in [400, 404]:
if room_version in (RoomVersions.V1, RoomVersions.V2):
pass # We'll fall through
else:
raise Exception("Remote server is too old")
elif e.code == 403:
raise e.to_synapse_error()
else:
raise
# Didn't work, try v1 API.
# Note the v1 API returns a tuple of `(200, content)`
_, content = yield self.transport_layer.send_invite_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
defer.returnValue(content)
def send_leave(self, destinations, pdu):
"""Sends a leave event to one of a list of homeservers.
@ -785,13 +881,16 @@ class FederationClient(FederationBase):
content=send_content,
)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
event_from_pdu_json(e)
event_from_pdu_json(e, format_ver)
for e in content["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True
destination, auth_chain, outlier=True, room_version=room_version,
)
signed_auth.sort(key=lambda e: e.depth)
@ -833,13 +932,16 @@ class FederationClient(FederationBase):
timeout=timeout,
)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
events = [
event_from_pdu_json(e)
event_from_pdu_json(e, format_ver)
for e in content.get("events", [])
]
signed_events = yield self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False
destination, events, outlier=False, room_version=room_version,
)
except HttpResponseException as e:
if not e.code == 400:

View File

@ -25,7 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
FederationError,
@ -34,6 +34,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
@ -178,14 +179,13 @@ class FederationServer(FederationBase):
continue
try:
# In future we will actually use the room version to parse the
# PDU into an event.
yield self.store.get_room_version(room_id)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
event = event_from_pdu_json(p)
event = event_from_pdu_json(p, format_ver)
pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
@ -322,7 +322,7 @@ class FederationServer(FederationBase):
if self.hs.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
event,
event.get_pdu_json(),
self.hs.hostname,
self.hs.config.signing_key[0]
)
@ -370,7 +370,9 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_invite_request(self, origin, content, room_version):
pdu = event_from_pdu_json(content)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
@ -378,9 +380,12 @@ class FederationServer(FederationBase):
defer.returnValue({"event": ret_pdu.get_pdu_json(time_now)})
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)
pdu = event_from_pdu_json(content)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id)
@ -400,13 +405,22 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_leave_request(room_id, user_id)
room_version = yield self.store.get_room_version(room_id)
time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
defer.returnValue({
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
})
@defer.inlineCallbacks
def on_send_leave_request(self, origin, content):
def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)
pdu = event_from_pdu_json(content)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, pdu.room_id)
@ -452,13 +466,16 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
event_from_pdu_json(e)
event_from_pdu_json(e, format_ver)
for e in content["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True
origin, auth_chain, outlier=True, room_version=room_version,
)
ret = yield self.handler.on_query_auth(
@ -603,16 +620,19 @@ class FederationServer(FederationBase):
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):
if origin != get_domain_from_id(pdu.sender):
# We continue to accept join events from any server; this is
# necessary for the federation join dance to work correctly.
# (When we join over federation, the "helper" server is
# responsible for sending out the join event, rather than the
# origin. See bug #1893).
# origin. See bug #1893. This is also true for some third party
# invites).
if not (
pdu.type == 'm.room.member' and
pdu.content and
pdu.content.get("membership", None) == 'join'
pdu.content.get("membership", None) in (
Membership.JOIN, Membership.INVITE,
)
):
logger.info(
"Discarding PDU %s from invalid origin %s",
@ -625,9 +645,12 @@ class FederationServer(FederationBase):
pdu.event_id, origin
)
# We've already checked that we know the room version by this point
room_version = yield self.store.get_room_version(pdu.room_id)
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
pdu = yield self._check_sigs_and_hash(room_version, pdu)
except SynapseError as e:
raise FederationError(
"ERROR",

View File

@ -175,7 +175,7 @@ class TransactionQueue(object):
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return

View File

@ -21,7 +21,7 @@ from six.moves import urllib
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.urls import FEDERATION_V1_PREFIX
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
from synapse.util.logutils import log_function
logger = logging.getLogger(__name__)
@ -289,7 +289,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
def send_invite_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/invite/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@ -301,6 +301,20 @@ class TransportLayerClient(object):
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_invite_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/invite/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
ignore_backoff=True,
)
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def get_public_rooms(self, remote_server, limit, since_token,
@ -958,3 +972,24 @@ def _create_v1_path(path, *args):
FEDERATION_V1_PREFIX
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
)
def _create_v2_path(path, *args):
"""Creates a path against V2 federation API from the path template and
args. Ensures that all args are url encoded.
Example:
_create_v2_path("/event/%s/", event_id)
Args:
path (str): String template for the path
args: ([str]): Args to insert into path. Each arg will be url encoded
Returns:
str
"""
return (
FEDERATION_V2_PREFIX
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
)

View File

@ -469,7 +469,7 @@ class FederationSendLeaveServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id, event_id):
content = yield self.handler.on_send_leave_request(origin, content)
content = yield self.handler.on_send_leave_request(origin, content, room_id)
defer.returnValue((200, content))
@ -487,7 +487,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = yield self.handler.on_send_join_request(origin, content)
content = yield self.handler.on_send_join_request(origin, content, context)
defer.returnValue((200, content))

View File

@ -34,6 +34,7 @@ from synapse.api.constants import (
EventTypes,
Membership,
RejectedReason,
RoomVersions,
)
from synapse.api.errors import (
AuthError,
@ -43,10 +44,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.crypto.event_signing import (
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.crypto.event_signing import compute_event_signature
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@ -342,6 +339,8 @@ class FederationHandler(BaseHandler):
room_id, event_id, p,
)
room_version = yield self.store.get_room_version(room_id)
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
@ -355,7 +354,7 @@ class FederationHandler(BaseHandler):
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
[origin], p, outlier=True,
[origin], p, room_version, outlier=True,
)
if remote_event is None:
@ -379,7 +378,6 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
@ -655,6 +653,8 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
room_version = yield self.store.get_room_version(room_id)
events = yield self.federation_client.backfill(
dest,
room_id,
@ -748,6 +748,7 @@ class FederationHandler(BaseHandler):
self.federation_client.get_pdu,
[dest],
event_id,
room_version=room_version,
outlier=True,
timeout=10000,
)
@ -1060,7 +1061,7 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
origin, event = yield self._make_and_verify_event(
origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts,
room_id,
joinee,
@ -1083,7 +1084,6 @@ class FederationHandler(BaseHandler):
handled_events = set()
try:
event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@ -1091,7 +1091,9 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
ret = yield self.federation_client.send_join(target_hosts, event)
ret = yield self.federation_client.send_join(
target_hosts, event, event_format_version,
)
origin = ret["origin"]
state = ret["state"]
@ -1164,13 +1166,18 @@ class FederationHandler(BaseHandler):
"""
event_content = {"membership": Membership.JOIN}
builder = self.event_builder_factory.new({
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
})
room_version = yield self.store.get_room_version(room_id)
builder = self.event_builder_factory.new(
room_version,
{
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
}
)
try:
event, context = yield self.event_creation_handler.create_new_client_event(
@ -1182,7 +1189,9 @@ class FederationHandler(BaseHandler):
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
yield self.auth.check_from_context(event, context, do_sig_check=False)
yield self.auth.check_from_context(
room_version, event, context, do_sig_check=False,
)
defer.returnValue(event)
@ -1287,11 +1296,11 @@ class FederationHandler(BaseHandler):
)
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
event.internal_metadata.out_of_band_membership = True
event.signatures.update(
compute_event_signature(
event,
event.get_pdu_json(),
self.hs.hostname,
self.hs.config.signing_key[0]
)
@ -1304,7 +1313,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
origin, event = yield self._make_and_verify_event(
origin, event, event_format_version = yield self._make_and_verify_event(
target_hosts,
room_id,
user_id,
@ -1313,7 +1322,7 @@ class FederationHandler(BaseHandler):
# Mark as outlier as we don't have any state for this event; we're not
# even in the room.
event.internal_metadata.outlier = True
event = self._sign_event(event)
event.internal_metadata.out_of_band_membership = True
# Try the host that we succesfully called /make_leave/ on first for
# the /send_leave/ request.
@ -1336,7 +1345,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={}, params=None):
origin, pdu = yield self.federation_client.make_membership_event(
origin, event, format_ver = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
@ -1345,9 +1354,7 @@ class FederationHandler(BaseHandler):
params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
event = pdu
logger.debug("Got response to make_%s: %s", membership, event)
# We should assert some things.
# FIXME: Do this in a nicer way
@ -1355,28 +1362,7 @@ class FederationHandler(BaseHandler):
assert(event.user_id == user_id)
assert(event.state_key == user_id)
assert(event.room_id == room_id)
defer.returnValue((origin, event))
def _sign_event(self, event):
event.internal_metadata.outlier = False
builder = self.event_builder_factory.new(
unfreeze(event.get_pdu_json())
)
builder.event_id = self.event_builder_factory.create_event_id()
builder.origin = self.hs.hostname
if not hasattr(event, "signatures"):
builder.signatures = {}
add_hashes_and_signatures(
builder,
self.hs.hostname,
self.hs.config.signing_key[0],
)
return builder.build()
defer.returnValue((origin, event, format_ver))
@defer.inlineCallbacks
@log_function
@ -1385,13 +1371,17 @@ class FederationHandler(BaseHandler):
leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
"type": EventTypes.Member,
"content": {"membership": Membership.LEAVE},
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
})
room_version = yield self.store.get_room_version(room_id)
builder = self.event_builder_factory.new(
room_version,
{
"type": EventTypes.Member,
"content": {"membership": Membership.LEAVE},
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
}
)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
@ -1400,7 +1390,9 @@ class FederationHandler(BaseHandler):
try:
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_leave_request`
yield self.auth.check_from_context(event, context, do_sig_check=False)
yield self.auth.check_from_context(
room_version, event, context, do_sig_check=False,
)
except AuthError as e:
logger.warn("Failed to create new leave %r because %s", event, e)
raise e
@ -1659,6 +1651,13 @@ class FederationHandler(BaseHandler):
create_event = e
break
if create_event is None:
# If the state doesn't have a create event then the room is
# invalid, and it would fail auth checks anyway.
raise SynapseError(400, "No create event in state")
room_version = create_event.content.get("room_version", RoomVersions.V1)
missing_auth_events = set()
for e in itertools.chain(auth_events, state, [event]):
for e_id in e.auth_event_ids():
@ -1669,6 +1668,7 @@ class FederationHandler(BaseHandler):
m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
room_version=room_version,
outlier=True,
timeout=10000,
)
@ -1687,7 +1687,7 @@ class FederationHandler(BaseHandler):
auth_for_e[(EventTypes.Create, "")] = create_event
try:
self.auth.check(e, auth_events=auth_for_e)
self.auth.check(room_version, e, auth_events=auth_for_e)
except SynapseError as err:
# we may get SynapseErrors here as well as AuthErrors. For
# instance, there are a couple of (ancient) events in some
@ -1931,6 +1931,8 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
room_version = yield self.store.get_room_version(event.room_id)
if different_auth and not event.internal_metadata.is_outlier():
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
@ -1955,8 +1957,6 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
room_version = yield self.store.get_room_version(event.room_id)
new_state = yield self.state_handler.resolve_events(
room_version,
[list(local_view.values()), list(remote_view.values())],
@ -2056,7 +2056,7 @@ class FederationHandler(BaseHandler):
)
try:
self.auth.check(event, auth_events=auth_events)
self.auth.check(room_version, event, auth_events=auth_events)
except AuthError as e:
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e
@ -2279,18 +2279,26 @@ class FederationHandler(BaseHandler):
}
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
room_version = yield self.store.get_room_version(room_id)
builder = self.event_builder_factory.new(room_version, event_dict)
EventValidator().validate_builder(builder)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = yield self.add_display_name_to_third_party_invite(
event_dict, event, context
room_version, event_dict, event, context
)
EventValidator().validate_new(event)
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = self.hs.hostname
try:
yield self.auth.check_from_context(event, context)
yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying new third party invite %r because %s", event, e)
raise e
@ -2317,23 +2325,31 @@ class FederationHandler(BaseHandler):
Returns:
Deferred: resolves (to None)
"""
builder = self.event_builder_factory.new(event_dict)
room_version = yield self.store.get_room_version(room_id)
# NB: event_dict has a particular specced format we might need to fudge
# if we change event formats too much.
builder = self.event_builder_factory.new(room_version, event_dict)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
event, context = yield self.add_display_name_to_third_party_invite(
event_dict, event, context
room_version, event_dict, event, context
)
try:
self.auth.check_from_context(event, context)
self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying third party invite %r because %s", event, e)
raise e
yield self._check_signature(event, context)
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
# XXX we send the invite here, but send_membership_event also sends it,
# so we end up making two requests. I think this is redundant.
returned_invite = yield self.send_invite(origin, event)
@ -2344,7 +2360,8 @@ class FederationHandler(BaseHandler):
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
def add_display_name_to_third_party_invite(self, event_dict, event, context):
def add_display_name_to_third_party_invite(self, room_version, event_dict,
event, context):
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"]
@ -2368,11 +2385,12 @@ class FederationHandler(BaseHandler):
# auth checks. If we need the invite and don't have it then the
# auth check code will explode appropriately.
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
builder = self.event_builder_factory.new(room_version, event_dict)
EventValidator().validate_builder(builder)
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
EventValidator().validate_new(event)
defer.returnValue((event, context))
@defer.inlineCallbacks

View File

@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.constants import EventTypes, Membership, RoomVersions
from synapse.api.errors import (
AuthError,
Codes,
@ -31,7 +31,6 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
@ -278,9 +277,17 @@ class EventCreationHandler(object):
"""
yield self.auth.check_auth_blocking(requester.user.to_string())
builder = self.event_builder_factory.new(event_dict)
if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
room_version = event_dict["content"]["room_version"]
else:
try:
room_version = yield self.store.get_room_version(event_dict["room_id"])
except NotFoundError:
raise AuthError(403, "Unknown room")
self.validator.validate_new(builder)
builder = self.event_builder_factory.new(room_version, event_dict)
self.validator.validate_builder(builder)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
@ -318,6 +325,8 @@ class EventCreationHandler(object):
prev_events_and_hashes=prev_events_and_hashes,
)
self.validator.validate_new(event)
defer.returnValue((event, context))
def _is_exempt_from_privacy_policy(self, builder, requester):
@ -535,40 +544,19 @@ class EventCreationHandler(object):
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)
if prev_events_and_hashes:
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
else:
depth = 1
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
builder.prev_events = prev_events
builder.depth = depth
context = yield self.state.compute_event_context(builder)
event = yield builder.build(
prev_event_ids=[p for p, _ in prev_events],
)
context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
if builder.is_state():
builder.prev_state = yield self.store.add_event_hashes(
context.prev_state_events
)
yield self.auth.add_auth_events(builder, context)
signing_key = self.hs.config.signing_key[0]
add_hashes_and_signatures(
builder, self.server_name, signing_key
)
event = builder.build()
self.validator.validate_new(event)
logger.debug(
"Created event %s",
@ -603,8 +591,13 @@ class EventCreationHandler(object):
extra_users (list(UserID)): Any extra users to notify about event
"""
if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
room_version = event.content.get("room_version", RoomVersions.V1)
else:
room_version = yield self.store.get_room_version(event.room_id)
try:
yield self.auth.check_from_context(event, context)
yield self.auth.check_from_context(room_version, event, context)
except AuthError as err:
logger.warn("Denying new event %r because %s", event, err)
raise err
@ -752,7 +745,8 @@ class EventCreationHandler(object):
auth_events = {
(e.type, e.state_key): e for e in auth_events.values()
}
if self.auth.check_redaction(event, auth_events=auth_events):
room_version = yield self.store.get_room_version(event.room_id)
if self.auth.check_redaction(room_version, event, auth_events=auth_events):
original_event = yield self.store.get_event(
event.redacts,
check_redacted=False,
@ -766,6 +760,9 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
# We've already checked.
event.internal_metadata.recheck_redaction = False
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:

View File

@ -123,9 +123,12 @@ class RoomCreationHandler(BaseHandler):
token_id=requester.access_token_id,
)
)
yield self.auth.check_from_context(tombstone_event, tombstone_context)
old_room_version = yield self.store.get_room_version(old_room_id)
yield self.auth.check_from_context(
old_room_version, tombstone_event, tombstone_context,
)
yield self.clone_exiting_room(
yield self.clone_existing_room(
requester,
old_room_id=old_room_id,
new_room_id=new_room_id,
@ -230,7 +233,7 @@ class RoomCreationHandler(BaseHandler):
)
@defer.inlineCallbacks
def clone_exiting_room(
def clone_existing_room(
self, requester, old_room_id, new_room_id, new_room_version,
tombstone_event_id,
):
@ -262,6 +265,7 @@ class RoomCreationHandler(BaseHandler):
initial_state = dict()
# Replicate relevant room events
types_to_copy = (
(EventTypes.JoinRules, ""),
(EventTypes.Name, ""),

View File

@ -63,7 +63,7 @@ class RoomMemberHandler(object):
self.directory_handler = hs.get_handlers().directory_handler
self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
@ -161,6 +161,8 @@ class RoomMemberHandler(object):
ratelimit=True,
content=None,
):
user_id = target.to_string()
if content is None:
content = {}
@ -168,14 +170,14 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
event, context = yield self.event_creation_hander.create_event(
event, context = yield self.event_creation_handler.create_event(
requester,
{
"type": EventTypes.Member,
"content": content,
"room_id": room_id,
"sender": requester.user.to_string(),
"state_key": target.to_string(),
"state_key": user_id,
# For backwards compatibility:
"membership": membership,
@ -186,14 +188,14 @@ class RoomMemberHandler(object):
)
# Check if this event matches the previous membership event for the user.
duplicate = yield self.event_creation_hander.deduplicate_state_event(
duplicate = yield self.event_creation_handler.deduplicate_state_event(
event, context,
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
defer.returnValue(duplicate)
yield self.event_creation_hander.handle_new_client_event(
yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
@ -204,12 +206,12 @@ class RoomMemberHandler(object):
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
(EventTypes.Member, user_id),
None
)
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally joined the
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
@ -218,6 +220,18 @@ class RoomMemberHandler(object):
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
yield self._user_joined_room(target, room_id)
# Copy over direct message status and room tags if this is a join
# on an upgraded room
# Check if this is an upgraded room
predecessor = yield self.store.get_room_predecessor(room_id)
if predecessor:
# It is an upgraded room. Copy over old tags
self.copy_room_tags_and_direct_to_room(
predecessor["room_id"], room_id, user_id,
)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@ -226,6 +240,55 @@ class RoomMemberHandler(object):
defer.returnValue(event)
@defer.inlineCallbacks
def copy_room_tags_and_direct_to_room(
self,
old_room_id,
new_room_id,
user_id,
):
"""Copies the tags and direct room state from one room to another.
Args:
old_room_id (str)
new_room_id (str)
user_id (str)
Returns:
Deferred[None]
"""
# Retrieve user account data for predecessor room
user_account_data, _ = yield self.store.get_account_data_for_user(
user_id,
)
# Copy direct message state if applicable
direct_rooms = user_account_data.get("m.direct", {})
# Check which key this room is under
if isinstance(direct_rooms, dict):
for key, room_id_list in direct_rooms.items():
if old_room_id in room_id_list and new_room_id not in room_id_list:
# Add new room_id to this key
direct_rooms[key].append(new_room_id)
# Save back to user's m.direct account data
yield self.store.add_account_data_for_user(
user_id, "m.direct", direct_rooms,
)
break
# Copy room tags if applicable
room_tags = yield self.store.get_tags_for_room(
user_id, old_room_id,
)
# Copy each room tag to the new room
for tag, tag_content in room_tags.items():
yield self.store.add_tag_to_room(
user_id, new_room_id, tag, tag_content
)
@defer.inlineCallbacks
def update_membership(
self,
@ -493,7 +556,7 @@ class RoomMemberHandler(object):
else:
requester = synapse.types.create_requester(target_user)
prev_event = yield self.event_creation_hander.deduplicate_state_event(
prev_event = yield self.event_creation_handler.deduplicate_state_event(
event, context,
)
if prev_event is not None:
@ -513,7 +576,7 @@ class RoomMemberHandler(object):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
yield self.event_creation_hander.handle_new_client_event(
yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
@ -527,7 +590,7 @@ class RoomMemberHandler(object):
)
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally joined the
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
@ -755,7 +818,7 @@ class RoomMemberHandler(object):
)
)
yield self.event_creation_hander.create_and_send_nonmember_event(
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.ThirdPartyInvite,
@ -877,7 +940,8 @@ class RoomMemberHandler(object):
# first member event?
create_event_id = current_state_ids.get(("m.room.create", ""))
if len(current_state_ids) == 1 and create_event_id:
defer.returnValue(self.hs.is_mine_id(create_event_id))
# We can only get here if we're in the process of creating the room
defer.returnValue(True)
for etype, state_key in current_state_ids:
if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):

View File

@ -37,6 +37,41 @@ class SearchHandler(BaseHandler):
def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
"""Retrieves room IDs of old rooms in the history of an upgraded room.
We do so by checking the m.room.create event of the room for a
`predecessor` key. If it exists, we add the room ID to our return
list and then check that room for a m.room.create event and so on
until we can no longer find any more previous rooms.
The full list of all found rooms in then returned.
Args:
room_id (str): id of the room to search through.
Returns:
Deferred[iterable[unicode]]: predecessor room ids
"""
historical_room_ids = []
while True:
predecessor = yield self.store.get_room_predecessor(room_id)
# If no predecessor, assume we've hit a dead end
if not predecessor:
break
# Add predecessor's room ID
historical_room_ids.append(predecessor["room_id"])
# Scan through the old room for further predecessors
room_id = predecessor["room_id"]
defer.returnValue(historical_room_ids)
@defer.inlineCallbacks
def search(self, user, content, batch=None):
"""Performs a full text search for a user.
@ -137,6 +172,18 @@ class SearchHandler(BaseHandler):
)
room_ids = set(r.room_id for r in rooms)
# If doing a subset of all rooms seearch, check if any of the rooms
# are from an upgraded room, and search their contents as well
if search_filter.rooms:
historical_room_ids = []
for room_id in search_filter.rooms:
# Add any previous rooms to the search if they exist
ids = yield self.get_old_rooms_from_upgraded_room(room_id)
historical_room_ids += ids
# Prevent any historical events from being filtered
search_filter = search_filter.with_room_ids(historical_room_ids)
room_ids = search_filter.filter_rooms(room_ids)
if batch_group == "room_id":

View File

@ -12,16 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import attr
from netaddr import IPAddress
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.client import URI, Agent, HTTPConnectionPool, readBody
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.endpoint import parse_server_name
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
from synapse.util.logcontext import make_deferred_yieldable
@ -41,13 +44,19 @@ class MatrixFederationAgent(object):
tls_client_options_factory (ClientTLSOptionsFactory|None):
factory to use for fetching client tls options, or none to disable TLS.
_well_known_tls_policy (IPolicyForHTTPS|None):
TLS policy to use for fetching .well-known files. None to use a default
(browser-like) implementation.
srv_resolver (SrvResolver|None):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
"""
def __init__(
self, reactor, tls_client_options_factory, _srv_resolver=None,
self, reactor, tls_client_options_factory,
_well_known_tls_policy=None,
_srv_resolver=None,
):
self._reactor = reactor
self._tls_client_options_factory = tls_client_options_factory
@ -60,6 +69,14 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
agent_args = {}
if _well_known_tls_policy is not None:
# the param is called 'contextFactory', but actually passing a
# contextfactory is deprecated, and it expects an IPolicyForHTTPS.
agent_args['contextFactory'] = _well_known_tls_policy
_well_known_agent = Agent(self._reactor, pool=self._pool, **agent_args)
self._well_known_agent = _well_known_agent
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
"""
@ -85,35 +102,39 @@ class MatrixFederationAgent(object):
response from being received (including problems that prevent the request
from being sent).
"""
parsed_uri = URI.fromBytes(uri, defaultPort=-1)
res = yield self._route_matrix_uri(parsed_uri)
parsed_uri = URI.fromBytes(uri)
server_name_bytes = parsed_uri.netloc
host, port = parse_server_name(server_name_bytes.decode("ascii"))
# set up the TLS connection params
#
# XXX disabling TLS is really only supported here for the benefit of the
# unit tests. We should make the UTs cope with TLS rather than having to make
# the code support the unit tests.
if self._tls_client_options_factory is None:
tls_options = None
else:
tls_options = self._tls_client_options_factory.get_options(host)
tls_options = self._tls_client_options_factory.get_options(
res.tls_server_name.decode("ascii")
)
if port is not None:
target = (host, port)
# make sure that the Host header is set correctly
if headers is None:
headers = Headers()
else:
service_name = b"_matrix._tcp.%s" % (server_name_bytes, )
server_list = yield self._srv_resolver.resolve_service(service_name)
if not server_list:
target = (host, 8448)
logger.debug("No SRV record for %s, using %s", host, target)
else:
target = pick_server_from_list(server_list)
headers = headers.copy()
if not headers.hasHeader(b'host'):
headers.addRawHeader(b'host', res.host_header)
class EndpointFactory(object):
@staticmethod
def endpointForURI(_uri):
logger.info("Connecting to %s:%s", target[0], target[1])
ep = HostnameEndpoint(self._reactor, host=target[0], port=target[1])
logger.info(
"Connecting to %s:%i",
res.target_host.decode("ascii"),
res.target_port,
)
ep = HostnameEndpoint(self._reactor, res.target_host, res.target_port)
if tls_options is not None:
ep = wrapClientTLS(tls_options, ep)
return ep
@ -123,3 +144,191 @@ class MatrixFederationAgent(object):
agent.request(method, uri, headers, bodyProducer)
)
defer.returnValue(res)
@defer.inlineCallbacks
def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
"""Helper for `request`: determine the routing for a Matrix URI
Args:
parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
if there is no explicit port given.
lookup_well_known (bool): True if we should look up the .well-known file if
there is no SRV record.
Returns:
Deferred[_RoutingResult]
"""
# check for an IP literal
try:
ip_address = IPAddress(parsed_uri.host.decode("ascii"))
except Exception:
# not an IP address
ip_address = None
if ip_address:
port = parsed_uri.port
if port == -1:
port = 8448
defer.returnValue(_RoutingResult(
host_header=parsed_uri.netloc,
tls_server_name=parsed_uri.host,
target_host=parsed_uri.host,
target_port=port,
))
if parsed_uri.port != -1:
# there is an explicit port
defer.returnValue(_RoutingResult(
host_header=parsed_uri.netloc,
tls_server_name=parsed_uri.host,
target_host=parsed_uri.host,
target_port=parsed_uri.port,
))
# try a SRV lookup
service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
server_list = yield self._srv_resolver.resolve_service(service_name)
if not server_list and lookup_well_known:
# try a .well-known lookup
well_known_server = yield self._get_well_known(parsed_uri.host)
if well_known_server:
# if we found a .well-known, start again, but don't do another
# .well-known lookup.
# parse the server name in the .well-known response into host/port.
# (This code is lifted from twisted.web.client.URI.fromBytes).
if b':' in well_known_server:
well_known_host, well_known_port = well_known_server.rsplit(b':', 1)
try:
well_known_port = int(well_known_port)
except ValueError:
# the part after the colon could not be parsed as an int
# - we assume it is an IPv6 literal with no port (the closing
# ']' stops it being parsed as an int)
well_known_host, well_known_port = well_known_server, -1
else:
well_known_host, well_known_port = well_known_server, -1
new_uri = URI(
scheme=parsed_uri.scheme,
netloc=well_known_server,
host=well_known_host,
port=well_known_port,
path=parsed_uri.path,
params=parsed_uri.params,
query=parsed_uri.query,
fragment=parsed_uri.fragment,
)
res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
defer.returnValue(res)
if not server_list:
target_host = parsed_uri.host
port = 8448
logger.debug(
"No SRV record for %s, using %s:%i",
parsed_uri.host.decode("ascii"), target_host.decode("ascii"), port,
)
else:
target_host, port = pick_server_from_list(server_list)
logger.debug(
"Picked %s:%i from SRV records for %s",
target_host.decode("ascii"), port, parsed_uri.host.decode("ascii"),
)
defer.returnValue(_RoutingResult(
host_header=parsed_uri.netloc,
tls_server_name=parsed_uri.host,
target_host=target_host,
target_port=port,
))
@defer.inlineCallbacks
def _get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[bytes|None]: either the new server name, from the .well-known, or
None if there was no .well-known file.
"""
# FIXME: add a cache
uri = b"https://%s/.well-known/matrix/server" % (server_name, )
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri),
)
except Exception as e:
logger.info("Connection error fetching %s: %s", uri_str, e)
defer.returnValue(None)
body = yield make_deferred_yieldable(readBody(response))
if response.code != 200:
logger.info("Error response %i from %s", response.code, uri_str)
defer.returnValue(None)
try:
parsed_body = json.loads(body.decode('utf-8'))
logger.info("Response from .well-known: %s", parsed_body)
if not isinstance(parsed_body, dict):
raise Exception("not a dict")
if "m.server" not in parsed_body:
raise Exception("Missing key 'm.server'")
except Exception as e:
raise Exception("invalid .well-known response from %s: %s" % (uri_str, e,))
defer.returnValue(parsed_body["m.server"].encode("ascii"))
@attr.s
class _RoutingResult(object):
"""The result returned by `_route_matrix_uri`.
Contains the parameters needed to direct a federation connection to a particular
server.
Where a SRV record points to several servers, this object contains a single server
chosen from the list.
"""
host_header = attr.ib()
"""
The value we should assign to the Host header (host:port from the matrix
URI, or .well-known).
:type: bytes
"""
tls_server_name = attr.ib()
"""
The server name we should set in the SNI (typically host, without port, from the
matrix URI or .well-known)
:type: bytes
"""
target_host = attr.ib()
"""
The hostname (or IP literal) we should route the TCP connection to (the target of the
SRV record, or the hostname from the URL/.well-known)
:type: bytes
"""
target_port = attr.ib()
"""
The port we should route the TCP connection to (the target of the SRV record, or
the port from the URL/.well-known, or 8448)
:type: int
"""

View File

@ -255,7 +255,6 @@ class MatrixFederationHttpClient(object):
headers_dict = {
b"User-Agent": [self.version_string_bytes],
b"Host": [destination_bytes],
}
with limiter:

View File

@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events import event_type_from_format_version
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@ -70,6 +70,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_payloads.append({
"event": event.get_pdu_json(),
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
@ -94,9 +95,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
format_ver = event_payload["event_format_version"]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"],

View File

@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events import event_type_from_format_version
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@ -74,6 +74,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
payload = {
"event": event.get_pdu_json(),
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
@ -90,9 +91,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
content = parse_json_object_from_request(request)
event_dict = content["event"]
format_ver = content["event_format_version"]
internal_metadata = content["internal_metadata"]
rejected_reason = content["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])

View File

@ -89,7 +89,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomStateEventRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.message_handler = hs.get_message_handler()
@ -172,7 +172,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
content=content,
)
else:
event = yield self.event_creation_hander.create_and_send_nonmember_event(
event = yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
txn_id=txn_id,
@ -189,7 +189,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomSendEventRestServlet, self).__init__(hs)
self.event_creation_hander = hs.get_event_creation_handler()
self.event_creation_handler = hs.get_event_creation_handler()
def register(self, http_server):
# /rooms/$roomid/send/$event_type[/$txn_id]
@ -211,7 +211,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
if b'ts' in request.args and requester.app_service:
event_dict['origin_server_ts'] = parse_integer(request, "ts", 0)
event = yield self.event_creation_hander.create_and_send_nonmember_event(
event = yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
txn_id=txn_id,

View File

@ -101,16 +101,7 @@ class ConsentResource(Resource):
"missing in config file.",
)
# daemonize changes the cwd to /, so make the path absolute now.
consent_template_directory = path.abspath(
hs.config.user_consent_template_dir,
)
if not path.isdir(consent_template_directory):
raise ConfigError(
"Could not find template directory '%s'" % (
consent_template_directory,
),
)
consent_template_directory = hs.config.user_consent_template_dir
loader = jinja2.FileSystemLoader(consent_template_directory)
self._jinja_env = jinja2.Environment(

View File

@ -355,10 +355,7 @@ class HomeServer(object):
return Keyring(self)
def build_event_builder_factory(self):
return EventBuilderFactory(
clock=self.get_clock(),
hostname=self.hostname,
)
return EventBuilderFactory(self)
def build_filtering(self):
return Filtering(self)

View File

@ -608,10 +608,10 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto
state_sets, event_map, state_res_store.get_events,
)
elif room_version in (
RoomVersions.VDH_TEST, RoomVersions.STATE_V2_TEST, RoomVersions.V2,
RoomVersions.STATE_V2_TEST, RoomVersions.V2, RoomVersions.V3,
):
return v2.resolve_events_with_store(
state_sets, event_map, state_res_store,
room_version, state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to

View File

@ -21,7 +21,7 @@ from six import iteritems, iterkeys, itervalues
from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, RoomVersions
from synapse.api.errors import AuthError
logger = logging.getLogger(__name__)
@ -274,7 +274,11 @@ def _resolve_auth_events(events, auth_events):
auth_events[(prev_event.type, prev_event.state_key)] = prev_event
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
event_auth.check(
RoomVersions.V1, event, auth_events,
do_sig_check=False,
do_size_check=False,
)
prev_event = event
except AuthError:
return prev_event
@ -286,7 +290,11 @@ def _resolve_normal_events(events, auth_events):
for event in _ordered_events(events):
try:
# The signatures have already been checked at this point
event_auth.check(event, auth_events, do_sig_check=False, do_size_check=False)
event_auth.check(
RoomVersions.V1, event, auth_events,
do_sig_check=False,
do_size_check=False,
)
return event
except AuthError:
pass

View File

@ -29,10 +29,12 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def resolve_events_with_store(state_sets, event_map, state_res_store):
def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
"""Resolves the state using the v2 state resolution algorithm
Args:
room_version (str): The room version
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
@ -104,7 +106,7 @@ def resolve_events_with_store(state_sets, event_map, state_res_store):
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
sorted_power_events, unconflicted_state, event_map,
room_version, sorted_power_events, unconflicted_state, event_map,
state_res_store,
)
@ -129,7 +131,7 @@ def resolve_events_with_store(state_sets, event_map, state_res_store):
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
leftover_events, resolved_state, event_map,
room_version, leftover_events, resolved_state, event_map,
state_res_store,
)
@ -350,11 +352,13 @@ def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_
@defer.inlineCallbacks
def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
def _iterative_auth_checks(room_version, event_ids, base_state, event_map,
state_res_store):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
room_version (str)
event_ids (list[str]): Ordered list of events to apply auth checks to
base_state (dict[tuple[str, str], str]): The set of state to start with
event_map (dict[str,FrozenEvent])
@ -385,7 +389,7 @@ def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
try:
event_auth.check(
event, auth_events,
room_version, event, auth_events,
do_sig_check=False,
do_size_check=False
)

View File

@ -27,7 +27,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
@ -196,6 +196,12 @@ class SQLBaseStore(object):
# A set of tables that are not safe to use native upserts in.
self._unsafe_to_upsert_tables = {"user_ips"}
# We add the user_directory_search table to the blacklist on SQLite
# because the existing search table does not have an index, making it
# unsafe to use native upserts.
if isinstance(self.database_engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
if self.database_engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
@ -230,7 +236,7 @@ class SQLBaseStore(object):
self._unsafe_to_upsert_tables.discard("user_ips")
# If there's any tables left to check, reschedule to run.
if self._unsafe_to_upsert_tables:
if updates:
self._clock.call_later(
15.0,
run_as_background_process,

View File

@ -240,7 +240,7 @@ class BackgroundUpdateStore(SQLBaseStore):
* An integer count of the number of items to update in this batch.
The handler should return a deferred integer count of items updated.
The hander is responsible for updating the progress of the update.
The handler is responsible for updating the progress of the update.
Args:
update_name(str): The name of the update that this code handles.

View File

@ -15,7 +15,6 @@
import struct
import threading
from sqlite3 import sqlite_version_info
from synapse.storage.prepare_database import prepare_database
@ -37,7 +36,7 @@ class Sqlite3Engine(object):
Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
more work we haven't done yet to tell what was inserted vs updated.
"""
return sqlite_version_info >= (3, 24, 0)
return self.module.sqlite_version_info >= (3, 24, 0)
def check_database(self, txn):
pass

View File

@ -125,6 +125,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
return dict(txn)
@defer.inlineCallbacks
def get_max_depth_of(self, event_ids):
"""Returns the max depth of a set of event IDs
Args:
event_ids (list[str])
Returns
Deferred[int]
"""
rows = yield self._simple_select_many_batch(
table="events",
column="event_id",
iterable=event_ids,
retcols=("depth",),
desc="get_max_depth_of",
)
if not rows:
defer.returnValue(0)
else:
defer.returnValue(max(row["depth"] for row in rows))
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,

View File

@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
event.internal_metadata.get_dict()
),
"json": encode_json(event_dict(event)),
"format_version": event.format_version,
}
for event, _ in events_and_contexts
],

View File

@ -21,13 +21,14 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventFormatVersions, EventTypes
from synapse.api.errors import NotFoundError
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@ -162,7 +163,6 @@ class EventsWorkerStore(SQLBaseStore):
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
allow_rejected=allow_rejected,
)
@ -174,6 +174,29 @@ class EventsWorkerStore(SQLBaseStore):
if not entry:
continue
# Starting in room version v3, some redactions need to be rechecked if we
# didn't have the redacted event at the time, so we recheck on read
# instead.
if not allow_rejected and entry.event.type == EventTypes.Redaction:
if entry.event.internal_metadata.need_to_check_redaction():
orig = yield self.get_event(
entry.event.redacts,
allow_none=True,
allow_rejected=True,
get_prev_content=False,
)
expected_domain = get_domain_from_id(entry.event.sender)
if orig and get_domain_from_id(orig.sender) == expected_domain:
# This redaction event is allowed. Mark as not needing a
# recheck.
entry.event.internal_metadata.recheck_redaction = False
else:
# We don't have the event that is being redacted, so we
# assume that the event isn't authorized for now. (If we
# later receive the event, then we will always redact
# it anyway, since we have this redaction)
continue
if allow_rejected or not entry.event.rejected_reason:
if check_redacted and entry.redacted_event:
event = entry.redacted_event
@ -310,7 +333,7 @@ class EventsWorkerStore(SQLBaseStore):
self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
def _enqueue_events(self, events, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@ -353,6 +376,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
format_version=row["format_version"],
)
for row in rows
],
@ -377,6 +401,7 @@ class EventsWorkerStore(SQLBaseStore):
" e.event_id as event_id, "
" e.internal_metadata,"
" e.json,"
" e.format_version, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM event_json as e"
@ -392,7 +417,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
rejected_reason=None):
format_version, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@ -405,8 +430,13 @@ class EventsWorkerStore(SQLBaseStore):
desc="_get_event_from_row_rejected_reason",
)
original_ev = FrozenEvent(
d,
if format_version is None:
# This means that we stored the event before we had the concept
# of a event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
original_ev = event_type_from_format_version(format_version)(
event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
@ -436,6 +466,19 @@ class EventsWorkerStore(SQLBaseStore):
# will serialise this field correctly
redacted_event.unsigned["redacted_because"] = because
# Starting in room version v3, some redactions need to be
# rechecked if we didn't have the redacted event at the
# time, so we recheck on read instead.
if because.internal_metadata.need_to_check_redaction():
expected_domain = get_domain_from_id(original_ev.sender)
if get_domain_from_id(because.sender) == expected_domain:
# This redaction event is allowed. Mark as not needing a
# recheck.
because.internal_metadata.recheck_redaction = False
else:
# Senders don't match, so the event isn't actually redacted
redacted_event = None
cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,

View File

@ -197,15 +197,21 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if is_support:
return
is_insert = yield self.runInteraction(
yield self.runInteraction(
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
user_id
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
user_in_mau = self.user_last_seen_monthly_active.cache.get(
(user_id,),
None,
update_metrics=False
)
if user_in_mau is None:
self.get_monthly_active_count.invalidate(())
self.user_last_seen_monthly_active.invalidate((user_id,))
def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member

View File

@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
# We update the local_invites table only if the event is "current",
# i.e., its something that has just happened.
# The only current event that can also be an outlier is if its an
# invite that has come in across federation.
# i.e., its something that has just happened. If the event is an
# outlier it is only current if its an "out of band membership",
# like a remote invite or a rejection of a remote invite.
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
or event.internal_metadata.is_invite_from_remote()
or event.internal_metadata.is_out_of_band_membership()
)
is_mine = self.hs.is_mine_id(event.state_key)
if is_new_state and is_mine:

View File

@ -0,0 +1,16 @@
/* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE event_json ADD COLUMN format_version INTEGER;

Some files were not shown because too many files have changed in this diff Show More