Run _upgrade_existing_database on workers if at current schema_version (#11346)
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
parent
b596a1eb80
commit
9c59e117db
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug introduced in v1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations.
|
|
@ -131,9 +131,9 @@ def prepare_database(
|
||||||
"config==None in prepare_database, but database is not empty"
|
"config==None in prepare_database, but database is not empty"
|
||||||
)
|
)
|
||||||
|
|
||||||
# if it's a worker app, refuse to upgrade the database, to avoid multiple
|
# This should be run on all processes, master or worker. The master will
|
||||||
# workers doing it at once.
|
# apply the deltas, while workers will check if any outstanding deltas
|
||||||
if config.worker.worker_app is None:
|
# exist and raise an PrepareDatabaseException if they do.
|
||||||
_upgrade_existing_database(
|
_upgrade_existing_database(
|
||||||
cur,
|
cur,
|
||||||
version_info,
|
version_info,
|
||||||
|
@ -141,14 +141,6 @@ def prepare_database(
|
||||||
config,
|
config,
|
||||||
databases=databases,
|
databases=databases,
|
||||||
)
|
)
|
||||||
elif version_info.current_version < SCHEMA_VERSION:
|
|
||||||
# If the DB is on an older version than we expect then we refuse
|
|
||||||
# to start the worker (as the main process needs to run first to
|
|
||||||
# update the schema).
|
|
||||||
raise UpgradeDatabaseException(
|
|
||||||
OUTDATED_SCHEMA_ON_WORKER_ERROR
|
|
||||||
% (SCHEMA_VERSION, version_info.current_version)
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info("%r: Initialising new database", databases)
|
logger.info("%r: Initialising new database", databases)
|
||||||
|
@ -358,6 +350,18 @@ def _upgrade_existing_database(
|
||||||
|
|
||||||
is_worker = config and config.worker.worker_app is not None
|
is_worker = config and config.worker.worker_app is not None
|
||||||
|
|
||||||
|
# If the schema version needs to be updated, and we are on a worker, we immediately
|
||||||
|
# know to bail out as workers cannot update the database schema. Only one process
|
||||||
|
# must update the database at the time, therefore we delegate this task to the master.
|
||||||
|
if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
|
||||||
|
# If the DB is on an older version than we expect then we refuse
|
||||||
|
# to start the worker (as the main process needs to run first to
|
||||||
|
# update the schema).
|
||||||
|
raise UpgradeDatabaseException(
|
||||||
|
OUTDATED_SCHEMA_ON_WORKER_ERROR
|
||||||
|
% (SCHEMA_VERSION, current_schema_state.current_version)
|
||||||
|
)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
current_schema_state.compat_version is not None
|
current_schema_state.compat_version is not None
|
||||||
and current_schema_state.compat_version > SCHEMA_VERSION
|
and current_schema_state.compat_version > SCHEMA_VERSION
|
||||||
|
|
|
@ -11,6 +11,9 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
from typing import List
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
from synapse.app.generic_worker import GenericWorkerServer
|
from synapse.app.generic_worker import GenericWorkerServer
|
||||||
from synapse.storage.database import LoggingDatabaseConnection
|
from synapse.storage.database import LoggingDatabaseConnection
|
||||||
from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
|
from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
|
||||||
|
@ -19,6 +22,22 @@ from synapse.storage.schema import SCHEMA_VERSION
|
||||||
from tests.unittest import HomeserverTestCase
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
|
||||||
|
def fake_listdir(filepath: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
A fake implementation of os.listdir which we can use to mock out the filesystem.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: The directory to list files for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A list of files and folders in the directory.
|
||||||
|
"""
|
||||||
|
if filepath.endswith("full_schemas"):
|
||||||
|
return [SCHEMA_VERSION]
|
||||||
|
|
||||||
|
return ["99_add_unicorn_to_database.sql"]
|
||||||
|
|
||||||
|
|
||||||
class WorkerSchemaTests(HomeserverTestCase):
|
class WorkerSchemaTests(HomeserverTestCase):
|
||||||
def make_homeserver(self, reactor, clock):
|
def make_homeserver(self, reactor, clock):
|
||||||
hs = self.setup_test_homeserver(
|
hs = self.setup_test_homeserver(
|
||||||
|
@ -51,7 +70,7 @@ class WorkerSchemaTests(HomeserverTestCase):
|
||||||
|
|
||||||
prepare_database(db_conn, db_pool.engine, self.hs.config)
|
prepare_database(db_conn, db_pool.engine, self.hs.config)
|
||||||
|
|
||||||
def test_not_upgraded(self):
|
def test_not_upgraded_old_schema_version(self):
|
||||||
"""Test that workers don't start if the DB has an older schema version"""
|
"""Test that workers don't start if the DB has an older schema version"""
|
||||||
db_pool = self.hs.get_datastore().db_pool
|
db_pool = self.hs.get_datastore().db_pool
|
||||||
db_conn = LoggingDatabaseConnection(
|
db_conn = LoggingDatabaseConnection(
|
||||||
|
@ -67,3 +86,34 @@ class WorkerSchemaTests(HomeserverTestCase):
|
||||||
|
|
||||||
with self.assertRaises(PrepareDatabaseException):
|
with self.assertRaises(PrepareDatabaseException):
|
||||||
prepare_database(db_conn, db_pool.engine, self.hs.config)
|
prepare_database(db_conn, db_pool.engine, self.hs.config)
|
||||||
|
|
||||||
|
def test_not_upgraded_current_schema_version_with_outstanding_deltas(self):
|
||||||
|
"""
|
||||||
|
Test that workers don't start if the DB is on the current schema version,
|
||||||
|
but there are still outstanding delta migrations to run.
|
||||||
|
"""
|
||||||
|
db_pool = self.hs.get_datastore().db_pool
|
||||||
|
db_conn = LoggingDatabaseConnection(
|
||||||
|
db_pool._db_pool.connect(),
|
||||||
|
db_pool.engine,
|
||||||
|
"tests",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set the schema version of the database to the current version
|
||||||
|
cur = db_conn.cursor()
|
||||||
|
cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,))
|
||||||
|
|
||||||
|
db_conn.commit()
|
||||||
|
|
||||||
|
# Path `os.listdir` here to make synapse think that there is a migration
|
||||||
|
# file ready to be run.
|
||||||
|
# Note that we can't patch this function for the whole method, else Synapse
|
||||||
|
# will try to find the file when building the database initially.
|
||||||
|
with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)):
|
||||||
|
with self.assertRaises(PrepareDatabaseException):
|
||||||
|
# Synapse should think that there is an outstanding migration file due to
|
||||||
|
# patching 'os.listdir' in the function decorator.
|
||||||
|
#
|
||||||
|
# We expect Synapse to raise an exception to indicate the master process
|
||||||
|
# needs to apply this migration file.
|
||||||
|
prepare_database(db_conn, db_pool.engine, self.hs.config)
|
||||||
|
|
Loading…
Reference in New Issue