convert to async: FederationHandler._get_state_for_room
... and _get_events_from_store_or_dest
This commit is contained in:
parent
e77237b935
commit
4db394a4b3
|
@ -19,7 +19,7 @@
|
|||
|
||||
import itertools
|
||||
import logging
|
||||
from typing import Dict, Iterable, Optional, Sequence, Tuple
|
||||
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
|
||||
|
||||
import six
|
||||
from six import iteritems, itervalues
|
||||
|
@ -579,30 +579,30 @@ class FederationHandler(BaseHandler):
|
|||
else:
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _get_state_for_room(self, destination, room_id, event_id):
|
||||
async def _get_state_for_room(
|
||||
self, destination: str, room_id: str, event_id: str
|
||||
) -> Tuple[List[EventBase], List[EventBase]]:
|
||||
"""Requests all of the room state at a given event from a remote homeserver.
|
||||
|
||||
Args:
|
||||
destination (str): The remote homeserver to query for the state.
|
||||
room_id (str): The id of the room we're interested in.
|
||||
event_id (str): The id of the event we want the state at.
|
||||
destination:: The remote homeserver to query for the state.
|
||||
room_id: The id of the room we're interested in.
|
||||
event_id: The id of the event we want the state at.
|
||||
|
||||
Returns:
|
||||
Deferred[Tuple[List[EventBase], List[EventBase]]]:
|
||||
A list of events in the state, and a list of events in the auth chain
|
||||
for the given event.
|
||||
"""
|
||||
(
|
||||
state_event_ids,
|
||||
auth_event_ids,
|
||||
) = yield self.federation_client.get_room_state_ids(
|
||||
) = await self.federation_client.get_room_state_ids(
|
||||
destination, room_id, event_id=event_id
|
||||
)
|
||||
|
||||
desired_events = set(state_event_ids + auth_event_ids)
|
||||
event_map = yield self._get_events_from_store_or_dest(
|
||||
event_map = await self._get_events_from_store_or_dest(
|
||||
destination, room_id, desired_events
|
||||
)
|
||||
|
||||
|
@ -621,20 +621,20 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
return pdus, auth_chain
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
|
||||
async def _get_events_from_store_or_dest(
|
||||
self, destination: str, room_id: str, event_ids: Iterable[str]
|
||||
) -> Dict[str, EventBase]:
|
||||
"""Fetch events from a remote destination, checking if we already have them.
|
||||
|
||||
Args:
|
||||
destination (str)
|
||||
room_id (str)
|
||||
event_ids (Iterable[str])
|
||||
destination
|
||||
room_id
|
||||
event_ids
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, EventBase]]: A deferred resolving to a map
|
||||
from event_id to event
|
||||
map from event_id to event
|
||||
"""
|
||||
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
||||
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
|
||||
|
||||
missing_events = set(event_ids) - fetched_events.keys()
|
||||
|
||||
|
@ -647,7 +647,7 @@ class FederationHandler(BaseHandler):
|
|||
event_ids,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
|
||||
# XXX 20 requests at once? really?
|
||||
for batch in batch_iter(missing_events, 20):
|
||||
|
@ -661,7 +661,7 @@ class FederationHandler(BaseHandler):
|
|||
for e_id in batch
|
||||
]
|
||||
|
||||
res = yield make_deferred_yieldable(
|
||||
res = await make_deferred_yieldable(
|
||||
defer.DeferredList(deferreds, consumeErrors=True)
|
||||
)
|
||||
for success, result in res:
|
||||
|
|
Loading…
Reference in New Issue