Port replication http server endpoints to async/await

This commit is contained in:
Erik Johnston 2019-10-29 13:00:51 +00:00
parent 561133c3c5
commit e577a4b2ad
6 changed files with 26 additions and 44 deletions

View File

@ -110,14 +110,14 @@ class ReplicationEndpoint(object):
return {}
@abc.abstractmethod
def _handle_request(self, request, **kwargs):
async def _handle_request(self, request, **kwargs):
"""Handle incoming request.
This is called with the request object and PATH_ARGS.
Returns:
Deferred[dict]: A JSON serialisable dict to be used as response
body of request.
tuple[int, dict]: HTTP status code and a JSON serialisable dict
to be used as response body of request.
"""
pass

View File

@ -82,8 +82,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
@defer.inlineCallbacks
def _handle_request(self, request):
async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
@ -101,15 +100,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"]
)
context = EventContext.deserialize(self.store, event_payload["context"])
event_and_contexts.append((event, context))
logger.info("Got %d events from federation", len(event_and_contexts))
yield self.federation_handler.persist_events_and_notify(
await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
)
@ -144,8 +141,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content}
@defer.inlineCallbacks
def _handle_request(self, request, edu_type):
async def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)
@ -154,7 +150,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
logger.info("Got %r edu from %s", edu_type, origin)
result = yield self.registry.on_edu(edu_type, origin, edu_content)
result = await self.registry.on_edu(edu_type, origin, edu_content)
return 200, result
@ -193,8 +189,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
"""
return {"args": args}
@defer.inlineCallbacks
def _handle_request(self, request, query_type):
async def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)
@ -202,7 +197,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
logger.info("Got %r query", query_type)
result = yield self.registry.on_query(query_type, args)
result = await self.registry.on_query(query_type, args)
return 200, result
@ -234,9 +229,8 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
"""
return {}
@defer.inlineCallbacks
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
async def _handle_request(self, request, room_id):
await self.store.clean_room_for_join(room_id)
return 200, {}

View File

@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@ -52,15 +50,14 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
"is_guest": is_guest,
}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
device_id = content["device_id"]
initial_display_name = content["initial_display_name"]
is_guest = content["is_guest"]
device_id, access_token = yield self.registration_handler.register_device(
device_id, access_token = await self.registration_handler.register_device(
user_id, device_id, initial_display_name, is_guest
)

View File

@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
@ -65,8 +63,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
"content": content,
}
@defer.inlineCallbacks
def _handle_request(self, request, room_id, user_id):
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
@ -79,7 +76,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
logger.info("remote_join: %s into room: %s", user_id, room_id)
yield self.federation_handler.do_invite_join(
await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user_id, event_content
)
@ -123,8 +120,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"remote_room_hosts": remote_room_hosts,
}
@defer.inlineCallbacks
def _handle_request(self, request, room_id, user_id):
async def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
@ -137,7 +133,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
try:
event = yield self.federation_handler.do_remotely_reject_invite(
event = await self.federation_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, user_id
)
ret = event.get_pdu_json()
@ -150,7 +146,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
#
logger.warn("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(user_id, room_id)
await self.store.locally_reject_invite(user_id, room_id)
ret = {}
return 200, ret

View File

@ -15,8 +15,6 @@
import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@ -74,11 +72,10 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
"address": address,
}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
yield self.registration_handler.register_with_store(
await self.registration_handler.register_with_store(
user_id=user_id,
password_hash=content["password_hash"],
was_guest=content["was_guest"],
@ -117,14 +114,13 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
"""
return {"auth_result": auth_result, "access_token": access_token}
@defer.inlineCallbacks
def _handle_request(self, request, user_id):
async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)
auth_result = content["auth_result"]
access_token = content["access_token"]
yield self.registration_handler.post_registration_actions(
await self.registration_handler.post_registration_actions(
user_id=user_id, auth_result=auth_result, access_token=access_token
)

View File

@ -87,8 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
return payload
@defer.inlineCallbacks
def _handle_request(self, request, event_id):
async def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
@ -101,7 +100,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
event = EventType(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
context = EventContext.deserialize(self.store, content["context"])
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
@ -113,7 +112,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
yield self.event_creation_handler.persist_and_notify_client_event(
await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)