Cache federation state responses
This commit is contained in:
parent
e9e3eaa67d
commit
248e6770ca
|
@ -21,10 +21,11 @@ from .units import Transaction, Edu
|
||||||
|
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
from synapse.api.errors import FederationError, SynapseError
|
from synapse.api.errors import AuthError, FederationError, SynapseError
|
||||||
|
|
||||||
from synapse.crypto.event_signing import compute_event_signature
|
from synapse.crypto.event_signing import compute_event_signature
|
||||||
|
|
||||||
|
@ -48,9 +49,15 @@ class FederationServer(FederationBase):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(FederationServer, self).__init__(hs)
|
super(FederationServer, self).__init__(hs)
|
||||||
|
|
||||||
|
self.auth = hs.get_auth()
|
||||||
|
|
||||||
self._room_pdu_linearizer = Linearizer()
|
self._room_pdu_linearizer = Linearizer()
|
||||||
self._server_linearizer = Linearizer()
|
self._server_linearizer = Linearizer()
|
||||||
|
|
||||||
|
# We cache responses to state queries, as they take a while and often
|
||||||
|
# come in waves.
|
||||||
|
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
|
||||||
|
|
||||||
def set_handler(self, handler):
|
def set_handler(self, handler):
|
||||||
"""Sets the handler that the replication layer will use to communicate
|
"""Sets the handler that the replication layer will use to communicate
|
||||||
receipt of new PDUs from other home servers. The required methods are
|
receipt of new PDUs from other home servers. The required methods are
|
||||||
|
@ -188,28 +195,45 @@ class FederationServer(FederationBase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def on_context_state_request(self, origin, room_id, event_id):
|
def on_context_state_request(self, origin, room_id, event_id):
|
||||||
with (yield self._server_linearizer.queue((origin, room_id))):
|
if not event_id:
|
||||||
if event_id:
|
raise NotImplementedError("Specify an event")
|
||||||
pdus = yield self.handler.get_state_for_pdu(
|
|
||||||
origin, room_id, event_id,
|
|
||||||
)
|
|
||||||
auth_chain = yield self.store.get_auth_chain(
|
|
||||||
[pdu.event_id for pdu in pdus]
|
|
||||||
)
|
|
||||||
|
|
||||||
for event in auth_chain:
|
in_room = yield self.auth.check_host_in_room(room_id, origin)
|
||||||
# We sign these again because there was a bug where we
|
if not in_room:
|
||||||
# incorrectly signed things the first time round
|
raise AuthError(403, "Host not in room.")
|
||||||
if self.hs.is_mine_id(event.event_id):
|
|
||||||
event.signatures.update(
|
result = self._state_resp_cache.get((room_id, event_id))
|
||||||
compute_event_signature(
|
if not result:
|
||||||
event,
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
self.hs.hostname,
|
resp = yield self.response_cache.set(
|
||||||
self.hs.config.signing_key[0]
|
(room_id, event_id),
|
||||||
)
|
self._on_context_state_request_compute(room_id, event_id)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError("Specify an event")
|
resp = yield result
|
||||||
|
|
||||||
|
defer.returnValue((200, resp))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _on_context_state_request_compute(self, room_id, event_id):
|
||||||
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
|
room_id, event_id,
|
||||||
|
)
|
||||||
|
auth_chain = yield self.store.get_auth_chain(
|
||||||
|
[pdu.event_id for pdu in pdus]
|
||||||
|
)
|
||||||
|
|
||||||
|
for event in auth_chain:
|
||||||
|
# We sign these again because there was a bug where we
|
||||||
|
# incorrectly signed things the first time round
|
||||||
|
if self.hs.is_mine_id(event.event_id):
|
||||||
|
event.signatures.update(
|
||||||
|
compute_event_signature(
|
||||||
|
event,
|
||||||
|
self.hs.hostname,
|
||||||
|
self.hs.config.signing_key[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue((200, {
|
defer.returnValue((200, {
|
||||||
"pdus": [pdu.get_pdu_json() for pdu in pdus],
|
"pdus": [pdu.get_pdu_json() for pdu in pdus],
|
||||||
|
|
|
@ -991,14 +991,9 @@ class FederationHandler(BaseHandler):
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
|
def get_state_for_pdu(self, room_id, event_id):
|
||||||
yield run_on_reactor()
|
yield run_on_reactor()
|
||||||
|
|
||||||
if do_auth:
|
|
||||||
in_room = yield self.auth.check_host_in_room(room_id, origin)
|
|
||||||
if not in_room:
|
|
||||||
raise AuthError(403, "Host not in room.")
|
|
||||||
|
|
||||||
state_groups = yield self.store.get_state_groups(
|
state_groups = yield self.store.get_state_groups(
|
||||||
room_id, [event_id]
|
room_id, [event_id]
|
||||||
)
|
)
|
||||||
|
|
|
@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler):
|
||||||
class RoomListHandler(BaseHandler):
|
class RoomListHandler(BaseHandler):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(RoomListHandler, self).__init__(hs)
|
super(RoomListHandler, self).__init__(hs)
|
||||||
self.response_cache = ResponseCache()
|
self.response_cache = ResponseCache(hs)
|
||||||
self.remote_list_request_cache = ResponseCache()
|
self.remote_list_request_cache = ResponseCache(hs)
|
||||||
self.remote_list_cache = {}
|
self.remote_list_cache = {}
|
||||||
self.fetch_looping_call = hs.get_clock().looping_call(
|
self.fetch_looping_call = hs.get_clock().looping_call(
|
||||||
self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
|
self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
|
||||||
|
|
|
@ -138,7 +138,7 @@ class SyncHandler(object):
|
||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.event_sources = hs.get_event_sources()
|
self.event_sources = hs.get_event_sources()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.response_cache = ResponseCache()
|
self.response_cache = ResponseCache(hs)
|
||||||
|
|
||||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||||
full_state=False):
|
full_state=False):
|
||||||
|
|
|
@ -24,9 +24,12 @@ class ResponseCache(object):
|
||||||
used rather than trying to compute a new response.
|
used rather than trying to compute a new response.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, hs, timeout_ms=0):
|
||||||
self.pending_result_cache = {} # Requests that haven't finished yet.
|
self.pending_result_cache = {} # Requests that haven't finished yet.
|
||||||
|
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
self.timeout_sec = timeout_ms / 1000.
|
||||||
|
|
||||||
def get(self, key):
|
def get(self, key):
|
||||||
result = self.pending_result_cache.get(key)
|
result = self.pending_result_cache.get(key)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
|
@ -39,7 +42,13 @@ class ResponseCache(object):
|
||||||
self.pending_result_cache[key] = result
|
self.pending_result_cache[key] = result
|
||||||
|
|
||||||
def remove(r):
|
def remove(r):
|
||||||
self.pending_result_cache.pop(key, None)
|
if self.timeout_sec:
|
||||||
|
self.clock.call_later(
|
||||||
|
self.timeout_sec,
|
||||||
|
self.pending_result_cache.pop, key, None,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.pending_result_cache.pop(key, None)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
result.addBoth(remove)
|
result.addBoth(remove)
|
||||||
|
|
Loading…
Reference in New Issue