Fix bgupdate error if index already exists (#2167)
When creating a new table index in the background, guard against it existing already. Fixes https://github.com/matrix-org/synapse/issues/2135. Also, make sure we restore the autocommit flag when we're done, otherwise we get more failures from other operations later on. Fixes https://github.com/matrix-org/synapse/issues/1890 (hopefully).
This commit is contained in:
parent
380fb87ecc
commit
c84770b877
|
@ -228,46 +228,69 @@ class BackgroundUpdateStore(SQLBaseStore):
|
||||||
columns (list[str]): columns/expressions to include in index
|
columns (list[str]): columns/expressions to include in index
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# if this is postgres, we add the indexes concurrently. Otherwise
|
def create_index_psql(conn):
|
||||||
# we fall back to doing it inline
|
|
||||||
if isinstance(self.database_engine, engines.PostgresEngine):
|
|
||||||
conc = True
|
|
||||||
else:
|
|
||||||
conc = False
|
|
||||||
# We don't use partial indices on SQLite as it wasn't introduced
|
|
||||||
# until 3.8, and wheezy has 3.7
|
|
||||||
where_clause = None
|
|
||||||
|
|
||||||
sql = (
|
|
||||||
"CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)"
|
|
||||||
" %(where_clause)s"
|
|
||||||
) % {
|
|
||||||
"conc": "CONCURRENTLY" if conc else "",
|
|
||||||
"name": index_name,
|
|
||||||
"table": table,
|
|
||||||
"columns": ", ".join(columns),
|
|
||||||
"where_clause": "WHERE " + where_clause if where_clause else ""
|
|
||||||
}
|
|
||||||
|
|
||||||
def create_index_concurrently(conn):
|
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
# postgres insists on autocommit for the index
|
# postgres insists on autocommit for the index
|
||||||
conn.set_session(autocommit=True)
|
conn.set_session(autocommit=True)
|
||||||
c = conn.cursor()
|
|
||||||
c.execute(sql)
|
|
||||||
conn.set_session(autocommit=False)
|
|
||||||
|
|
||||||
def create_index(conn):
|
try:
|
||||||
|
c = conn.cursor()
|
||||||
|
|
||||||
|
# If a previous attempt to create the index was interrupted,
|
||||||
|
# we may already have a half-built index. Let's just drop it
|
||||||
|
# before trying to create it again.
|
||||||
|
|
||||||
|
sql = "DROP INDEX IF EXISTS %s" % (index_name,)
|
||||||
|
logger.debug("[SQL] %s", sql)
|
||||||
|
c.execute(sql)
|
||||||
|
|
||||||
|
sql = (
|
||||||
|
"CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
|
||||||
|
" (%(columns)s) %(where_clause)s"
|
||||||
|
) % {
|
||||||
|
"name": index_name,
|
||||||
|
"table": table,
|
||||||
|
"columns": ", ".join(columns),
|
||||||
|
"where_clause": "WHERE " + where_clause if where_clause else ""
|
||||||
|
}
|
||||||
|
logger.debug("[SQL] %s", sql)
|
||||||
|
c.execute(sql)
|
||||||
|
finally:
|
||||||
|
conn.set_session(autocommit=False)
|
||||||
|
|
||||||
|
def create_index_sqlite(conn):
|
||||||
|
# Sqlite doesn't support concurrent creation of indexes.
|
||||||
|
#
|
||||||
|
# We don't use partial indices on SQLite as it wasn't introduced
|
||||||
|
# until 3.8, and wheezy has 3.7
|
||||||
|
#
|
||||||
|
# We assume that sqlite doesn't give us invalid indices; however
|
||||||
|
# we may still end up with the index existing but the
|
||||||
|
# background_updates not having been recorded if synapse got shut
|
||||||
|
# down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
|
||||||
|
# has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
|
||||||
|
sql = (
|
||||||
|
"CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
|
||||||
|
" (%(columns)s)"
|
||||||
|
) % {
|
||||||
|
"name": index_name,
|
||||||
|
"table": table,
|
||||||
|
"columns": ", ".join(columns),
|
||||||
|
}
|
||||||
|
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
logger.debug("[SQL] %s", sql)
|
||||||
c.execute(sql)
|
c.execute(sql)
|
||||||
|
|
||||||
|
if isinstance(self.database_engine, engines.PostgresEngine):
|
||||||
|
runner = create_index_psql
|
||||||
|
else:
|
||||||
|
runner = create_index_sqlite
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def updater(progress, batch_size):
|
def updater(progress, batch_size):
|
||||||
logger.info("Adding index %s to %s", index_name, table)
|
logger.info("Adding index %s to %s", index_name, table)
|
||||||
if conc:
|
yield self.runWithConnection(runner)
|
||||||
yield self.runWithConnection(create_index_concurrently)
|
|
||||||
else:
|
|
||||||
yield self.runWithConnection(create_index)
|
|
||||||
yield self._end_background_update(update_name)
|
yield self._end_background_update(update_name)
|
||||||
defer.returnValue(1)
|
defer.returnValue(1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue