This commit is contained in:
Amber Brown 2019-03-12 00:35:31 +11:00
parent 26eefca3b7
commit 5ba8ceab4c
4 changed files with 74 additions and 52 deletions

View File

@ -60,14 +60,16 @@ class UserDirectoryHandler(object):
self.update_user_directory = hs.config.update_user_directory self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users self.search_all_users = hs.config.user_directory_search_all_users
# If we're a worker, don't sleep when doing the initial room work, as it
# won't monopolise the master's CPU.
if hs.config.worker_app:
self.INITIAL_ROOM_SLEEP_MS = 0
self.INITIAL_USER_SLEEP_MS = 0
# When start up for the first time we need to populate the user_directory. # When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already # This is a set of user_id's we've inserted already
self.initially_handled_users = set() self.initially_handled_users = set()
self.register_background_update_handler(
"users_in_public_rooms_initial", self._populate_users_in_public_rooms
)
# The current position in the current_state_delta stream # The current position in the current_state_delta stream
self.pos = None self.pos = None
@ -81,41 +83,6 @@ class UserDirectoryHandler(object):
# we start populating the user directory # we start populating the user directory
self.clock.call_later(0, self.notify_new_event) self.clock.call_later(0, self.notify_new_event)
@defer.inlineCallbacks
def _populate_users_in_public_rooms(self, progress, batch_size):
"""
Populate the users_in_public_rooms table with the contents of the
users_who_share_public_rooms table.
"""
def _fetch(txn):
sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms"
txn.execute(sql)
return txn.fetchall()
users = yield self.store.runInteraction(
"populate_users_in_public_rooms_fetch", _fetch
)
if users:
def _fill(txn):
self._simple_upsert_many_txn(
txn,
table="users_in_public_rooms",
key_names=["user_id"],
key_values=users,
value_names=(),
value_values=None,
)
users = yield self.store.runInteraction(
"populate_users_in_public_rooms_fill", _fill
)
yield self._end_background_update("users_in_public_rooms_initial")
defer.returnValue(1)
def search_users(self, user_id, search_term, limit): def search_users(self, user_id, search_term, limit):
"""Searches for users in directory """Searches for users in directory

View File

@ -767,18 +767,25 @@ class SQLBaseStore(object):
""" """
allvalues = {} allvalues = {}
allvalues.update(keyvalues) allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values) allvalues.update(insertion_values)
if not values:
latter = "NOTHING"
else:
allvalues.update(values)
latter = (
"UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
)
sql = ( sql = (
"INSERT INTO %s (%s) VALUES (%s) " "INSERT INTO %s (%s) VALUES (%s) "
"ON CONFLICT (%s) DO UPDATE SET %s" "ON CONFLICT (%s) DO %s"
) % ( ) % (
table, table,
", ".join(k for k in allvalues), ", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues), ", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues), ", ".join(k for k in keyvalues),
", ".join(k + "=EXCLUDED." + k for k in values), latter
) )
txn.execute(sql, list(allvalues.values())) txn.execute(sql, list(allvalues.values()))

View File

