Add bulk fetch storage API

This commit is contained in:
Erik Johnston 2016-05-05 09:51:03 +01:00
parent fee1118a20
commit 1f0f5ffa1e
3 changed files with 26 additions and 11 deletions

View File

@ -95,13 +95,9 @@ class BaseHandler(object):
row["event_id"] for rows in forgotten for row in rows row["event_id"] for rows in forgotten for row in rows
) )
# Maps user_id -> account data content ignore_dict_content = yield self.store.get_global_account_data_by_type_for_users(
ignore_dict_content = yield defer.gatherResults([ "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
preserve_fn(self.store.get_global_account_data_by_type_for_user)( )
user_id, "m.ignored_user_list"
).addCallback(lambda d, u: (u, d), user_id)
for user_id, is_peeking in user_tuples
]).addCallback(dict)
# FIXME: This will explode if people upload something incorrect. # FIXME: This will explode if people upload something incorrect.
ignore_dict = { ignore_dict = {

View File

@ -522,7 +522,7 @@ class SyncHandler(BaseHandler):
) )
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
user_id, "m.ignored_user_list" "m.ignored_user_list", user_id=user_id,
) )
if ignored_account_data: if ignored_account_data:

View File

@ -16,7 +16,7 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from twisted.internet import defer from twisted.internet import defer
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
import ujson as json import ujson as json
import logging import logging
@ -64,7 +64,7 @@ class AccountDataStore(SQLBaseStore):
) )
@cachedInlineCallbacks(num_args=2) @cachedInlineCallbacks(num_args=2)
def get_global_account_data_by_type_for_user(self, user_id, data_type): def get_global_account_data_by_type_for_user(self, data_type, user_id):
""" """
Returns: Returns:
Deferred: A dict Deferred: A dict
@ -85,6 +85,25 @@ class AccountDataStore(SQLBaseStore):
else: else:
defer.returnValue(None) defer.returnValue(None)
@cachedList(cached_method_name="get_global_account_data_by_type_for_user",
num_args=2, list_name="user_ids", inlineCallbacks=True)
def get_global_account_data_by_type_for_users(self, data_type, user_ids):
rows = yield self._simple_select_many_batch(
table="account_data",
column="user_id",
iterable=user_ids,
keyvalues={
"account_data_type": data_type,
},
retcols=("user_id", "content",),
desc="get_global_account_data_by_type_for_users",
)
defer.returnValue({
row["user_id"]: json.loads(row["content"]) if row["content"] else None
for row in rows
})
def get_account_data_for_room(self, user_id, room_id): def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room. """Get all the client account_data for a user for a room.
@ -261,7 +280,7 @@ class AccountDataStore(SQLBaseStore):
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
txn.call_after( txn.call_after(
self.get_global_account_data_by_type_for_user.invalidate, self.get_global_account_data_by_type_for_user.invalidate,
(user_id, account_data_type,) (account_data_type, user_id,)
) )
self._update_max_stream_id(txn, next_id) self._update_max_stream_id(txn, next_id)