Implement and use new batched get missing pdu
This commit is contained in:
parent
0ac2a79faa
commit
db215b7e00
|
@ -439,6 +439,25 @@ class FederationClient(FederationBase):
|
|||
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_missing_events(self, destination, room_id, earliest_events,
|
||||
latest_events, limit, min_depth):
|
||||
content = yield self.transport_layer.get_missing_events(
|
||||
destination, room_id, earliest_events, latest_events, limit,
|
||||
min_depth,
|
||||
)
|
||||
|
||||
events = [
|
||||
self.event_from_pdu_json(e)
|
||||
for e in content.get("events", [])
|
||||
]
|
||||
|
||||
signed_events = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, events, outlier=True
|
||||
)
|
||||
|
||||
defer.returnValue(signed_events)
|
||||
|
||||
def event_from_pdu_json(self, pdu_json, outlier=False):
|
||||
event = FrozenEvent(
|
||||
pdu_json
|
||||
|
|
|
@ -142,7 +142,15 @@ class FederationServer(FederationBase):
|
|||
if r[0]:
|
||||
ret.append({})
|
||||
else:
|
||||
logger.exception(r[1])
|
||||
failure = r[1]
|
||||
logger.error(
|
||||
"Failed to handle PDU",
|
||||
exc_info=(
|
||||
failure.type,
|
||||
failure.value,
|
||||
failure.getTracebackObject()
|
||||
)
|
||||
)
|
||||
ret.append({"error": str(r[1].value)})
|
||||
|
||||
logger.debug("Returning: %s", str(ret))
|
||||
|
@ -306,75 +314,17 @@ class FederationServer(FederationBase):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_missing_events(self, origin, room_id, earliest_events,
|
||||
@log_function
|
||||
def on_get_missing_events(self, origin, room_id, earliest_events,
|
||||
latest_events, limit, min_depth):
|
||||
limit = max(limit, 50)
|
||||
min_depth = max(min_depth, 0)
|
||||
|
||||
missing_events = yield self.store.get_missing_events(
|
||||
room_id=room_id,
|
||||
earliest_events=earliest_events,
|
||||
latest_events=latest_events,
|
||||
limit=limit,
|
||||
min_depth=min_depth,
|
||||
missing_events = yield self.handler.on_get_missing_events(
|
||||
origin, room_id, earliest_events, latest_events, limit, min_depth
|
||||
)
|
||||
|
||||
known_ids = {e.event_id for e in missing_events} | {earliest_events}
|
||||
|
||||
back_edges = {
|
||||
e for e in missing_events
|
||||
if {i for i, h in e.prev_events.items()} <= known_ids
|
||||
}
|
||||
|
||||
decoded_auth_events = set()
|
||||
state = {}
|
||||
auth_events = set()
|
||||
auth_and_state = {}
|
||||
for event in back_edges:
|
||||
state_pdus = yield self.handler.get_state_for_pdu(
|
||||
origin, room_id, event.event_id,
|
||||
do_auth=False,
|
||||
)
|
||||
|
||||
state[event.event_id] = [s.event_id for s in state_pdus]
|
||||
|
||||
auth_and_state.update({
|
||||
s.event_id: s for s in state_pdus
|
||||
})
|
||||
|
||||
state_ids = {pdu.event_id for pdu in state_pdus}
|
||||
prev_ids = {i for i, h in event.prev_events.items()}
|
||||
partial_auth_chain = yield self.store.get_auth_chain(
|
||||
state_ids | prev_ids, have_ids=decoded_auth_events.keys()
|
||||
)
|
||||
|
||||
for p in partial_auth_chain:
|
||||
p.signatures.update(
|
||||
compute_event_signature(
|
||||
p,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
auth_events.update(
|
||||
a.event_id for a in partial_auth_chain
|
||||
)
|
||||
|
||||
auth_and_state.update({
|
||||
a.event_id: a for a in partial_auth_chain
|
||||
})
|
||||
|
||||
time_now = self._clock.time_msec()
|
||||
|
||||
defer.returnValue({
|
||||
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
|
||||
"state_for_events": state,
|
||||
"auth_events": auth_events,
|
||||
"event_map": {
|
||||
k: ev.get_pdu_json(time_now)
|
||||
for k, ev in auth_and_state.items()
|
||||
},
|
||||
})
|
||||
|
||||
@log_function
|
||||
|
@ -403,7 +353,7 @@ class FederationServer(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _handle_new_pdu(self, origin, pdu, max_recursion=10):
|
||||
def _handle_new_pdu(self, origin, pdu, get_missing=True):
|
||||
# We reprocess pdus when we have seen them only as outliers
|
||||
existing = yield self._get_persisted_pdu(
|
||||
origin, pdu.event_id, do_auth=False
|
||||
|
@ -455,48 +405,50 @@ class FederationServer(FederationBase):
|
|||
pdu.room_id, min_depth
|
||||
)
|
||||
|
||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||
seen = set(have_seen.keys())
|
||||
|
||||
if min_depth and pdu.depth < min_depth:
|
||||
# This is so that we don't notify the user about this
|
||||
# message, to work around the fact that some events will
|
||||
# reference really really old events we really don't want to
|
||||
# send to the clients.
|
||||
pdu.internal_metadata.outlier = True
|
||||
elif min_depth and pdu.depth > min_depth and max_recursion > 0:
|
||||
for event_id, hashes in pdu.prev_events:
|
||||
if event_id not in have_seen:
|
||||
logger.debug(
|
||||
"_handle_new_pdu requesting pdu %s",
|
||||
event_id
|
||||
elif min_depth and pdu.depth > min_depth:
|
||||
if get_missing and prevs - seen:
|
||||
latest_tuples = yield self.store.get_latest_events_in_room(
|
||||
pdu.room_id
|
||||
)
|
||||
|
||||
try:
|
||||
new_pdu = yield self.federation_client.get_pdu(
|
||||
[origin, pdu.origin],
|
||||
event_id=event_id,
|
||||
# We add the prev events that we have seen to the latest
|
||||
# list to ensure the remote server doesn't give them to us
|
||||
latest = set(e_id for e_id, _, _ in latest_tuples)
|
||||
latest |= seen
|
||||
|
||||
missing_events = yield self.get_missing_events(
|
||||
origin,
|
||||
pdu.room_id,
|
||||
earliest_events=list(latest),
|
||||
latest_events=[pdu.event_id],
|
||||
limit=10,
|
||||
min_depth=min_depth,
|
||||
)
|
||||
|
||||
if new_pdu:
|
||||
for e in missing_events:
|
||||
yield self._handle_new_pdu(
|
||||
origin,
|
||||
new_pdu,
|
||||
max_recursion=max_recursion-1
|
||||
e,
|
||||
get_missing=False
|
||||
)
|
||||
|
||||
have_seen = yield self.store.have_events(
|
||||
[ev for ev, _ in pdu.prev_events]
|
||||
)
|
||||
|
||||
logger.debug("Processed pdu %s", event_id)
|
||||
else:
|
||||
logger.warn("Failed to get PDU %s", event_id)
|
||||
fetch_state = True
|
||||
except:
|
||||
# TODO(erikj): Do some more intelligent retries.
|
||||
logger.exception("Failed to get PDU")
|
||||
fetch_state = True
|
||||
else:
|
||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||
seen = set(have_seen.keys())
|
||||
if prevs - seen:
|
||||
fetch_state = True
|
||||
else:
|
||||
fetch_state = True
|
||||
|
||||
if fetch_state:
|
||||
# We need to get the state at this event, since we haven't
|
||||
|
|
|
@ -287,7 +287,7 @@ class TransactionQueue(object):
|
|||
code = 200
|
||||
|
||||
if response:
|
||||
for e_id, r in getattr(response, "pdus", {}).items():
|
||||
for e_id, r in response.get("pdus", {}).items():
|
||||
if "error" in r:
|
||||
logger.warn(
|
||||
"Transaction returned error for %s: %s",
|
||||
|
|
|
@ -219,3 +219,22 @@ class TransportLayerClient(object):
|
|||
)
|
||||
|
||||
defer.returnValue(content)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_missing_events(self, destination, room_id, earliest_events,
|
||||
latest_events, limit, min_depth):
|
||||
path = PREFIX + "/get_missing_events/%s" % (room_id,)
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
data={
|
||||
"limit": int(limit),
|
||||
"min_depth": int(min_depth),
|
||||
"earliest_events": earliest_events,
|
||||
"latest_events": latest_events,
|
||||
}
|
||||
)
|
||||
|
||||
defer.returnValue(content)
|
||||
|
|
|
@ -234,6 +234,7 @@ class TransportLayerServer(object):
|
|||
)
|
||||
)
|
||||
)
|
||||
|
||||
self.server.register_path(
|
||||
"POST",
|
||||
re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
|
||||
|
@ -245,6 +246,17 @@ class TransportLayerServer(object):
|
|||
)
|
||||
)
|
||||
|
||||
self.server.register_path(
|
||||
"POST",
|
||||
re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"),
|
||||
self._with_authentication(
|
||||
lambda origin, content, query, room_id:
|
||||
self._get_missing_events(
|
||||
origin, content, room_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _on_send_request(self, origin, content, query, transaction_id):
|
||||
|
@ -344,3 +356,22 @@ class TransportLayerServer(object):
|
|||
)
|
||||
|
||||
defer.returnValue((200, new_content))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _get_missing_events(self, origin, content, room_id):
|
||||
limit = int(content.get("limit", 10))
|
||||
min_depth = int(content.get("min_depth", 0))
|
||||
earliest_events = content.get("earliest_events", [])
|
||||
latest_events = content.get("latest_events", [])
|
||||
|
||||
content = yield self.request_handler.on_get_missing_events(
|
||||
origin,
|
||||
room_id=room_id,
|
||||
earliest_events=earliest_events,
|
||||
latest_events=latest_events,
|
||||
min_depth=min_depth,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
defer.returnValue((200, content))
|
||||
|
|
|
@ -789,6 +789,29 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_get_missing_events(self, origin, room_id, earliest_events,
|
||||
latest_events, limit, min_depth):
|
||||
in_room = yield self.auth.check_host_in_room(
|
||||
room_id,
|
||||
origin
|
||||
)
|
||||
if not in_room:
|
||||
raise AuthError(403, "Host not in room.")
|
||||
|
||||
limit = min(limit, 20)
|
||||
min_depth = max(min_depth, 0)
|
||||
|
||||
missing_events = yield self.store.get_missing_events(
|
||||
room_id=room_id,
|
||||
earliest_events=earliest_events,
|
||||
latest_events=latest_events,
|
||||
limit=limit,
|
||||
min_depth=min_depth,
|
||||
)
|
||||
|
||||
defer.returnValue(missing_events)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def do_auth(self, origin, event, context, auth_events):
|
||||
|
|
Loading…
Reference in New Issue