make FederationClient._try_destination_list async

This commit is contained in:
Richard van der Hoff 2020-02-03 20:59:10 +00:00
parent a46fabf17b
commit 1330c311b7
1 changed files with 26 additions and 10 deletions

View File

@ -17,7 +17,17 @@
import copy import copy
import itertools import itertools
import logging import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
TypeVar,
)
from prometheus_client import Counter from prometheus_client import Counter
@ -53,6 +63,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
PDU_RETRY_TIME_MS = 1 * 60 * 1000 PDU_RETRY_TIME_MS = 1 * 60 * 1000
T = TypeVar("T")
class InvalidResponseError(RuntimeError): class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response """Helper for _try_destination_list: indicates that the server returned a response
@ -349,17 +361,21 @@ class FederationClient(FederationBase):
return signed_auth return signed_auth
@defer.inlineCallbacks async def _try_destination_list(
def _try_destination_list(self, description, destinations, callback): self,
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
) -> T:
"""Try an operation on a series of servers, until it succeeds """Try an operation on a series of servers, until it succeeds
Args: Args:
description (unicode): description of the operation we're doing, for logging description: description of the operation we're doing, for logging
destinations (Iterable[unicode]): list of server_names to try destinations: list of server_names to try
callback (callable): Function to run for each server. Passed a single callback: Function to run for each server. Passed a single
argument: the server_name to try. May return a deferred. argument: the server_name to try.
If the callback raises a CodeMessageException with a 300/400 code, If the callback raises a CodeMessageException with a 300/400 code,
attempts to perform the operation stop immediately and the exception is attempts to perform the operation stop immediately and the exception is
@ -370,7 +386,7 @@ class FederationClient(FederationBase):
suppressed if the exception is an InvalidResponseError. suppressed if the exception is an InvalidResponseError.
Returns: Returns:
The [Deferred] result of callback, if it succeeds The result of callback, if it succeeds
Raises: Raises:
SynapseError if the chosen remote server returns a 300/400 code, or SynapseError if the chosen remote server returns a 300/400 code, or
@ -381,7 +397,7 @@ class FederationClient(FederationBase):
continue continue
try: try:
res = yield callback(destination) res = await callback(destination)
return res return res
except InvalidResponseError as e: except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e) logger.warning("Failed to %s via %s: %s", description, destination, e)
@ -400,7 +416,7 @@ class FederationClient(FederationBase):
) )
except Exception: except Exception:
logger.warning( logger.warning(
"Failed to %s via %s", description, destination, exc_info=1 "Failed to %s via %s", description, destination, exc_info=True
) )
raise SynapseError(502, "Failed to %s via any server" % (description,)) raise SynapseError(502, "Failed to %s via any server" % (description,))