Speed up event filtering (for ACL) logic

This commit is contained in:
Erik Johnston 2015-08-04 09:32:23 +01:00
parent 2c963054f8
commit 4d6cb8814e
5 changed files with 105 additions and 46 deletions

View File

@ -230,7 +230,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events): def _filter_events_for_server(self, server_name, room_id, events):
states = yield self.store.get_state_for_events( states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events], room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
) )
events_and_states = zip(events, states) events_and_states = zip(events, states)

View File

@ -138,7 +138,11 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events): def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events( states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events], room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id),
)
) )
events_and_states = zip(events, states) events_and_states = zip(events, states)

View File

@ -295,7 +295,11 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events): def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events( states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events], room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id),
)
) )
events_and_states = zip(events, states) events_and_states = zip(events, states)

View File

@ -71,6 +71,11 @@ class Cache(object):
self.thread = None self.thread = None
caches_by_name[name] = self.cache caches_by_name[name] = self.cache
class Sentinel(object):
__slots__ = []
self.sentinel = Sentinel()
def check_thread(self): def check_thread(self):
expected_thread = self.thread expected_thread = self.thread
if expected_thread is None: if expected_thread is None:
@ -85,9 +90,10 @@ class Cache(object):
if len(keyargs) != self.keylen: if len(keyargs) != self.keylen:
raise ValueError("Expected a key to have %d items", self.keylen) raise ValueError("Expected a key to have %d items", self.keylen)
if keyargs in self.cache: val = self.cache.get(keyargs, self.sentinel)
if val is not self.sentinel:
cache_counter.inc_hits(self.name) cache_counter.inc_hits(self.name)
return self.cache[keyargs] return val
cache_counter.inc_misses(self.name) cache_counter.inc_misses(self.name)
raise KeyError() raise KeyError()

View File

@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
from twisted.internet import defer from twisted.internet import defer
from synapse.util import unwrapFirstError
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
import logging import logging
@ -206,62 +207,102 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False) events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events) defer.returnValue(events)
@defer.inlineCallbacks @cached(num_args=3, lru=True)
def get_state_for_events(self, room_id, event_ids): def _get_state_groups_from_group(self, room_id, group, types):
def f(txn): def f(txn):
groups = set() sql = (
event_to_group = {} "SELECT event_id FROM state_groups_state WHERE"
for event_id in event_ids: " room_id = ? AND state_group = ? AND (%s)"
# TODO: Remove this loop. ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),)
group = self._simple_select_one_onecol_txn(
txn,
table="event_to_state_groups",
keyvalues={"event_id": event_id},
retcol="state_group",
allow_none=True,
)
if group:
event_to_group[event_id] = group
groups.add(group)
group_to_state_ids = {} args = [room_id, group]
for group in groups: args.extend([i for typ in types for i in typ])
state_ids = self._simple_select_onecol_txn( txn.execute(sql, args)
txn,
table="state_groups_state",
keyvalues={"state_group": group},
retcol="event_id",
)
group_to_state_ids[group] = state_ids return group, [
r[0]
for r in txn.fetchall()
]
return event_to_group, group_to_state_ids return self.runInteraction(
"_get_state_groups_from_group",
res = yield self.runInteraction(
"annotate_events_with_state_groups",
f, f,
) )
event_to_group, group_to_state_ids = res @cached(num_args=3, lru=True, max_entries=100000)
def _get_state_for_event_id(self, room_id, event_id, types):
def f(txn):
type_and_state_sql = " OR ".join([
"(type = ? AND state_key = ?)"
if typ[1] is not None
else "type = ?"
for typ in types
])
state_list = yield defer.gatherResults( sql = (
[ "SELECT sg.event_id FROM state_groups_state as sg"
self._fetch_events_for_group(group, vals) " INNER JOIN event_to_state_groups as e"
for group, vals in group_to_state_ids.items() " ON e.state_group = sg.state_group"
], " WHERE e.event_id = ? AND (%s)"
consumeErrors=True, ) % (type_and_state_sql,)
args = [event_id]
for typ, state_key in types:
args.extend(
[typ, state_key] if state_key is not None else [typ]
)
txn.execute(sql, args)
return event_id, [
r[0]
for r in txn.fetchall()
]
return self.runInteraction(
"_get_state_for_event_id",
f,
) )
state_dict = { @defer.inlineCallbacks
group: { def get_state_for_events(self, room_id, event_ids, types):
set_types = frozenset(types)
res = yield defer.gatherResults(
[
self._get_state_for_event_id(
room_id, event_id, set_types,
)
for event_id in event_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
event_to_state_ids = dict(res)
event_dict = yield self._get_events(
[
item
for lst in event_to_state_ids.values()
for item in lst
],
get_prev_content=False
).addCallback(
lambda evs: {ev.event_id: ev for ev in evs}
)
event_to_state = {
event_id: {
(ev.type, ev.state_key): ev (ev.type, ev.state_key): ev
for ev in state for ev in [
event_dict[state_id]
for state_id in state_ids
if state_id in event_dict
]
} }
for group, state in state_list for event_id, state_ids in event_to_state_ids.items()
} }
defer.returnValue([ defer.returnValue([
state_dict.get(event_to_group.get(event, None), None) event_to_state[event]
for event in event_ids for event in event_ids
]) ])