Make _state_groups_id_gen a normal IdGenerator
This commit is contained in:
parent
3e784eff74
commit
5dc2a702cf
|
@ -115,7 +115,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
|
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
|
||||||
self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
|
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
|
||||||
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
|
||||||
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
|
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
|
||||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||||
|
|
|
@ -271,39 +271,35 @@ class EventsStore(SQLBaseStore):
|
||||||
len(events_and_contexts)
|
len(events_and_contexts)
|
||||||
)
|
)
|
||||||
|
|
||||||
state_group_id_manager = self._state_groups_id_gen.get_next_mult(
|
|
||||||
len(events_and_contexts)
|
|
||||||
)
|
|
||||||
with stream_ordering_manager as stream_orderings:
|
with stream_ordering_manager as stream_orderings:
|
||||||
with state_group_id_manager as state_group_ids:
|
for (event, context), stream, in zip(
|
||||||
for (event, context), stream, state_group_id in zip(
|
events_and_contexts, stream_orderings
|
||||||
events_and_contexts, stream_orderings, state_group_ids
|
):
|
||||||
):
|
event.internal_metadata.stream_ordering = stream
|
||||||
event.internal_metadata.stream_ordering = stream
|
# Assign a state group_id in case a new id is needed for
|
||||||
# Assign a state group_id in case a new id is needed for
|
# this context. In theory we only need to assign this
|
||||||
# this context. In theory we only need to assign this
|
# for contexts that have current_state and aren't outliers
|
||||||
# for contexts that have current_state and aren't outliers
|
# but that make the code more complicated. Assigning an ID
|
||||||
# but that make the code more complicated. Assigning an ID
|
# per event only causes the state_group_ids to grow as fast
|
||||||
# per event only causes the state_group_ids to grow as fast
|
# as the stream_ordering so in practise shouldn't be a problem.
|
||||||
# as the stream_ordering so in practise shouldn't be a problem.
|
context.new_state_group_id = self._state_groups_id_gen.get_next()
|
||||||
context.new_state_group_id = state_group_id
|
|
||||||
|
|
||||||
chunks = [
|
chunks = [
|
||||||
events_and_contexts[x:x + 100]
|
events_and_contexts[x:x + 100]
|
||||||
for x in xrange(0, len(events_and_contexts), 100)
|
for x in xrange(0, len(events_and_contexts), 100)
|
||||||
]
|
]
|
||||||
|
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
# We can't easily parallelize these since different chunks
|
# We can't easily parallelize these since different chunks
|
||||||
# might contain the same event. :(
|
# might contain the same event. :(
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
"persist_events",
|
"persist_events",
|
||||||
self._persist_events_txn,
|
self._persist_events_txn,
|
||||||
events_and_contexts=chunk,
|
events_and_contexts=chunk,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
delete_existing=delete_existing,
|
delete_existing=delete_existing,
|
||||||
)
|
)
|
||||||
persist_event_counter.inc_by(len(chunk))
|
persist_event_counter.inc_by(len(chunk))
|
||||||
|
|
||||||
@_retry_on_integrity_error
|
@_retry_on_integrity_error
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -312,19 +308,18 @@ class EventsStore(SQLBaseStore):
|
||||||
delete_existing=False):
|
delete_existing=False):
|
||||||
try:
|
try:
|
||||||
with self._stream_id_gen.get_next() as stream_ordering:
|
with self._stream_id_gen.get_next() as stream_ordering:
|
||||||
with self._state_groups_id_gen.get_next() as state_group_id:
|
event.internal_metadata.stream_ordering = stream_ordering
|
||||||
event.internal_metadata.stream_ordering = stream_ordering
|
context.new_state_group_id = self._state_groups_id_gen.get_next()
|
||||||
context.new_state_group_id = state_group_id
|
yield self.runInteraction(
|
||||||
yield self.runInteraction(
|
"persist_event",
|
||||||
"persist_event",
|
self._persist_event_txn,
|
||||||
self._persist_event_txn,
|
event=event,
|
||||||
event=event,
|
context=context,
|
||||||
context=context,
|
current_state=current_state,
|
||||||
current_state=current_state,
|
backfilled=backfilled,
|
||||||
backfilled=backfilled,
|
delete_existing=delete_existing,
|
||||||
delete_existing=delete_existing,
|
)
|
||||||
)
|
persist_event_counter.inc()
|
||||||
persist_event_counter.inc()
|
|
||||||
except _RollbackButIsFineException:
|
except _RollbackButIsFineException:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -526,6 +526,3 @@ class StateStore(SQLBaseStore):
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"get_all_new_state_groups", get_all_new_state_groups_txn
|
"get_all_new_state_groups", get_all_new_state_groups_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_state_stream_token(self):
|
|
||||||
return self._state_groups_id_gen.get_current_token()
|
|
||||||
|
|
Loading…
Reference in New Issue