From 6af9cdca24f8505cc30b117fb30bda94ae3a6fbf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 1 Jun 2020 07:28:43 -0400 Subject: [PATCH] Convert groups local and server to async/await. (#7600) --- changelog.d/7600.misc | 1 + synapse/groups/groups_server.py | 257 ++++++++++++++----------------- synapse/handlers/groups_local.py | 82 +++++----- 3 files changed, 150 insertions(+), 190 deletions(-) create mode 100644 changelog.d/7600.misc diff --git a/changelog.d/7600.misc b/changelog.d/7600.misc new file mode 100644 index 0000000000..3c2affbe6f --- /dev/null +++ b/changelog.d/7600.misc @@ -0,0 +1 @@ +Convert groups handlers to async/await. diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 4acb4fa489..8a9de913b3 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -19,8 +19,6 @@ import logging from six import string_types -from twisted.internet import defer - from synapse.api.errors import Codes, SynapseError from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute @@ -51,8 +49,7 @@ class GroupsServerWorkerHandler(object): self.transport_client = hs.get_federation_transport_client() self.profile_handler = hs.get_profile_handler() - @defer.inlineCallbacks - def check_group_is_ours( + async def check_group_is_ours( self, group_id, requester_user_id, and_exists=False, and_is_admin=None ): """Check that the group is ours, and optionally if it exists. @@ -68,25 +65,24 @@ class GroupsServerWorkerHandler(object): if not self.is_mine_id(group_id): raise SynapseError(400, "Group not on this server") - group = yield self.store.get_group(group_id) + group = await self.store.get_group(group_id) if and_exists and not group: raise SynapseError(404, "Unknown group") - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) if group and not is_user_in_group and not group["is_public"]: raise SynapseError(404, "Unknown group") if and_is_admin: - is_admin = yield self.store.is_user_admin_in_group(group_id, and_is_admin) + is_admin = await self.store.is_user_admin_in_group(group_id, and_is_admin) if not is_admin: raise SynapseError(403, "User is not admin in group") return group - @defer.inlineCallbacks - def get_group_summary(self, group_id, requester_user_id): + async def get_group_summary(self, group_id, requester_user_id): """Get the summary for a group as seen by requester_user_id. The group summary consists of the profile of the room, and a curated @@ -95,28 +91,28 @@ class GroupsServerWorkerHandler(object): A user/room may appear in multiple roles/categories. """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) - profile = yield self.get_group_profile(group_id, requester_user_id) + profile = await self.get_group_profile(group_id, requester_user_id) - users, roles = yield self.store.get_users_for_summary_by_role( + users, roles = await self.store.get_users_for_summary_by_role( group_id, include_private=is_user_in_group ) # TODO: Add profiles to users - rooms, categories = yield self.store.get_rooms_for_summary_by_category( + rooms, categories = await self.store.get_rooms_for_summary_by_category( group_id, include_private=is_user_in_group ) for room_entry in rooms: room_id = room_entry["room_id"] - joined_users = yield self.store.get_users_in_room(room_id) - entry = yield self.room_list_handler.generate_room_entry( + joined_users = await self.store.get_users_in_room(room_id) + entry = await self.room_list_handler.generate_room_entry( room_id, len(joined_users), with_alias=False, allow_private=True ) entry = dict(entry) # so we don't change whats cached @@ -130,7 +126,7 @@ class GroupsServerWorkerHandler(object): user_id = entry["user_id"] if not self.is_mine_id(requester_user_id): - attestation = yield self.store.get_remote_attestation(group_id, user_id) + attestation = await self.store.get_remote_attestation(group_id, user_id) if not attestation: continue @@ -140,12 +136,12 @@ class GroupsServerWorkerHandler(object): group_id, user_id ) - user_profile = yield self.profile_handler.get_profile_from_cache(user_id) + user_profile = await self.profile_handler.get_profile_from_cache(user_id) entry.update(user_profile) users.sort(key=lambda e: e.get("order", 0)) - membership_info = yield self.store.get_users_membership_info_in_group( + membership_info = await self.store.get_users_membership_info_in_group( group_id, requester_user_id ) @@ -164,22 +160,20 @@ class GroupsServerWorkerHandler(object): "user": membership_info, } - @defer.inlineCallbacks - def get_group_categories(self, group_id, requester_user_id): + async def get_group_categories(self, group_id, requester_user_id): """Get all categories in a group (as seen by user) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - categories = yield self.store.get_group_categories(group_id=group_id) + categories = await self.store.get_group_categories(group_id=group_id) return {"categories": categories} - @defer.inlineCallbacks - def get_group_category(self, group_id, requester_user_id, category_id): + async def get_group_category(self, group_id, requester_user_id, category_id): """Get a specific category in a group (as seen by user) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - res = yield self.store.get_group_category( + res = await self.store.get_group_category( group_id=group_id, category_id=category_id ) @@ -187,32 +181,29 @@ class GroupsServerWorkerHandler(object): return res - @defer.inlineCallbacks - def get_group_roles(self, group_id, requester_user_id): + async def get_group_roles(self, group_id, requester_user_id): """Get all roles in a group (as seen by user) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - roles = yield self.store.get_group_roles(group_id=group_id) + roles = await self.store.get_group_roles(group_id=group_id) return {"roles": roles} - @defer.inlineCallbacks - def get_group_role(self, group_id, requester_user_id, role_id): + async def get_group_role(self, group_id, requester_user_id, role_id): """Get a specific role in a group (as seen by user) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) + res = await self.store.get_group_role(group_id=group_id, role_id=role_id) return res - @defer.inlineCallbacks - def get_group_profile(self, group_id, requester_user_id): + async def get_group_profile(self, group_id, requester_user_id): """Get the group profile as seen by requester_user_id """ - yield self.check_group_is_ours(group_id, requester_user_id) + await self.check_group_is_ours(group_id, requester_user_id) - group = yield self.store.get_group(group_id) + group = await self.store.get_group(group_id) if group: cols = [ @@ -229,20 +220,19 @@ class GroupsServerWorkerHandler(object): else: raise SynapseError(404, "Unknown group") - @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): + async def get_users_in_group(self, group_id, requester_user_id): """Get the users in group as seen by requester_user_id. The ordering is arbitrary at the moment """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) - user_results = yield self.store.get_users_in_group( + user_results = await self.store.get_users_in_group( group_id, include_private=is_user_in_group ) @@ -254,14 +244,14 @@ class GroupsServerWorkerHandler(object): entry = {"user_id": g_user_id} - profile = yield self.profile_handler.get_profile_from_cache(g_user_id) + profile = await self.profile_handler.get_profile_from_cache(g_user_id) entry.update(profile) entry["is_public"] = bool(is_public) entry["is_privileged"] = bool(is_privileged) if not self.is_mine_id(g_user_id): - attestation = yield self.store.get_remote_attestation( + attestation = await self.store.get_remote_attestation( group_id, g_user_id ) if not attestation: @@ -279,30 +269,29 @@ class GroupsServerWorkerHandler(object): return {"chunk": chunk, "total_user_count_estimate": len(user_results)} - @defer.inlineCallbacks - def get_invited_users_in_group(self, group_id, requester_user_id): + async def get_invited_users_in_group(self, group_id, requester_user_id): """Get the users that have been invited to a group as seen by requester_user_id. The ordering is arbitrary at the moment """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) if not is_user_in_group: raise SynapseError(403, "User not in group") - invited_users = yield self.store.get_invited_users_in_group(group_id) + invited_users = await self.store.get_invited_users_in_group(group_id) user_profiles = [] for user_id in invited_users: user_profile = {"user_id": user_id} try: - profile = yield self.profile_handler.get_profile_from_cache(user_id) + profile = await self.profile_handler.get_profile_from_cache(user_id) user_profile.update(profile) except Exception as e: logger.warning("Error getting profile for %s: %s", user_id, e) @@ -310,20 +299,19 @@ class GroupsServerWorkerHandler(object): return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} - @defer.inlineCallbacks - def get_rooms_in_group(self, group_id, requester_user_id): + async def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id This returns rooms in order of decreasing number of joined users """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_user_in_group = yield self.store.is_user_in_group( + is_user_in_group = await self.store.is_user_in_group( requester_user_id, group_id ) - room_results = yield self.store.get_rooms_in_group( + room_results = await self.store.get_rooms_in_group( group_id, include_private=is_user_in_group ) @@ -331,8 +319,8 @@ class GroupsServerWorkerHandler(object): for room_result in room_results: room_id = room_result["room_id"] - joined_users = yield self.store.get_users_in_room(room_id) - entry = yield self.room_list_handler.generate_room_entry( + joined_users = await self.store.get_users_in_room(room_id) + entry = await self.room_list_handler.generate_room_entry( room_id, len(joined_users), with_alias=False, allow_private=True ) @@ -355,13 +343,12 @@ class GroupsServerHandler(GroupsServerWorkerHandler): # Ensure attestations get renewed hs.get_groups_attestation_renewer() - @defer.inlineCallbacks - def update_group_summary_room( + async def update_group_summary_room( self, group_id, requester_user_id, room_id, category_id, content ): """Add/update a room to the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -371,7 +358,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): is_public = _parse_visibility_from_contents(content) - yield self.store.add_room_to_summary( + await self.store.add_room_to_summary( group_id=group_id, room_id=room_id, category_id=category_id, @@ -381,31 +368,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler): return {} - @defer.inlineCallbacks - def delete_group_summary_room( + async def delete_group_summary_room( self, group_id, requester_user_id, room_id, category_id ): """Remove a room from the summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_room_from_summary( + await self.store.remove_room_from_summary( group_id=group_id, room_id=room_id, category_id=category_id ) return {} - @defer.inlineCallbacks - def set_group_join_policy(self, group_id, requester_user_id, content): + async def set_group_join_policy(self, group_id, requester_user_id, content): """Sets the group join policy. Currently supported policies are: - "invite": an invite must be received and accepted in order to join. - "open": anyone can join. """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -413,22 +398,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler): if join_policy is None: raise SynapseError(400, "No value specified for 'm.join_policy'") - yield self.store.set_group_join_policy(group_id, join_policy=join_policy) + await self.store.set_group_join_policy(group_id, join_policy=join_policy) return {} - @defer.inlineCallbacks - def update_group_category(self, group_id, requester_user_id, category_id, content): + async def update_group_category( + self, group_id, requester_user_id, category_id, content + ): """Add/Update a group category """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) is_public = _parse_visibility_from_contents(content) profile = content.get("profile") - yield self.store.upsert_group_category( + await self.store.upsert_group_category( group_id=group_id, category_id=category_id, is_public=is_public, @@ -437,25 +423,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler): return {} - @defer.inlineCallbacks - def delete_group_category(self, group_id, requester_user_id, category_id): + async def delete_group_category(self, group_id, requester_user_id, category_id): """Delete a group category """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_group_category( + await self.store.remove_group_category( group_id=group_id, category_id=category_id ) return {} - @defer.inlineCallbacks - def update_group_role(self, group_id, requester_user_id, role_id, content): + async def update_group_role(self, group_id, requester_user_id, role_id, content): """Add/update a role in a group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -463,31 +447,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler): profile = content.get("profile") - yield self.store.upsert_group_role( + await self.store.upsert_group_role( group_id=group_id, role_id=role_id, is_public=is_public, profile=profile ) return {} - @defer.inlineCallbacks - def delete_group_role(self, group_id, requester_user_id, role_id): + async def delete_group_role(self, group_id, requester_user_id, role_id): """Remove role from group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_group_role(group_id=group_id, role_id=role_id) + await self.store.remove_group_role(group_id=group_id, role_id=role_id) return {} - @defer.inlineCallbacks - def update_group_summary_user( + async def update_group_summary_user( self, group_id, requester_user_id, user_id, role_id, content ): """Add/update a users entry in the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -495,7 +477,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): is_public = _parse_visibility_from_contents(content) - yield self.store.add_user_to_summary( + await self.store.add_user_to_summary( group_id=group_id, user_id=user_id, role_id=role_id, @@ -505,25 +487,25 @@ class GroupsServerHandler(GroupsServerWorkerHandler): return {} - @defer.inlineCallbacks - def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): + async def delete_group_summary_user( + self, group_id, requester_user_id, user_id, role_id + ): """Remove a user from the group summary """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_user_from_summary( + await self.store.remove_user_from_summary( group_id=group_id, user_id=user_id, role_id=role_id ) return {} - @defer.inlineCallbacks - def update_group_profile(self, group_id, requester_user_id, content): + async def update_group_profile(self, group_id, requester_user_id, content): """Update the group profile """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) @@ -535,40 +517,38 @@ class GroupsServerHandler(GroupsServerWorkerHandler): raise SynapseError(400, "%r value is not a string" % (keyname,)) profile[keyname] = value - yield self.store.update_group_profile(group_id, profile) + await self.store.update_group_profile(group_id, profile) - @defer.inlineCallbacks - def add_room_to_group(self, group_id, requester_user_id, room_id, content): + async def add_room_to_group(self, group_id, requester_user_id, room_id, content): """Add room to group """ RoomID.from_string(room_id) # Ensure valid room id - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) is_public = _parse_visibility_from_contents(content) - yield self.store.add_room_to_group(group_id, room_id, is_public=is_public) + await self.store.add_room_to_group(group_id, room_id, is_public=is_public) return {} - @defer.inlineCallbacks - def update_room_in_group( + async def update_room_in_group( self, group_id, requester_user_id, room_id, config_key, content ): """Update room in group """ RoomID.from_string(room_id) # Ensure valid room id - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) if config_key == "m.visibility": is_public = _parse_visibility_dict(content) - yield self.store.update_room_in_group_visibility( + await self.store.update_room_in_group_visibility( group_id, room_id, is_public=is_public ) else: @@ -576,36 +556,34 @@ class GroupsServerHandler(GroupsServerWorkerHandler): return {} - @defer.inlineCallbacks - def remove_room_from_group(self, group_id, requester_user_id, room_id): + async def remove_room_from_group(self, group_id, requester_user_id, room_id): """Remove room from group """ - yield self.check_group_is_ours( + await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) - yield self.store.remove_room_from_group(group_id, room_id) + await self.store.remove_room_from_group(group_id, room_id) return {} - @defer.inlineCallbacks - def invite_to_group(self, group_id, user_id, requester_user_id, content): + async def invite_to_group(self, group_id, user_id, requester_user_id, content): """Invite user to group """ - group = yield self.check_group_is_ours( + group = await self.check_group_is_ours( group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id ) # TODO: Check if user knocked - invited_users = yield self.store.get_invited_users_in_group(group_id) + invited_users = await self.store.get_invited_users_in_group(group_id) if user_id in invited_users: raise SynapseError( 400, "User already invited to group", errcode=Codes.BAD_STATE ) - user_results = yield self.store.get_users_in_group( + user_results = await self.store.get_users_in_group( group_id, include_private=True ) if user_id in (user_result["user_id"] for user_result in user_results): @@ -618,18 +596,18 @@ class GroupsServerHandler(GroupsServerWorkerHandler): if self.hs.is_mine_id(user_id): groups_local = self.hs.get_groups_local_handler() - res = yield groups_local.on_invite(group_id, user_id, content) + res = await groups_local.on_invite(group_id, user_id, content) local_attestation = None else: local_attestation = self.attestations.create_attestation(group_id, user_id) content.update({"attestation": local_attestation}) - res = yield self.transport_client.invite_to_group_notification( + res = await self.transport_client.invite_to_group_notification( get_domain_from_id(user_id), group_id, user_id, content ) user_profile = res.get("user_profile", {}) - yield self.store.add_remote_profile_cache( + await self.store.add_remote_profile_cache( user_id, displayname=user_profile.get("displayname"), avatar_url=user_profile.get("avatar_url"), @@ -639,13 +617,13 @@ class GroupsServerHandler(GroupsServerWorkerHandler): if not self.hs.is_mine_id(user_id): remote_attestation = res["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, user_id=user_id, group_id=group_id ) else: remote_attestation = None - yield self.store.add_user_to_group( + await self.store.add_user_to_group( group_id, user_id, is_admin=False, @@ -654,15 +632,14 @@ class GroupsServerHandler(GroupsServerWorkerHandler): remote_attestation=remote_attestation, ) elif res["state"] == "invite": - yield self.store.add_group_invite(group_id, user_id) + await self.store.add_group_invite(group_id, user_id) return {"state": "invite"} elif res["state"] == "reject": return {"state": "reject"} else: raise SynapseError(502, "Unknown state returned by HS") - @defer.inlineCallbacks - def _add_user(self, group_id, user_id, content): + async def _add_user(self, group_id, user_id, content): """Add a user to a group based on a content dict. See accept_invite, join_group. @@ -672,7 +649,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): remote_attestation = content["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, user_id=user_id, group_id=group_id ) else: @@ -681,7 +658,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): is_public = _parse_visibility_from_contents(content) - yield self.store.add_user_to_group( + await self.store.add_user_to_group( group_id, user_id, is_admin=False, @@ -692,59 +669,55 @@ class GroupsServerHandler(GroupsServerWorkerHandler): return local_attestation - @defer.inlineCallbacks - def accept_invite(self, group_id, requester_user_id, content): + async def accept_invite(self, group_id, requester_user_id, content): """User tries to accept an invite to the group. This is different from them asking to join, and so should error if no invite exists (and they're not a member of the group) """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) - is_invited = yield self.store.is_user_invited_to_local_group( + is_invited = await self.store.is_user_invited_to_local_group( group_id, requester_user_id ) if not is_invited: raise SynapseError(403, "User not invited to group") - local_attestation = yield self._add_user(group_id, requester_user_id, content) + local_attestation = await self._add_user(group_id, requester_user_id, content) return {"state": "join", "attestation": local_attestation} - @defer.inlineCallbacks - def join_group(self, group_id, requester_user_id, content): + async def join_group(self, group_id, requester_user_id, content): """User tries to join the group. This will error if the group requires an invite/knock to join """ - group_info = yield self.check_group_is_ours( + group_info = await self.check_group_is_ours( group_id, requester_user_id, and_exists=True ) if group_info["join_policy"] != "open": raise SynapseError(403, "Group is not publicly joinable") - local_attestation = yield self._add_user(group_id, requester_user_id, content) + local_attestation = await self._add_user(group_id, requester_user_id, content) return {"state": "join", "attestation": local_attestation} - @defer.inlineCallbacks - def knock(self, group_id, requester_user_id, content): + async def knock(self, group_id, requester_user_id, content): """A user requests becoming a member of the group """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) raise NotImplementedError() - @defer.inlineCallbacks - def accept_knock(self, group_id, requester_user_id, content): + async def accept_knock(self, group_id, requester_user_id, content): """Accept a users knock to the room. Errors if the user hasn't knocked, rather than inviting them. """ - yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) + await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) raise NotImplementedError() @@ -872,8 +845,6 @@ class GroupsServerHandler(GroupsServerWorkerHandler): group_id (str) request_user_id (str) - Returns: - Deferred """ await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index ca5c83811a..ebe8d25bd8 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -18,8 +18,6 @@ import logging from six import iteritems -from twisted.internet import defer - from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.types import get_domain_from_id @@ -92,19 +90,18 @@ class GroupsLocalWorkerHandler(object): get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") - @defer.inlineCallbacks - def get_group_summary(self, group_id, requester_user_id): + async def get_group_summary(self, group_id, requester_user_id): """Get the group summary for a group. If the group is remote we check that the users have valid attestations. """ if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_group_summary( + res = await self.groups_server_handler.get_group_summary( group_id, requester_user_id ) else: try: - res = yield self.transport_client.get_group_summary( + res = await self.transport_client.get_group_summary( get_domain_from_id(group_id), group_id, requester_user_id ) except HttpResponseException as e: @@ -122,7 +119,7 @@ class GroupsLocalWorkerHandler(object): attestation = entry.pop("attestation", {}) try: if get_domain_from_id(g_user_id) != group_server_name: - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( attestation, group_id=group_id, user_id=g_user_id, @@ -139,19 +136,18 @@ class GroupsLocalWorkerHandler(object): # Add `is_publicised` flag to indicate whether the user has publicised their # membership of the group on their profile - result = yield self.store.get_publicised_groups_for_user(requester_user_id) + result = await self.store.get_publicised_groups_for_user(requester_user_id) is_publicised = group_id in result res.setdefault("user", {})["is_publicised"] = is_publicised return res - @defer.inlineCallbacks - def get_users_in_group(self, group_id, requester_user_id): + async def get_users_in_group(self, group_id, requester_user_id): """Get users in a group """ if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_users_in_group( + res = await self.groups_server_handler.get_users_in_group( group_id, requester_user_id ) return res @@ -159,7 +155,7 @@ class GroupsLocalWorkerHandler(object): group_server_name = get_domain_from_id(group_id) try: - res = yield self.transport_client.get_users_in_group( + res = await self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id ) except HttpResponseException as e: @@ -174,7 +170,7 @@ class GroupsLocalWorkerHandler(object): attestation = entry.pop("attestation", {}) try: if get_domain_from_id(g_user_id) != group_server_name: - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( attestation, group_id=group_id, user_id=g_user_id, @@ -188,15 +184,13 @@ class GroupsLocalWorkerHandler(object): return res - @defer.inlineCallbacks - def get_joined_groups(self, user_id): - group_ids = yield self.store.get_joined_groups(user_id) + async def get_joined_groups(self, user_id): + group_ids = await self.store.get_joined_groups(user_id) return {"groups": group_ids} - @defer.inlineCallbacks - def get_publicised_groups_for_user(self, user_id): + async def get_publicised_groups_for_user(self, user_id): if self.hs.is_mine_id(user_id): - result = yield self.store.get_publicised_groups_for_user(user_id) + result = await self.store.get_publicised_groups_for_user(user_id) # Check AS associated groups for this user - this depends on the # RegExps in the AS registration file (under `users`) @@ -206,7 +200,7 @@ class GroupsLocalWorkerHandler(object): return {"groups": result} else: try: - bulk_result = yield self.transport_client.bulk_get_publicised_groups( + bulk_result = await self.transport_client.bulk_get_publicised_groups( get_domain_from_id(user_id), [user_id] ) except HttpResponseException as e: @@ -218,8 +212,7 @@ class GroupsLocalWorkerHandler(object): # TODO: Verify attestations return {"groups": result} - @defer.inlineCallbacks - def bulk_get_publicised_groups(self, user_ids, proxy=True): + async def bulk_get_publicised_groups(self, user_ids, proxy=True): destinations = {} local_users = set() @@ -236,7 +229,7 @@ class GroupsLocalWorkerHandler(object): failed_results = [] for destination, dest_user_ids in iteritems(destinations): try: - r = yield self.transport_client.bulk_get_publicised_groups( + r = await self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids) ) results.update(r["users"]) @@ -244,7 +237,7 @@ class GroupsLocalWorkerHandler(object): failed_results.extend(dest_user_ids) for uid in local_users: - results[uid] = yield self.store.get_publicised_groups_for_user(uid) + results[uid] = await self.store.get_publicised_groups_for_user(uid) # Check AS associated groups for this user - this depends on the # RegExps in the AS registration file (under `users`) @@ -333,12 +326,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): return res - @defer.inlineCallbacks - def join_group(self, group_id, user_id, content): + async def join_group(self, group_id, user_id, content): """Request to join a group """ if self.is_mine_id(group_id): - yield self.groups_server_handler.join_group(group_id, user_id, content) + await self.groups_server_handler.join_group(group_id, user_id, content) local_attestation = None remote_attestation = None else: @@ -346,7 +338,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): content["attestation"] = local_attestation try: - res = yield self.transport_client.join_group( + res = await self.transport_client.join_group( get_domain_from_id(group_id), group_id, user_id, content ) except HttpResponseException as e: @@ -356,7 +348,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): remote_attestation = res["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, group_id=group_id, user_id=user_id, @@ -366,7 +358,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): # TODO: Check that the group is public and we're being added publically is_publicised = content.get("publicise", False) - token = yield self.store.register_user_group_membership( + token = await self.store.register_user_group_membership( group_id, user_id, membership="join", @@ -379,12 +371,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): return {} - @defer.inlineCallbacks - def accept_invite(self, group_id, user_id, content): + async def accept_invite(self, group_id, user_id, content): """Accept an invite to a group """ if self.is_mine_id(group_id): - yield self.groups_server_handler.accept_invite(group_id, user_id, content) + await self.groups_server_handler.accept_invite(group_id, user_id, content) local_attestation = None remote_attestation = None else: @@ -392,7 +383,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): content["attestation"] = local_attestation try: - res = yield self.transport_client.accept_group_invite( + res = await self.transport_client.accept_group_invite( get_domain_from_id(group_id), group_id, user_id, content ) except HttpResponseException as e: @@ -402,7 +393,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): remote_attestation = res["attestation"] - yield self.attestations.verify_attestation( + await self.attestations.verify_attestation( remote_attestation, group_id=group_id, user_id=user_id, @@ -412,7 +403,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): # TODO: Check that the group is public and we're being added publically is_publicised = content.get("publicise", False) - token = yield self.store.register_user_group_membership( + token = await self.store.register_user_group_membership( group_id, user_id, membership="join", @@ -425,18 +416,17 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): return {} - @defer.inlineCallbacks - def invite(self, group_id, user_id, requester_user_id, config): + async def invite(self, group_id, user_id, requester_user_id, config): """Invite a user to a group """ content = {"requester_user_id": requester_user_id, "config": config} if self.is_mine_id(group_id): - res = yield self.groups_server_handler.invite_to_group( + res = await self.groups_server_handler.invite_to_group( group_id, user_id, requester_user_id, content ) else: try: - res = yield self.transport_client.invite_to_group( + res = await self.transport_client.invite_to_group( get_domain_from_id(group_id), group_id, user_id, @@ -450,8 +440,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): return res - @defer.inlineCallbacks - def on_invite(self, group_id, user_id, content): + async def on_invite(self, group_id, user_id, content): """One of our users were invited to a group """ # TODO: Support auto join and rejection @@ -466,7 +455,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): if "avatar_url" in content["profile"]: local_profile["avatar_url"] = content["profile"]["avatar_url"] - token = yield self.store.register_user_group_membership( + token = await self.store.register_user_group_membership( group_id, user_id, membership="invite", @@ -474,7 +463,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): ) self.notifier.on_new_event("groups_key", token, users=[user_id]) try: - user_profile = yield self.profile_handler.get_profile(user_id) + user_profile = await self.profile_handler.get_profile(user_id) except Exception as e: logger.warning("No profile for user %s: %s", user_id, e) user_profile = {} @@ -516,12 +505,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): return res - @defer.inlineCallbacks - def user_removed_from_group(self, group_id, user_id, content): + async def user_removed_from_group(self, group_id, user_id, content): """One of our users was removed/kicked from a group """ # TODO: Check if user in group - token = yield self.store.register_user_group_membership( + token = await self.store.register_user_group_membership( group_id, user_id, membership="leave" ) self.notifier.on_new_event("groups_key", token, users=[user_id])