@ -22,16 +22,57 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from ._base import SQLBaseStore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UserDirectoryStore(SQLBaseStore): class UserDirectoryStore(BackgroundUpdateStore):
def __init__(self, dbconn, hs):
super(UserDirectoryStore, self).__init__(dbconn, hs)
self.register_background_update_handler(
"users_in_public_rooms_initial", self._populate_users_in_public_rooms
)
@defer.inlineCallbacks
def _populate_users_in_public_rooms(self, progress, batch_size):
"""
Populate the users_in_public_rooms table with the contents of the
users_who_share_public_rooms table.
"""
def _fetch(txn):
sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms"
txn.execute(sql)
return txn.fetchall()
users = yield self.runInteraction(
"populate_users_in_public_rooms_fetch", _fetch
)
if users:
def _fill(txn):
self._simple_upsert_many_txn(
txn,
table="users_in_public_rooms",
key_names=["user_id"],
key_values=users,
value_names=(),
value_values=None,
)
users = yield self.runInteraction(
"populate_users_in_public_rooms_fill", _fill
)
yield self._end_background_update("users_in_public_rooms_initial")
defer.returnValue(1)
@defer.inlineCallbacks @defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id): def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable """Check if the room is either world_readable or publically joinable
@ -353,8 +394,7 @@ class UserDirectoryStore(SQLBaseStore):
txn, txn,
"users_in_public_rooms", "users_in_public_rooms",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
values={}, values=None,
desc="add_user_as_in_public_room",
) )
for user_id, other_user_id in user_id_tuples: for user_id, other_user_id in user_id_tuples:
@ -603,7 +643,7 @@ class UserDirectoryStore(SQLBaseStore):
else: else:
join_clause = """ join_clause = """
LEFT JOIN ( LEFT JOIN (
SELECT other_user_id AS user_id FROM users_who_share_public_rooms SELECT user_id FROM users_in_public_rooms
UNION UNION
SELECT other_user_id AS user_id FROM users_who_share_private_rooms SELECT other_user_id AS user_id FROM users_who_share_private_rooms
WHERE user_id = ? WHERE user_id = ?

View File

@ -116,12 +116,13 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
# Check we have populated the database correctly. # Check we have populated the database correctly.
shares_public = self.get_users_who_share_public_rooms() shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms() shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
self.assertEqual(shares_public, []) self.assertEqual(shares_public, [])
self.assertEqual( self.assertEqual(
self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)])
) )
self.assertEqual(set(public_users), set([u1, u2])) self.assertEqual(public_users, [])
# We get one search result when searching for user2 by user1. # We get one search result when searching for user2 by user1.
s = self.get_success(self.handler.search_users(u1, "user2", 10)) s = self.get_success(self.handler.search_users(u1, "user2", 10))
@ -145,7 +146,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.assertEqual(shares_public, []) self.assertEqual(shares_public, [])
self.assertEqual(self._compress_shared(shares_private), set()) self.assertEqual(self._compress_shared(shares_private), set())
self.assertEqual(public_users, [u1]) self.assertEqual(public_users, [])
# User1 now gets no search results for any of the other users. # User1 now gets no search results for any of the other users.
s = self.get_success(self.handler.search_users(u1, "user2", 10)) s = self.get_success(self.handler.search_users(u1, "user2", 10))
@ -165,10 +166,10 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
def get_users_in_public_rooms(self): def get_users_in_public_rooms(self):
return self.get_success( return self.get_success(
self.store._simple_select_list( self.store._simple_select_onecol(
"users_in_public_rooms", "users_in_public_rooms",
None, None,
["user_id"], "user_id",
) )
) )
@ -214,9 +215,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
shares_public = self.get_users_who_share_public_rooms() shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms() shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
# Nothing updated yet
self.assertEqual(shares_private, []) self.assertEqual(shares_private, [])
self.assertEqual(shares_public, []) self.assertEqual(shares_public, [])
self.assertEqual(public_users, [])
# Reset the handled users caches # Reset the handled users caches
self.handler.initially_handled_users = set() self.handler.initially_handled_users = set()
@ -233,6 +237,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
shares_public = self.get_users_who_share_public_rooms() shares_public = self.get_users_who_share_public_rooms()
shares_private = self.get_users_who_share_private_rooms() shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
# User 1 and User 2 share public rooms # User 1 and User 2 share public rooms
self.assertEqual( self.assertEqual(
@ -245,6 +250,9 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
set([(u1, u3, private_room), (u3, u1, private_room)]), set([(u1, u3, private_room), (u3, u1, private_room)]),
) )
# User 1 and 2 are in public rooms
self.assertEqual(set(public_users), set([u1, u2]))
def test_search_all_users(self): def test_search_all_users(self):
""" """
Search all users = True means that a user does not have to share a Search all users = True means that a user does not have to share a