Wrap all of get_app_service_rooms in a txn.

This commit is contained in:
Kegan Dougal 2015-03-02 11:20:51 +00:00
parent b216b36892
commit 377ae369c1
6 changed files with 67 additions and 88 deletions

View File

@ -15,6 +15,7 @@
import logging import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.storage.roommember import RoomsForUser from synapse.storage.roommember import RoomsForUser
@ -197,7 +198,6 @@ class ApplicationServiceStore(SQLBaseStore):
# TODO: The from_cache=False impl # TODO: The from_cache=False impl
# TODO: This should be JOINed with the application_services_regex table. # TODO: This should be JOINed with the application_services_regex table.
@defer.inlineCallbacks
def get_app_service_rooms(self, service): def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service. """Get a list of RoomsForUser for this application service.
@ -212,35 +212,49 @@ class ApplicationServiceStore(SQLBaseStore):
Returns: Returns:
A list of RoomsForUser. A list of RoomsForUser.
""" """
# FIXME: This is assuming that this store has methods from return self.runInteraction(
# RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is "get_app_service_rooms",
# a bad assumption to make as it makes testing trickier and coupling self._get_app_service_rooms_txn,
# less obvious. service,
)
def _get_app_service_rooms_txn(self, txn, service):
# get all rooms matching the room ID regex. # get all rooms matching the room ID regex.
room_entries = yield self.get_all_rooms() room_entries = self._simple_select_list_txn(
txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
)
matching_room_list = set([ matching_room_list = set([
r["room_id"] for r in room_entries if r["room_id"] for r in room_entries if
service.is_interested_in_room(r["room_id"]) service.is_interested_in_room(r["room_id"])
]) ])
# resolve room IDs for matching room alias regex. # resolve room IDs for matching room alias regex.
room_alias_mappings = yield self.get_all_associations() room_alias_mappings = self._simple_select_list_txn(
txn=txn, table="room_aliases", keyvalues=None,
retcols=["room_id", "room_alias"]
)
matching_room_list |= set([ matching_room_list |= set([
r.room_id for r in room_alias_mappings if r["room_id"] for r in room_alias_mappings if
service.is_interested_in_alias(r.room_alias) service.is_interested_in_alias(r["room_alias"])
]) ])
# get all rooms for every user for this AS. This is scoped to users on # get all rooms for every user for this AS. This is scoped to users on
# this HS only. # this HS only.
user_list = yield self.get_all_users() user_list = self._simple_select_list_txn(
txn=txn, table="users", keyvalues=None, retcols=["name"]
)
user_list = [ user_list = [
u["name"] for u in user_list if u["name"] for u in user_list if
service.is_interested_in_user(u["name"]) service.is_interested_in_user(u["name"])
] ]
rooms_for_user_matching_user_id = set() # RoomsForUser list rooms_for_user_matching_user_id = set() # RoomsForUser list
for user_id in user_list: for user_id in user_list:
rooms_for_user = yield self.get_rooms_for_user(user_id) # FIXME: This assumes this store is linked with RoomMemberStore :(
rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
txn=txn,
user_id=user_id,
membership_list=[Membership.JOIN]
)
rooms_for_user_matching_user_id |= set(rooms_for_user) rooms_for_user_matching_user_id |= set(rooms_for_user)
# make RoomsForUser tuples for room ids and aliases which are not in the # make RoomsForUser tuples for room ids and aliases which are not in the
@ -253,7 +267,7 @@ class ApplicationServiceStore(SQLBaseStore):
] ]
rooms_for_user_matching_user_id |= set(missing_rooms_for_user) rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
defer.returnValue(rooms_for_user_matching_user_id) return rooms_for_user_matching_user_id
@defer.inlineCallbacks @defer.inlineCallbacks
def _populate_cache(self): def _populate_cache(self):

View File

@ -134,27 +134,6 @@ class DirectoryStore(SQLBaseStore):
return room_id return room_id
@defer.inlineCallbacks
def get_all_associations(self):
"""Retrieve the entire list of room alias -> room ID pairings.
Returns:
A list of RoomAliasMappings.
"""
results = yield self._execute_and_decode(
"SELECT room_id, room_alias FROM room_aliases"
)
# TODO(kegan): It feels wrong to be specifying no servers here, but
# equally this function isn't required to obtain all servers so
# retrieving them "just for the sake of it" also seems wrong, but we
# want to conform to passing Objects around and not dicts..
defer.returnValue([
RoomAliasMapping(
room_id=r["room_id"], room_alias=r["room_alias"], servers=""
) for r in results
])
def get_aliases_for_room(self, room_id): def get_aliases_for_room(self, room_id):
return self._simple_select_onecol( return self._simple_select_onecol(
"room_aliases", "room_aliases",

View File

@ -92,10 +92,6 @@ class RegistrationStore(SQLBaseStore):
query, user_id query, user_id
) )
def get_all_users(self):
return self._simple_select_list(
table="users", keyvalues=None, retcols=["name"])
def get_user_by_token(self, token): def get_user_by_token(self, token):
"""Get a user from the given access token. """Get a user from the given access token.

View File

@ -71,16 +71,6 @@ class RoomStore(SQLBaseStore):
RoomsTable.decode_single_result, query, room_id, RoomsTable.decode_single_result, query, room_id,
) )
def get_all_rooms(self):
"""Retrieve all the rooms.
Returns:
A list of namedtuples containing the room information.
"""
return self._simple_select_list(
table="rooms", keyvalues=None, retcols=["room_id"]
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_rooms(self, is_public): def get_rooms(self, is_public):
"""Retrieve a list of all public rooms. """Retrieve a list of all public rooms.

View File

@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore):
if not membership_list: if not membership_list:
return defer.succeed(None) return defer.succeed(None)
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
self._get_rooms_for_user_where_membership_is_txn,
user_id, membership_list
)
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
where_clause = "user_id = ? AND (%s)" % ( where_clause = "user_id = ? AND (%s)" % (
" OR ".join(["membership = ?" for _ in membership_list]), " OR ".join(["membership = ?" for _ in membership_list]),
) )
@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore):
args = [user_id] args = [user_id]
args.extend(membership_list) args.extend(membership_list)
def f(txn): sql = (
sql = ( "SELECT m.room_id, m.sender, m.membership"
"SELECT m.room_id, m.sender, m.membership" " FROM room_memberships as m"
" FROM room_memberships as m" " INNER JOIN current_state_events as c"
" INNER JOIN current_state_events as c" " ON m.event_id = c.event_id"
" ON m.event_id = c.event_id" " WHERE %s"
" WHERE %s" ) % (where_clause,)
) % (where_clause,)
txn.execute(sql, args) txn.execute(sql, args)
return [ return [
RoomsForUser(**r) for r in self.cursor_to_dict(txn) RoomsForUser(**r) for r in self.cursor_to_dict(txn)
] ]
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
f
)
def get_joined_hosts_for_room(self, room_id): def get_joined_hosts_for_room(self, room_id):
return self._simple_select_onecol( return self._simple_select_onecol(

View File

@ -146,18 +146,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(([], to_key)) defer.returnValue(([], to_key))
return return
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is used
# for the Notifier listener rooms). We can't reasonably make a SQL
# query for these room IDs, so we'll pull all the events between from/to
# and filter in python.
rooms_for_as = yield self.get_app_service_rooms(service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
# select all the events between from/to with a sensible limit # select all the events between from/to with a sensible limit
sql = ( sql = (
"SELECT e.event_id, e.room_id, e.type, s.state_key, " "SELECT e.event_id, e.room_id, e.type, s.state_key, "
@ -169,20 +157,32 @@ class StreamStore(SQLBaseStore):
"limit": limit "limit": limit
} }
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
def f(txn): def f(txn):
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,)) txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn) rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
ret = self._get_events_txn( ret = self._get_events_txn(
txn, txn,
# apply the filter on the room id list # apply the filter on the room id list
@ -197,7 +197,6 @@ class StreamStore(SQLBaseStore):
if rows: if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows) key = "s%d" % max(r["stream_ordering"] for r in rows)
else: else:
# Assume we didn't get anything because there was nothing to # Assume we didn't get anything because there was nothing to
# get. # get.
@ -266,7 +265,6 @@ class StreamStore(SQLBaseStore):
if rows: if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows) key = "s%d" % max(r["stream_ordering"] for r in rows)
else: else:
# Assume we didn't get anything because there was nothing to # Assume we didn't get anything because there was nothing to
# get. # get.