Merge pull request #835 from matrix-org/erikj/get_event_txn
Remove event fetching from DB threads
This commit is contained in:
commit
f6be734be9
|
@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
|
|||
_get_events_from_cache = DataStore._get_events_from_cache.__func__
|
||||
|
||||
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
|
||||
_parse_events_txn = DataStore._parse_events_txn.__func__
|
||||
_get_events_txn = DataStore._get_events_txn.__func__
|
||||
_get_event_txn = DataStore._get_event_txn.__func__
|
||||
_enqueue_events = DataStore._enqueue_events.__func__
|
||||
_do_fetch = DataStore._do_fetch.__func__
|
||||
_fetch_events_txn = DataStore._fetch_events_txn.__func__
|
||||
_fetch_event_rows = DataStore._fetch_event_rows.__func__
|
||||
_get_event_from_row = DataStore._get_event_from_row.__func__
|
||||
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
|
||||
_get_rooms_for_user_where_membership_is_txn = (
|
||||
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
|
||||
)
|
||||
|
|
|
@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
dict(txn_id=txn_id, as_id=service.id)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_oldest_unsent_txn(self, service):
|
||||
"""Get the oldest transaction which has not been sent for this
|
||||
service.
|
||||
|
@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
A Deferred which resolves to an AppServiceTransaction or
|
||||
None.
|
||||
"""
|
||||
return self.runInteraction(
|
||||
entry = yield self.runInteraction(
|
||||
"get_oldest_unsent_appservice_txn",
|
||||
self._get_oldest_unsent_txn,
|
||||
service
|
||||
)
|
||||
|
||||
if not entry:
|
||||
defer.returnValue(None)
|
||||
|
||||
event_ids = json.loads(entry["event_ids"])
|
||||
|
||||
events = yield self.get_events(event_ids)
|
||||
|
||||
defer.returnValue(AppServiceTransaction(
|
||||
service=service, id=entry["txn_id"], events=events
|
||||
))
|
||||
|
||||
def _get_oldest_unsent_txn(self, txn, service):
|
||||
# Monotonically increasing txn ids, so just select the smallest
|
||||
# one in the txns table (we delete them when they are sent)
|
||||
|
@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
|
||||
entry = rows[0]
|
||||
|
||||
event_ids = json.loads(entry["event_ids"])
|
||||
events = self._get_events_txn(txn, event_ids)
|
||||
|
||||
return AppServiceTransaction(
|
||||
service=service, id=entry["txn_id"], events=events
|
||||
)
|
||||
return entry
|
||||
|
||||
def _get_last_txn(self, txn, service_id):
|
||||
txn.execute(
|
||||
|
|
|
@ -762,41 +762,6 @@ class EventsStore(SQLBaseStore):
|
|||
if e_id in event_map and event_map[e_id]
|
||||
])
|
||||
|
||||
def _get_events_txn(self, txn, event_ids, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
if not event_ids:
|
||||
return []
|
||||
|
||||
event_map = self._get_events_from_cache(
|
||||
event_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
missing_events_ids = [e for e in event_ids if e not in event_map]
|
||||
|
||||
if not missing_events_ids:
|
||||
return [
|
||||
event_map[e_id] for e_id in event_ids
|
||||
if e_id in event_map and event_map[e_id]
|
||||
]
|
||||
|
||||
missing_events = self._fetch_events_txn(
|
||||
txn,
|
||||
missing_events_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
event_map.update(missing_events)
|
||||
|
||||
return [
|
||||
event_map[e_id] for e_id in event_ids
|
||||
if e_id in event_map and event_map[e_id]
|
||||
]
|
||||
|
||||
def _invalidate_get_event_cache(self, event_id):
|
||||
for check_redacted in (False, True):
|
||||
for get_prev_content in (False, True):
|
||||
|
@ -804,18 +769,6 @@ class EventsStore(SQLBaseStore):
|
|||
(event_id, check_redacted, get_prev_content)
|
||||
)
|
||||
|
||||
def _get_event_txn(self, txn, event_id, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
|
||||
events = self._get_events_txn(
|
||||
txn, [event_id],
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
return events[0] if events else None
|
||||
|
||||
def _get_events_from_cache(self, events, check_redacted, get_prev_content,
|
||||
allow_rejected):
|
||||
event_map = {}
|
||||
|
@ -981,34 +934,6 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
return rows
|
||||
|
||||
def _fetch_events_txn(self, txn, events, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
if not events:
|
||||
return {}
|
||||
|
||||
rows = self._fetch_event_rows(
|
||||
txn, events,
|
||||
)
|
||||
|
||||
if not allow_rejected:
|
||||
rows[:] = [r for r in rows if not r["rejects"]]
|
||||
|
||||
res = [
|
||||
self._get_event_from_row_txn(
|
||||
txn,
|
||||
row["internal_metadata"], row["json"], row["redacts"],
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
rejected_reason=row["rejects"],
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
return {
|
||||
r.event_id: r
|
||||
for r in res
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_event_from_row(self, internal_metadata, js, redacted,
|
||||
check_redacted=True, get_prev_content=False,
|
||||
|
@ -1070,69 +995,6 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
defer.returnValue(ev)
|
||||
|
||||
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
||||
check_redacted=True, get_prev_content=False,
|
||||
rejected_reason=None):
|
||||
d = json.loads(js)
|
||||
internal_metadata = json.loads(internal_metadata)
|
||||
|
||||
if rejected_reason:
|
||||
rejected_reason = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="rejections",
|
||||
keyvalues={"event_id": rejected_reason},
|
||||
retcol="reason",
|
||||
)
|
||||
|
||||
ev = FrozenEvent(
|
||||
d,
|
||||
internal_metadata_dict=internal_metadata,
|
||||
rejected_reason=rejected_reason,
|
||||
)
|
||||
|
||||
if check_redacted and redacted:
|
||||
ev = prune_event(ev)
|
||||
|
||||
redaction_id = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="redactions",
|
||||
keyvalues={"redacts": ev.event_id},
|
||||
retcol="event_id",
|
||||
)
|
||||
|
||||
ev.unsigned["redacted_by"] = redaction_id
|
||||
# Get the redaction event.
|
||||
|
||||
because = self._get_event_txn(
|
||||
txn,
|
||||
redaction_id,
|
||||
check_redacted=False
|
||||
)
|
||||
|
||||
if because:
|
||||
ev.unsigned["redacted_because"] = because
|
||||
|
||||
if get_prev_content and "replaces_state" in ev.unsigned:
|
||||
prev = self._get_event_txn(
|
||||
txn,
|
||||
ev.unsigned["replaces_state"],
|
||||
get_prev_content=False,
|
||||
)
|
||||
if prev:
|
||||
ev.unsigned["prev_content"] = prev.content
|
||||
ev.unsigned["prev_sender"] = prev.sender
|
||||
|
||||
self._get_event_cache.prefill(
|
||||
(ev.event_id, check_redacted, get_prev_content), ev
|
||||
)
|
||||
|
||||
return ev
|
||||
|
||||
def _parse_events_txn(self, txn, rows):
|
||||
event_ids = [r["event_id"] for r in rows]
|
||||
|
||||
return self._get_events_txn(txn, event_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_messages(self):
|
||||
"""
|
||||
|
|
|
@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
|
|||
|
||||
@cachedInlineCallbacks()
|
||||
def get_room_name_and_aliases(self, room_id):
|
||||
def f(txn):
|
||||
def get_room_name(txn):
|
||||
sql = (
|
||||
"SELECT event_id FROM current_state_events "
|
||||
"WHERE room_id = ? "
|
||||
"SELECT name FROM room_names"
|
||||
" INNER JOIN current_state_events USING (room_id, event_id)"
|
||||
" WHERE room_id = ?"
|
||||
" LIMIT 1"
|
||||
)
|
||||
|
||||
sql += " AND ((type = 'm.room.name' AND state_key = '')"
|
||||
sql += " OR type = 'm.room.aliases')"
|
||||
|
||||
txn.execute(sql, (room_id,))
|
||||
results = self.cursor_to_dict(txn)
|
||||
rows = txn.fetchall()
|
||||
if rows:
|
||||
return rows[0][0]
|
||||
else:
|
||||
return None
|
||||
|
||||
return self._parse_events_txn(txn, results)
|
||||
return [row[0] for row in txn.fetchall()]
|
||||
|
||||
events = yield self.runInteraction("get_room_name_and_aliases", f)
|
||||
def get_room_aliases(txn):
|
||||
sql = (
|
||||
"SELECT content FROM current_state_events"
|
||||
" INNER JOIN events USING (room_id, event_id)"
|
||||
" WHERE room_id = ?"
|
||||
)
|
||||
txn.execute(sql, (room_id,))
|
||||
return [row[0] for row in txn.fetchall()]
|
||||
|
||||
name = yield self.runInteraction("get_room_name", get_room_name)
|
||||
alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
|
||||
|
||||
name = None
|
||||
aliases = []
|
||||
|
||||
for e in events:
|
||||
if e.type == 'm.room.name':
|
||||
if 'name' in e.content:
|
||||
name = e.content['name']
|
||||
elif e.type == 'm.room.aliases':
|
||||
if 'aliases' in e.content:
|
||||
aliases.extend(e.content['aliases'])
|
||||
for c in alias_contents:
|
||||
try:
|
||||
content = json.loads(c)
|
||||
except:
|
||||
continue
|
||||
|
||||
aliases.extend(content.get('aliases', []))
|
||||
|
||||
defer.returnValue((name, aliases))
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
|||
|
||||
import logging
|
||||
import re
|
||||
import ujson as json
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
def reindex_search_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id FROM events"
|
||||
"SELECT stream_ordering, event_id, room_id, type, content FROM events"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" AND (%s)"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
|
@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
event_ids = [row[1] for row in rows]
|
||||
|
||||
events = self._get_events_txn(txn, event_ids)
|
||||
min_stream_id = rows[-1]["stream_ordering"]
|
||||
|
||||
event_search_rows = []
|
||||
for event in events:
|
||||
for row in rows:
|
||||
try:
|
||||
event_id = event.event_id
|
||||
room_id = event.room_id
|
||||
content = event.content
|
||||
if event.type == "m.room.message":
|
||||
event_id = row["event_id"]
|
||||
room_id = row["room_id"]
|
||||
etype = row["type"]
|
||||
try:
|
||||
content = json.loads(row["content"])
|
||||
except:
|
||||
continue
|
||||
|
||||
if etype == "m.room.message":
|
||||
key = "content.body"
|
||||
value = content["body"]
|
||||
elif event.type == "m.room.topic":
|
||||
elif etype == "m.room.topic":
|
||||
key = "content.topic"
|
||||
value = content["topic"]
|
||||
elif event.type == "m.room.name":
|
||||
elif etype == "m.room.name":
|
||||
key = "content.name"
|
||||
value = content["name"]
|
||||
except (KeyError, AttributeError):
|
||||
|
|
|
@ -132,17 +132,16 @@ class StreamStore(SQLBaseStore):
|
|||
return True
|
||||
return False
|
||||
|
||||
ret = self._get_events_txn(
|
||||
txn,
|
||||
# apply the filter on the room id list
|
||||
[
|
||||
r["event_id"] for r in rows
|
||||
if app_service_interested(r)
|
||||
],
|
||||
return [r for r in rows if app_service_interested(r)]
|
||||
|
||||
rows = yield self.runInteraction("get_appservice_room_stream", f)
|
||||
|
||||
ret = yield self._get_events(
|
||||
[r["event_id"] for r in rows],
|
||||
get_prev_content=True
|
||||
)
|
||||
|
||||
self._set_before_and_after(ret, rows)
|
||||
self._set_before_and_after(ret, rows, topo_order=from_id is None)
|
||||
|
||||
if rows:
|
||||
key = "s%d" % max(r["stream_ordering"] for r in rows)
|
||||
|
@ -151,10 +150,7 @@ class StreamStore(SQLBaseStore):
|
|||
# get.
|
||||
key = to_key
|
||||
|
||||
return ret, key
|
||||
|
||||
results = yield self.runInteraction("get_appservice_room_stream", f)
|
||||
defer.returnValue(results)
|
||||
defer.returnValue((ret, key))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
|
||||
|
|
|
@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
|||
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
|
||||
|
||||
# we aren't testing store._base stuff here, so mock this out
|
||||
self.store._get_events_txn = Mock(return_value=events)
|
||||
self.store.get_events = Mock(return_value=events)
|
||||
|
||||
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
|
||||
yield self._insert_txn(service.id, 10, events)
|
||||
|
|
Loading…
Reference in New Issue