Merge pull request #879 from matrix-org/erikj/linearize_fed_server
Linearize some federation endpoints based on (origin, room_id)
This commit is contained in:
commit
d8ec81cc31
|
@ -49,6 +49,7 @@ class FederationServer(FederationBase):
|
||||||
super(FederationServer, self).__init__(hs)
|
super(FederationServer, self).__init__(hs)
|
||||||
|
|
||||||
self._room_pdu_linearizer = Linearizer()
|
self._room_pdu_linearizer = Linearizer()
|
||||||
|
self._server_linearizer = Linearizer()
|
||||||
|
|
||||||
def set_handler(self, handler):
|
def set_handler(self, handler):
|
||||||
"""Sets the handler that the replication layer will use to communicate
|
"""Sets the handler that the replication layer will use to communicate
|
||||||
|
@ -89,11 +90,14 @@ class FederationServer(FederationBase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_backfill_request(self, origin, room_id, versions, limit):
|
def on_backfill_request(self, origin, room_id, versions, limit):
|
||||||
pdus = yield self.handler.on_backfill_request(
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
origin, room_id, versions, limit
|
pdus = yield self.handler.on_backfill_request(
|
||||||
)
|
origin, room_id, versions, limit
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
|
res = self._transaction_from_pdus(pdus).get_dict()
|
||||||
|
|
||||||
|
defer.returnValue((200, res))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@ -184,27 +188,28 @@ class FederationServer(FederationBase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_context_state_request(self, origin, room_id, event_id):
|
def on_context_state_request(self, origin, room_id, event_id):
|
||||||
if event_id:
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
pdus = yield self.handler.get_state_for_pdu(
|
if event_id:
|
||||||
origin, room_id, event_id,
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
)
|
origin, room_id, event_id,
|
||||||
auth_chain = yield self.store.get_auth_chain(
|
)
|
||||||
[pdu.event_id for pdu in pdus]
|
auth_chain = yield self.store.get_auth_chain(
|
||||||
)
|
[pdu.event_id for pdu in pdus]
|
||||||
|
)
|
||||||
|
|
||||||
for event in auth_chain:
|
for event in auth_chain:
|
||||||
# We sign these again because there was a bug where we
|
# We sign these again because there was a bug where we
|
||||||
# incorrectly signed things the first time round
|
# incorrectly signed things the first time round
|
||||||
if self.hs.is_mine_id(event.event_id):
|
if self.hs.is_mine_id(event.event_id):
|
||||||
event.signatures.update(
|
event.signatures.update(
|
||||||
compute_event_signature(
|
compute_event_signature(
|
||||||
event,
|
event,
|
||||||
self.hs.hostname,
|
self.hs.hostname,
|
||||||
self.hs.config.signing_key[0]
|
self.hs.config.signing_key[0]
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
else:
|
||||||
else:
|
raise NotImplementedError("Specify an event")
|
||||||
raise NotImplementedError("Specify an event")
|
|
||||||
|
|
||||||
defer.returnValue((200, {
|
defer.returnValue((200, {
|
||||||
"pdus": [pdu.get_pdu_json() for pdu in pdus],
|
"pdus": [pdu.get_pdu_json() for pdu in pdus],
|
||||||
|
@ -283,14 +288,16 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_event_auth(self, origin, room_id, event_id):
|
def on_event_auth(self, origin, room_id, event_id):
|
||||||
time_now = self._clock.time_msec()
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
auth_pdus = yield self.handler.on_event_auth(event_id)
|
time_now = self._clock.time_msec()
|
||||||
defer.returnValue((200, {
|
auth_pdus = yield self.handler.on_event_auth(event_id)
|
||||||
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
|
res = {
|
||||||
}))
|
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
|
||||||
|
}
|
||||||
|
defer.returnValue((200, res))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_query_auth_request(self, origin, content, event_id):
|
def on_query_auth_request(self, origin, content, room_id, event_id):
|
||||||
"""
|
"""
|
||||||
Content is a dict with keys::
|
Content is a dict with keys::
|
||||||
auth_chain (list): A list of events that give the auth chain.
|
auth_chain (list): A list of events that give the auth chain.
|
||||||
|
@ -309,32 +316,33 @@ class FederationServer(FederationBase):
|
||||||
Returns:
|
Returns:
|
||||||
Deferred: Results in `dict` with the same format as `content`
|
Deferred: Results in `dict` with the same format as `content`
|
||||||
"""
|
"""
|
||||||
auth_chain = [
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
self.event_from_pdu_json(e)
|
auth_chain = [
|
||||||
for e in content["auth_chain"]
|
self.event_from_pdu_json(e)
|
||||||
]
|
for e in content["auth_chain"]
|
||||||
|
]
|
||||||
|
|
||||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||||
origin, auth_chain, outlier=True
|
origin, auth_chain, outlier=True
|
||||||
)
|
)
|
||||||
|
|
||||||
ret = yield self.handler.on_query_auth(
|
ret = yield self.handler.on_query_auth(
|
||||||
origin,
|
origin,
|
||||||
event_id,
|
event_id,
|
||||||
signed_auth,
|
signed_auth,
|
||||||
content.get("rejects", []),
|
content.get("rejects", []),
|
||||||
content.get("missing", []),
|
content.get("missing", []),
|
||||||
)
|
)
|
||||||
|
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
send_content = {
|
send_content = {
|
||||||
"auth_chain": [
|
"auth_chain": [
|
||||||
e.get_pdu_json(time_now)
|
e.get_pdu_json(time_now)
|
||||||
for e in ret["auth_chain"]
|
for e in ret["auth_chain"]
|
||||||
],
|
],
|
||||||
"rejects": ret.get("rejects", []),
|
"rejects": ret.get("rejects", []),
|
||||||
"missing": ret.get("missing", []),
|
"missing": ret.get("missing", []),
|
||||||
}
|
}
|
||||||
|
|
||||||
defer.returnValue(
|
defer.returnValue(
|
||||||
(200, send_content)
|
(200, send_content)
|
||||||
|
@ -386,21 +394,24 @@ class FederationServer(FederationBase):
|
||||||
@log_function
|
@log_function
|
||||||
def on_get_missing_events(self, origin, room_id, earliest_events,
|
def on_get_missing_events(self, origin, room_id, earliest_events,
|
||||||
latest_events, limit, min_depth):
|
latest_events, limit, min_depth):
|
||||||
logger.info(
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
|
logger.info(
|
||||||
" limit: %d, min_depth: %d",
|
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
|
||||||
earliest_events, latest_events, limit, min_depth
|
" limit: %d, min_depth: %d",
|
||||||
)
|
earliest_events, latest_events, limit, min_depth
|
||||||
missing_events = yield self.handler.on_get_missing_events(
|
)
|
||||||
origin, room_id, earliest_events, latest_events, limit, min_depth
|
missing_events = yield self.handler.on_get_missing_events(
|
||||||
)
|
origin, room_id, earliest_events, latest_events, limit, min_depth
|
||||||
|
)
|
||||||
|
|
||||||
if len(missing_events) < 5:
|
if len(missing_events) < 5:
|
||||||
logger.info("Returning %d events: %r", len(missing_events), missing_events)
|
logger.info(
|
||||||
else:
|
"Returning %d events: %r", len(missing_events), missing_events
|
||||||
logger.info("Returning %d events", len(missing_events))
|
)
|
||||||
|
else:
|
||||||
|
logger.info("Returning %d events", len(missing_events))
|
||||||
|
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
|
|
||||||
defer.returnValue({
|
defer.returnValue({
|
||||||
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
|
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
|
||||||
|
|
|
@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, origin, content, query, context, event_id):
|
def on_POST(self, origin, content, query, context, event_id):
|
||||||
new_content = yield self.handler.on_query_auth_request(
|
new_content = yield self.handler.on_query_auth_request(
|
||||||
origin, content, event_id
|
origin, content, context, event_id
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((200, new_content))
|
defer.returnValue((200, new_content))
|
||||||
|
|
Loading…
Reference in New Issue