Add new sequences to port DB script (#8387)

This commit is contained in:
Erik Johnston 2020-09-24 13:43:49 +01:00 committed by GitHub
parent ac11fcbbb8
commit 6fdf577593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 0 deletions

1
changelog.d/8387.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for sharding event persister.

View File

@ -628,6 +628,7 @@ class Porter(object):
self.progress.set_state("Setting up sequence generators") self.progress.set_state("Setting up sequence generators")
await self._setup_state_group_id_seq() await self._setup_state_group_id_seq()
await self._setup_user_id_seq() await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
self.progress.done() self.progress.done()
except Exception as e: except Exception as e:
@ -804,6 +805,29 @@ class Porter(object):
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
def _setup_events_stream_seqs(self):
def r(txn):
txn.execute("SELECT MAX(stream_ordering) FROM events")
curr_id = txn.fetchone()[0]
if curr_id:
next_id = curr_id + 1
txn.execute(
"ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,)
)
txn.execute("SELECT -MIN(stream_ordering) FROM events")
curr_id = txn.fetchone()[0]
if curr_id:
next_id = curr_id + 1
txn.execute(
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
(next_id,),
)
return self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", r
)
############################################## ##############################################
# The following is simply UI stuff # The following is simply UI stuff