Merge pull request #4448 from matrix-org/erikj/get_pdu_versions
Add room_version param to get_pdu
This commit is contained in:
commit
edc1e21dbe
|
@ -0,0 +1 @@
|
||||||
|
Add infrastructure to support different event formats
|
|
@ -43,8 +43,8 @@ class FederationBase(object):
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
|
def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
|
||||||
include_none=False):
|
outlier=False, include_none=False):
|
||||||
"""Takes a list of PDUs and checks the signatures and hashs of each
|
"""Takes a list of PDUs and checks the signatures and hashs of each
|
||||||
one. If a PDU fails its signature check then we check if we have it in
|
one. If a PDU fails its signature check then we check if we have it in
|
||||||
the database and if not then request if from the originating server of
|
the database and if not then request if from the originating server of
|
||||||
|
@ -56,8 +56,12 @@ class FederationBase(object):
|
||||||
a new list.
|
a new list.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
origin (str)
|
||||||
pdu (list)
|
pdu (list)
|
||||||
outlier (bool)
|
room_version (str)
|
||||||
|
outlier (bool): Whether the events are outliers or not
|
||||||
|
include_none (str): Whether to include None in the returned list
|
||||||
|
for events that have failed their checks
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred : A list of PDUs that have valid signatures and hashes.
|
Deferred : A list of PDUs that have valid signatures and hashes.
|
||||||
|
@ -84,6 +88,7 @@ class FederationBase(object):
|
||||||
res = yield self.get_pdu(
|
res = yield self.get_pdu(
|
||||||
destinations=[pdu.origin],
|
destinations=[pdu.origin],
|
||||||
event_id=pdu.event_id,
|
event_id=pdu.event_id,
|
||||||
|
room_version=room_version,
|
||||||
outlier=outlier,
|
outlier=outlier,
|
||||||
timeout=10000,
|
timeout=10000,
|
||||||
)
|
)
|
||||||
|
|
|
@ -25,7 +25,12 @@ from prometheus_client import Counter
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
|
from synapse.api.constants import (
|
||||||
|
KNOWN_ROOM_VERSIONS,
|
||||||
|
EventTypes,
|
||||||
|
Membership,
|
||||||
|
RoomVersions,
|
||||||
|
)
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
CodeMessageException,
|
CodeMessageException,
|
||||||
FederationDeniedError,
|
FederationDeniedError,
|
||||||
|
@ -204,7 +209,8 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
|
def get_pdu(self, destinations, event_id, room_version, outlier=False,
|
||||||
|
timeout=None):
|
||||||
"""Requests the PDU with given origin and ID from the remote home
|
"""Requests the PDU with given origin and ID from the remote home
|
||||||
servers.
|
servers.
|
||||||
|
|
||||||
|
@ -214,6 +220,7 @@ class FederationClient(FederationBase):
|
||||||
Args:
|
Args:
|
||||||
destinations (list): Which home servers to query
|
destinations (list): Which home servers to query
|
||||||
event_id (str): event to fetch
|
event_id (str): event to fetch
|
||||||
|
room_version (str): version of the room
|
||||||
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
|
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
|
||||||
it's from an arbitary point in the context as opposed to part
|
it's from an arbitary point in the context as opposed to part
|
||||||
of the current block of PDUs. Defaults to `False`
|
of the current block of PDUs. Defaults to `False`
|
||||||
|
@ -354,10 +361,13 @@ class FederationClient(FederationBase):
|
||||||
ev.event_id for ev in itertools.chain(pdus, auth_chain)
|
ev.event_id for ev in itertools.chain(pdus, auth_chain)
|
||||||
])
|
])
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
|
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination,
|
destination,
|
||||||
[p for p in pdus if p.event_id not in seen_events],
|
[p for p in pdus if p.event_id not in seen_events],
|
||||||
outlier=True
|
outlier=True,
|
||||||
|
room_version=room_version,
|
||||||
)
|
)
|
||||||
signed_pdus.extend(
|
signed_pdus.extend(
|
||||||
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
|
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
|
||||||
|
@ -366,7 +376,8 @@ class FederationClient(FederationBase):
|
||||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination,
|
destination,
|
||||||
[p for p in auth_chain if p.event_id not in seen_events],
|
[p for p in auth_chain if p.event_id not in seen_events],
|
||||||
outlier=True
|
outlier=True,
|
||||||
|
room_version=room_version,
|
||||||
)
|
)
|
||||||
signed_auth.extend(
|
signed_auth.extend(
|
||||||
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
|
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
|
||||||
|
@ -413,6 +424,8 @@ class FederationClient(FederationBase):
|
||||||
random.shuffle(srvs)
|
random.shuffle(srvs)
|
||||||
return srvs
|
return srvs
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
batch_size = 20
|
batch_size = 20
|
||||||
missing_events = list(missing_events)
|
missing_events = list(missing_events)
|
||||||
for i in range(0, len(missing_events), batch_size):
|
for i in range(0, len(missing_events), batch_size):
|
||||||
|
@ -423,6 +436,7 @@ class FederationClient(FederationBase):
|
||||||
self.get_pdu,
|
self.get_pdu,
|
||||||
destinations=random_server_list(),
|
destinations=random_server_list(),
|
||||||
event_id=e_id,
|
event_id=e_id,
|
||||||
|
room_version=room_version,
|
||||||
)
|
)
|
||||||
for e_id in batch
|
for e_id in batch
|
||||||
]
|
]
|
||||||
|
@ -452,8 +466,11 @@ class FederationClient(FederationBase):
|
||||||
for p in res["auth_chain"]
|
for p in res["auth_chain"]
|
||||||
]
|
]
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination, auth_chain, outlier=True
|
destination, auth_chain,
|
||||||
|
outlier=True, room_version=room_version,
|
||||||
)
|
)
|
||||||
|
|
||||||
signed_auth.sort(key=lambda e: e.depth)
|
signed_auth.sort(key=lambda e: e.depth)
|
||||||
|
@ -666,9 +683,21 @@ class FederationClient(FederationBase):
|
||||||
for p in itertools.chain(state, auth_chain)
|
for p in itertools.chain(state, auth_chain)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
room_version = None
|
||||||
|
for e in state:
|
||||||
|
if (e.type, e.state_key) == (EventTypes.Create, ""):
|
||||||
|
room_version = e.content.get("room_version", RoomVersions.V1)
|
||||||
|
break
|
||||||
|
|
||||||
|
if room_version is None:
|
||||||
|
# If the state doesn't have a create event then the room is
|
||||||
|
# invalid, and it would fail auth checks anyway.
|
||||||
|
raise SynapseError(400, "No create event in state")
|
||||||
|
|
||||||
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
|
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination, list(pdus.values()),
|
destination, list(pdus.values()),
|
||||||
outlier=True,
|
outlier=True,
|
||||||
|
room_version=room_version,
|
||||||
)
|
)
|
||||||
|
|
||||||
valid_pdus_map = {
|
valid_pdus_map = {
|
||||||
|
@ -806,8 +835,10 @@ class FederationClient(FederationBase):
|
||||||
for e in content["auth_chain"]
|
for e in content["auth_chain"]
|
||||||
]
|
]
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination, auth_chain, outlier=True
|
destination, auth_chain, outlier=True, room_version=room_version,
|
||||||
)
|
)
|
||||||
|
|
||||||
signed_auth.sort(key=lambda e: e.depth)
|
signed_auth.sort(key=lambda e: e.depth)
|
||||||
|
@ -854,8 +885,10 @@ class FederationClient(FederationBase):
|
||||||
for e in content.get("events", [])
|
for e in content.get("events", [])
|
||||||
]
|
]
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
signed_events = yield self._check_sigs_and_hash_and_fetch(
|
signed_events = yield self._check_sigs_and_hash_and_fetch(
|
||||||
destination, events, outlier=False
|
destination, events, outlier=False, room_version=room_version,
|
||||||
)
|
)
|
||||||
except HttpResponseException as e:
|
except HttpResponseException as e:
|
||||||
if not e.code == 400:
|
if not e.code == 400:
|
||||||
|
|
|
@ -457,8 +457,10 @@ class FederationServer(FederationBase):
|
||||||
for e in content["auth_chain"]
|
for e in content["auth_chain"]
|
||||||
]
|
]
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||||
origin, auth_chain, outlier=True
|
origin, auth_chain, outlier=True, room_version=room_version,
|
||||||
)
|
)
|
||||||
|
|
||||||
ret = yield self.handler.on_query_auth(
|
ret = yield self.handler.on_query_auth(
|
||||||
|
|
|
@ -34,6 +34,7 @@ from synapse.api.constants import (
|
||||||
EventTypes,
|
EventTypes,
|
||||||
Membership,
|
Membership,
|
||||||
RejectedReason,
|
RejectedReason,
|
||||||
|
RoomVersions,
|
||||||
)
|
)
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
AuthError,
|
AuthError,
|
||||||
|
@ -338,6 +339,8 @@ class FederationHandler(BaseHandler):
|
||||||
room_id, event_id, p,
|
room_id, event_id, p,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
with logcontext.nested_logging_context(p):
|
with logcontext.nested_logging_context(p):
|
||||||
# note that if any of the missing prevs share missing state or
|
# note that if any of the missing prevs share missing state or
|
||||||
# auth events, the requests to fetch those events are deduped
|
# auth events, the requests to fetch those events are deduped
|
||||||
|
@ -351,7 +354,7 @@ class FederationHandler(BaseHandler):
|
||||||
# we want the state *after* p; get_state_for_room returns the
|
# we want the state *after* p; get_state_for_room returns the
|
||||||
# state *before* p.
|
# state *before* p.
|
||||||
remote_event = yield self.federation_client.get_pdu(
|
remote_event = yield self.federation_client.get_pdu(
|
||||||
[origin], p, outlier=True,
|
[origin], p, room_version, outlier=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if remote_event is None:
|
if remote_event is None:
|
||||||
|
@ -375,7 +378,6 @@ class FederationHandler(BaseHandler):
|
||||||
for x in remote_state:
|
for x in remote_state:
|
||||||
event_map[x.event_id] = x
|
event_map[x.event_id] = x
|
||||||
|
|
||||||
room_version = yield self.store.get_room_version(room_id)
|
|
||||||
state_map = yield resolve_events_with_store(
|
state_map = yield resolve_events_with_store(
|
||||||
room_version, state_maps, event_map,
|
room_version, state_maps, event_map,
|
||||||
state_res_store=StateResolutionStore(self.store),
|
state_res_store=StateResolutionStore(self.store),
|
||||||
|
@ -651,6 +653,8 @@ class FederationHandler(BaseHandler):
|
||||||
if dest == self.server_name:
|
if dest == self.server_name:
|
||||||
raise SynapseError(400, "Can't backfill from self.")
|
raise SynapseError(400, "Can't backfill from self.")
|
||||||
|
|
||||||
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
|
|
||||||
events = yield self.federation_client.backfill(
|
events = yield self.federation_client.backfill(
|
||||||
dest,
|
dest,
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -744,6 +748,7 @@ class FederationHandler(BaseHandler):
|
||||||
self.federation_client.get_pdu,
|
self.federation_client.get_pdu,
|
||||||
[dest],
|
[dest],
|
||||||
event_id,
|
event_id,
|
||||||
|
room_version=room_version,
|
||||||
outlier=True,
|
outlier=True,
|
||||||
timeout=10000,
|
timeout=10000,
|
||||||
)
|
)
|
||||||
|
@ -1633,6 +1638,13 @@ class FederationHandler(BaseHandler):
|
||||||
create_event = e
|
create_event = e
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if create_event is None:
|
||||||
|
# If the state doesn't have a create event then the room is
|
||||||
|
# invalid, and it would fail auth checks anyway.
|
||||||
|
raise SynapseError(400, "No create event in state")
|
||||||
|
|
||||||
|
room_version = create_event.content.get("room_version", RoomVersions.V1)
|
||||||
|
|
||||||
missing_auth_events = set()
|
missing_auth_events = set()
|
||||||
for e in itertools.chain(auth_events, state, [event]):
|
for e in itertools.chain(auth_events, state, [event]):
|
||||||
for e_id in e.auth_event_ids():
|
for e_id in e.auth_event_ids():
|
||||||
|
@ -1643,6 +1655,7 @@ class FederationHandler(BaseHandler):
|
||||||
m_ev = yield self.federation_client.get_pdu(
|
m_ev = yield self.federation_client.get_pdu(
|
||||||
[origin],
|
[origin],
|
||||||
e_id,
|
e_id,
|
||||||
|
room_version=room_version,
|
||||||
outlier=True,
|
outlier=True,
|
||||||
timeout=10000,
|
timeout=10000,
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue