Move state's bg updates to a dedicated store
This commit is contained in:
parent
841054ad96
commit
cfccd2d78a
|
@ -353,8 +353,158 @@ class StateFilter(object):
|
||||||
return member_filter, non_member_filter
|
return member_filter, non_member_filter
|
||||||
|
|
||||||
|
|
||||||
|
class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||||
|
"""Defines functions related to state groups needed to run the state backgroud
|
||||||
|
updates.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _count_state_group_hops_txn(self, txn, state_group):
|
||||||
|
"""Given a state group, count how many hops there are in the tree.
|
||||||
|
|
||||||
|
This is used to ensure the delta chains don't get too long.
|
||||||
|
"""
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
sql = """
|
||||||
|
WITH RECURSIVE state(state_group) AS (
|
||||||
|
VALUES(?::bigint)
|
||||||
|
UNION ALL
|
||||||
|
SELECT prev_state_group FROM state_group_edges e, state s
|
||||||
|
WHERE s.state_group = e.state_group
|
||||||
|
)
|
||||||
|
SELECT count(*) FROM state;
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (state_group,))
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row and row[0]:
|
||||||
|
return row[0]
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||||
|
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||||
|
next_group = state_group
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
while next_group:
|
||||||
|
next_group = self._simple_select_one_onecol_txn(
|
||||||
|
txn,
|
||||||
|
table="state_group_edges",
|
||||||
|
keyvalues={"state_group": next_group},
|
||||||
|
retcol="prev_state_group",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
if next_group:
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
return count
|
||||||
|
|
||||||
|
def _get_state_groups_from_groups_txn(
|
||||||
|
self, txn, groups, state_filter=StateFilter.all()
|
||||||
|
):
|
||||||
|
results = {group: {} for group in groups}
|
||||||
|
|
||||||
|
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||||
|
|
||||||
|
# Unless the filter clause is empty, we're going to append it after an
|
||||||
|
# existing where clause
|
||||||
|
if where_clause:
|
||||||
|
where_clause = " AND (%s)" % (where_clause,)
|
||||||
|
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
# Temporarily disable sequential scans in this transaction. This is
|
||||||
|
# a temporary hack until we can add the right indices in
|
||||||
|
txn.execute("SET LOCAL enable_seqscan=off")
|
||||||
|
|
||||||
|
# The below query walks the state_group tree so that the "state"
|
||||||
|
# table includes all state_groups in the tree. It then joins
|
||||||
|
# against `state_groups_state` to fetch the latest state.
|
||||||
|
# It assumes that previous state groups are always numerically
|
||||||
|
# lesser.
|
||||||
|
# The PARTITION is used to get the event_id in the greatest state
|
||||||
|
# group for the given type, state_key.
|
||||||
|
# This may return multiple rows per (type, state_key), but last_value
|
||||||
|
# should be the same.
|
||||||
|
sql = """
|
||||||
|
WITH RECURSIVE state(state_group) AS (
|
||||||
|
VALUES(?::bigint)
|
||||||
|
UNION ALL
|
||||||
|
SELECT prev_state_group FROM state_group_edges e, state s
|
||||||
|
WHERE s.state_group = e.state_group
|
||||||
|
)
|
||||||
|
SELECT DISTINCT type, state_key, last_value(event_id) OVER (
|
||||||
|
PARTITION BY type, state_key ORDER BY state_group ASC
|
||||||
|
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
|
||||||
|
) AS event_id FROM state_groups_state
|
||||||
|
WHERE state_group IN (
|
||||||
|
SELECT state_group FROM state
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
|
||||||
|
for group in groups:
|
||||||
|
args = [group]
|
||||||
|
args.extend(where_args)
|
||||||
|
|
||||||
|
txn.execute(sql + where_clause, args)
|
||||||
|
for row in txn:
|
||||||
|
typ, state_key, event_id = row
|
||||||
|
key = (typ, state_key)
|
||||||
|
results[group][key] = event_id
|
||||||
|
else:
|
||||||
|
max_entries_returned = state_filter.max_entries_returned()
|
||||||
|
|
||||||
|
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||||
|
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||||
|
for group in groups:
|
||||||
|
next_group = group
|
||||||
|
|
||||||
|
while next_group:
|
||||||
|
# We did this before by getting the list of group ids, and
|
||||||
|
# then passing that list to sqlite to get latest event for
|
||||||
|
# each (type, state_key). However, that was terribly slow
|
||||||
|
# without the right indices (which we can't add until
|
||||||
|
# after we finish deduping state, which requires this func)
|
||||||
|
args = [next_group]
|
||||||
|
args.extend(where_args)
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
"SELECT type, state_key, event_id FROM state_groups_state"
|
||||||
|
" WHERE state_group = ? " + where_clause,
|
||||||
|
args,
|
||||||
|
)
|
||||||
|
results[group].update(
|
||||||
|
((typ, state_key), event_id)
|
||||||
|
for typ, state_key, event_id in txn
|
||||||
|
if (typ, state_key) not in results[group]
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the number of entries in the (type,state_key)->event_id dict
|
||||||
|
# matches the number of (type,state_keys) types we were searching
|
||||||
|
# for, then we must have found them all, so no need to go walk
|
||||||
|
# further down the tree... UNLESS our types filter contained
|
||||||
|
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
||||||
|
# search
|
||||||
|
if (
|
||||||
|
max_entries_returned is not None
|
||||||
|
and len(results[group]) == max_entries_returned
|
||||||
|
):
|
||||||
|
break
|
||||||
|
|
||||||
|
next_group = self._simple_select_one_onecol_txn(
|
||||||
|
txn,
|
||||||
|
table="state_group_edges",
|
||||||
|
keyvalues={"state_group": next_group},
|
||||||
|
retcol="prev_state_group",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
# this inherits from EventsWorkerStore because it calls self.get_events
|
# this inherits from EventsWorkerStore because it calls self.get_events
|
||||||
class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
class StateGroupWorkerStore(
|
||||||
|
EventsWorkerStore, StateGroupBackgroundUpdateStore, SQLBaseStore
|
||||||
|
):
|
||||||
"""The parts of StateGroupStore that can be called from workers.
|
"""The parts of StateGroupStore that can be called from workers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -694,107 +844,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def _get_state_groups_from_groups_txn(
|
|
||||||
self, txn, groups, state_filter=StateFilter.all()
|
|
||||||
):
|
|
||||||
results = {group: {} for group in groups}
|
|
||||||
|
|
||||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
|
||||||
|
|
||||||
# Unless the filter clause is empty, we're going to append it after an
|
|
||||||
# existing where clause
|
|
||||||
if where_clause:
|
|
||||||
where_clause = " AND (%s)" % (where_clause,)
|
|
||||||
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
|
||||||
# Temporarily disable sequential scans in this transaction. This is
|
|
||||||
# a temporary hack until we can add the right indices in
|
|
||||||
txn.execute("SET LOCAL enable_seqscan=off")
|
|
||||||
|
|
||||||
# The below query walks the state_group tree so that the "state"
|
|
||||||
# table includes all state_groups in the tree. It then joins
|
|
||||||
# against `state_groups_state` to fetch the latest state.
|
|
||||||
# It assumes that previous state groups are always numerically
|
|
||||||
# lesser.
|
|
||||||
# The PARTITION is used to get the event_id in the greatest state
|
|
||||||
# group for the given type, state_key.
|
|
||||||
# This may return multiple rows per (type, state_key), but last_value
|
|
||||||
# should be the same.
|
|
||||||
sql = """
|
|
||||||
WITH RECURSIVE state(state_group) AS (
|
|
||||||
VALUES(?::bigint)
|
|
||||||
UNION ALL
|
|
||||||
SELECT prev_state_group FROM state_group_edges e, state s
|
|
||||||
WHERE s.state_group = e.state_group
|
|
||||||
)
|
|
||||||
SELECT DISTINCT type, state_key, last_value(event_id) OVER (
|
|
||||||
PARTITION BY type, state_key ORDER BY state_group ASC
|
|
||||||
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
|
|
||||||
) AS event_id FROM state_groups_state
|
|
||||||
WHERE state_group IN (
|
|
||||||
SELECT state_group FROM state
|
|
||||||
)
|
|
||||||
"""
|
|
||||||
|
|
||||||
for group in groups:
|
|
||||||
args = [group]
|
|
||||||
args.extend(where_args)
|
|
||||||
|
|
||||||
txn.execute(sql + where_clause, args)
|
|
||||||
for row in txn:
|
|
||||||
typ, state_key, event_id = row
|
|
||||||
key = (typ, state_key)
|
|
||||||
results[group][key] = event_id
|
|
||||||
else:
|
|
||||||
max_entries_returned = state_filter.max_entries_returned()
|
|
||||||
|
|
||||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
|
||||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
|
||||||
for group in groups:
|
|
||||||
next_group = group
|
|
||||||
|
|
||||||
while next_group:
|
|
||||||
# We did this before by getting the list of group ids, and
|
|
||||||
# then passing that list to sqlite to get latest event for
|
|
||||||
# each (type, state_key). However, that was terribly slow
|
|
||||||
# without the right indices (which we can't add until
|
|
||||||
# after we finish deduping state, which requires this func)
|
|
||||||
args = [next_group]
|
|
||||||
args.extend(where_args)
|
|
||||||
|
|
||||||
txn.execute(
|
|
||||||
"SELECT type, state_key, event_id FROM state_groups_state"
|
|
||||||
" WHERE state_group = ? " + where_clause,
|
|
||||||
args,
|
|
||||||
)
|
|
||||||
results[group].update(
|
|
||||||
((typ, state_key), event_id)
|
|
||||||
for typ, state_key, event_id in txn
|
|
||||||
if (typ, state_key) not in results[group]
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the number of entries in the (type,state_key)->event_id dict
|
|
||||||
# matches the number of (type,state_keys) types we were searching
|
|
||||||
# for, then we must have found them all, so no need to go walk
|
|
||||||
# further down the tree... UNLESS our types filter contained
|
|
||||||
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
|
||||||
# search
|
|
||||||
if (
|
|
||||||
max_entries_returned is not None
|
|
||||||
and len(results[group]) == max_entries_returned
|
|
||||||
):
|
|
||||||
break
|
|
||||||
|
|
||||||
next_group = self._simple_select_one_onecol_txn(
|
|
||||||
txn,
|
|
||||||
table="state_group_edges",
|
|
||||||
keyvalues={"state_group": next_group},
|
|
||||||
retcol="prev_state_group",
|
|
||||||
allow_none=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
|
def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
|
||||||
"""Given a list of event_ids and type tuples, return a list of state
|
"""Given a list of event_ids and type tuples, return a list of state
|
||||||
|
@ -1238,66 +1287,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return self.runInteraction("store_state_group", _store_state_group_txn)
|
return self.runInteraction("store_state_group", _store_state_group_txn)
|
||||||
|
|
||||||
def _count_state_group_hops_txn(self, txn, state_group):
|
|
||||||
"""Given a state group, count how many hops there are in the tree.
|
|
||||||
|
|
||||||
This is used to ensure the delta chains don't get too long.
|
class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore, BackgroundUpdateStore):
|
||||||
"""
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
|
||||||
sql = """
|
|
||||||
WITH RECURSIVE state(state_group) AS (
|
|
||||||
VALUES(?::bigint)
|
|
||||||
UNION ALL
|
|
||||||
SELECT prev_state_group FROM state_group_edges e, state s
|
|
||||||
WHERE s.state_group = e.state_group
|
|
||||||
)
|
|
||||||
SELECT count(*) FROM state;
|
|
||||||
"""
|
|
||||||
|
|
||||||
txn.execute(sql, (state_group,))
|
|
||||||
row = txn.fetchone()
|
|
||||||
if row and row[0]:
|
|
||||||
return row[0]
|
|
||||||
else:
|
|
||||||
return 0
|
|
||||||
else:
|
|
||||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
|
||||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
|
||||||
next_group = state_group
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
while next_group:
|
|
||||||
next_group = self._simple_select_one_onecol_txn(
|
|
||||||
txn,
|
|
||||||
table="state_group_edges",
|
|
||||||
keyvalues={"state_group": next_group},
|
|
||||||
retcol="prev_state_group",
|
|
||||||
allow_none=True,
|
|
||||||
)
|
|
||||||
if next_group:
|
|
||||||
count += 1
|
|
||||||
|
|
||||||
return count
|
|
||||||
|
|
||||||
|
|
||||||
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|
||||||
""" Keeps track of the state at a given event.
|
|
||||||
|
|
||||||
This is done by the concept of `state groups`. Every event is a assigned
|
|
||||||
a state group (identified by an arbitrary string), which references a
|
|
||||||
collection of state events. The current state of an event is then the
|
|
||||||
collection of state events referenced by the event's state group.
|
|
||||||
|
|
||||||
Hence, every change in the current state causes a new state group to be
|
|
||||||
generated. However, if no change happens (e.g., if we get a message event
|
|
||||||
with only one parent it inherits the state group from its parent.)
|
|
||||||
|
|
||||||
There are three tables:
|
|
||||||
* `state_groups`: Stores group name, first event with in the group and
|
|
||||||
room id.
|
|
||||||
* `event_to_state_groups`: Maps events to state groups.
|
|
||||||
* `state_groups_state`: Maps state group to state events.
|
|
||||||
"""
|
|
||||||
|
|
||||||
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
|
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
|
||||||
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
|
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
|
||||||
|
@ -1305,7 +1296,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(StateStore, self).__init__(db_conn, hs)
|
super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||||
self.register_background_update_handler(
|
self.register_background_update_handler(
|
||||||
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
|
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
|
||||||
self._background_deduplicate_state,
|
self._background_deduplicate_state,
|
||||||
|
@ -1327,34 +1318,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
columns=["state_group"],
|
columns=["state_group"],
|
||||||
)
|
)
|
||||||
|
|
||||||
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
|
||||||
state_groups = {}
|
|
||||||
for event, context in events_and_contexts:
|
|
||||||
if event.internal_metadata.is_outlier():
|
|
||||||
continue
|
|
||||||
|
|
||||||
# if the event was rejected, just give it the same state as its
|
|
||||||
# predecessor.
|
|
||||||
if context.rejected:
|
|
||||||
state_groups[event.event_id] = context.prev_group
|
|
||||||
continue
|
|
||||||
|
|
||||||
state_groups[event.event_id] = context.state_group
|
|
||||||
|
|
||||||
self._simple_insert_many_txn(
|
|
||||||
txn,
|
|
||||||
table="event_to_state_groups",
|
|
||||||
values=[
|
|
||||||
{"state_group": state_group_id, "event_id": event_id}
|
|
||||||
for event_id, state_group_id in iteritems(state_groups)
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
for event_id, state_group_id in iteritems(state_groups):
|
|
||||||
txn.call_after(
|
|
||||||
self._get_state_group_for_event.prefill, (event_id,), state_group_id
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_deduplicate_state(self, progress, batch_size):
|
def _background_deduplicate_state(self, progress, batch_size):
|
||||||
"""This background update will slowly deduplicate state by reencoding
|
"""This background update will slowly deduplicate state by reencoding
|
||||||
|
@ -1527,3 +1490,54 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
|
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
|
||||||
|
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
|
||||||
|
""" Keeps track of the state at a given event.
|
||||||
|
|
||||||
|
This is done by the concept of `state groups`. Every event is a assigned
|
||||||
|
a state group (identified by an arbitrary string), which references a
|
||||||
|
collection of state events. The current state of an event is then the
|
||||||
|
collection of state events referenced by the event's state group.
|
||||||
|
|
||||||
|
Hence, every change in the current state causes a new state group to be
|
||||||
|
generated. However, if no change happens (e.g., if we get a message event
|
||||||
|
with only one parent it inherits the state group from its parent.)
|
||||||
|
|
||||||
|
There are three tables:
|
||||||
|
* `state_groups`: Stores group name, first event with in the group and
|
||||||
|
room id.
|
||||||
|
* `event_to_state_groups`: Maps events to state groups.
|
||||||
|
* `state_groups_state`: Maps state group to state events.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(StateStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
||||||
|
state_groups = {}
|
||||||
|
for event, context in events_and_contexts:
|
||||||
|
if event.internal_metadata.is_outlier():
|
||||||
|
continue
|
||||||
|
|
||||||
|
# if the event was rejected, just give it the same state as its
|
||||||
|
# predecessor.
|
||||||
|
if context.rejected:
|
||||||
|
state_groups[event.event_id] = context.prev_group
|
||||||
|
continue
|
||||||
|
|
||||||
|
state_groups[event.event_id] = context.state_group
|
||||||
|
|
||||||
|
self._simple_insert_many_txn(
|
||||||
|
txn,
|
||||||
|
table="event_to_state_groups",
|
||||||
|
values=[
|
||||||
|
{"state_group": state_group_id, "event_id": event_id}
|
||||||
|
for event_id, state_group_id in iteritems(state_groups)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
for event_id, state_group_id in iteritems(state_groups):
|
||||||
|
txn.call_after(
|
||||||
|
self._get_state_group_for_event.prefill, (event_id,), state_group_id
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue