Skip to content

Commit

Permalink
Implement advisory locking
Browse files Browse the repository at this point in the history
closes #1118
  • Loading branch information
tomhoule committed Jan 4, 2021
1 parent 8213130 commit 86f814e
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 5 deletions.
3 changes: 3 additions & 0 deletions migration-engine/connectors/migration-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub trait MigrationConnector: Send + Sync + 'static {
/// For example, in the SQL connector, a step would represent an SQL statement like `CREATE TABLE`.
type DatabaseMigration: DatabaseMigrationMarker + Send + Sync + 'static;

/// If possible on the target connector, acquire an advisory lock, so multiple instances of migrate do not run concurrently.
async fn acquire_lock(&self) -> ConnectorResult<()>;

/// A string that should identify what database backend is being used. Note that this is not necessarily
/// the connector name. The SQL connector for example can return "postgresql", "mysql" or "sqlite".
fn connector_type(&self) -> &'static str;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ mod mysql;
mod postgres;
mod sqlite;

use enumflags2::BitFlags;
pub(crate) use mssql::MssqlFlavour;
pub(crate) use mysql::MysqlFlavour;
pub(crate) use postgres::PostgresFlavour;
Expand All @@ -19,6 +18,7 @@ use crate::{
sql_schema_differ::SqlSchemaDifferFlavour,
};
use datamodel::Datamodel;
use enumflags2::BitFlags;
use migration_connector::{ConnectorResult, MigrationDirectory, MigrationFeature};
use quaint::{
connector::ConnectionInfo,
Expand Down Expand Up @@ -55,6 +55,8 @@ pub(crate) fn from_connection_info(
pub(crate) trait SqlFlavour:
DestructiveChangeCheckerFlavour + SqlRenderer + SqlSchemaDifferFlavour + SqlSchemaCalculatorFlavour + Debug
{
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()>;

fn check_database_version_compatibility(
&self,
_datamodel: &Datamodel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ impl MssqlFlavour {

#[async_trait::async_trait]
impl SqlFlavour for MssqlFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// see https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-ver15
Ok(connection
.raw_cmd("sp_getapplock @Resource = 'prisma_migrate', @LockMode = 'Exclusive'")
.await?)
}

fn imperative_migrations_table(&self) -> Table<'_> {
(self.schema_name(), self.imperative_migrations_table_name()).into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl MysqlFlavour {

#[async_trait::async_trait]
impl SqlFlavour for MysqlFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html
Ok(connection.raw_cmd("SELECT GET_LOCK('prisma_migrate', 60)").await?)
}

fn check_database_version_compatibility(
&self,
datamodel: &Datamodel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ impl PostgresFlavour {

#[async_trait::async_trait]
impl SqlFlavour for PostgresFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
Ok(connection.raw_cmd("SELECT pg_advisory_lock(72707369)").await?)
}

#[tracing::instrument(skip(database_str))]
async fn create_database(&self, database_str: &str) -> ConnectorResult<String> {
let mut url = Url::parse(database_str).map_err(|err| ConnectorError::url_parse_error(err, database_str))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub(crate) struct SqliteFlavour {

#[async_trait::async_trait]
impl SqlFlavour for SqliteFlavour {
async fn acquire_lock(&self, _connection: &Connection) -> ConnectorResult<()> {
Ok(())
}

async fn create_database(&self, database_str: &str) -> ConnectorResult<String> {
use anyhow::Context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl MigrationConnector for SqlMigrationConnector {
self.connection.connection_info().sql_family().as_str()
}

async fn acquire_lock(&self) -> ConnectorResult<()> {
self.flavour().acquire_lock(self.conn()).await
}

async fn version(&self) -> ConnectorResult<String> {
Ok(self
.connection
Expand Down
2 changes: 2 additions & 0 deletions migration-engine/core/src/commands/apply_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ impl<'a> MigrationCommand for ApplyMigrationsCommand {
let applier = connector.database_migration_step_applier();
let migration_persistence = connector.migration_persistence();

connector.acquire_lock().await?;

migration_persistence.initialize().await?;

let migrations_from_filesystem =
Expand Down
5 changes: 3 additions & 2 deletions migration-engine/core/src/commands/mark_migration_applied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ impl MigrationCommand for MarkMigrationAppliedCommand {
type Output = MarkMigrationAppliedOutput;

async fn execute<C: MigrationConnector>(input: &Self::Input, engine: &MigrationApi<C>) -> CoreResult<Self::Output> {
// We should take a lock on the migrations table.

let connector = engine.connector();
let persistence = engine.connector().migration_persistence();

connector.acquire_lock().await?;

let migration_directory =
MigrationDirectory::new(Path::new(&input.migrations_directory_path).join(&input.migration_name));
let script = migration_directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ impl MigrationCommand for MarkMigrationRolledBackCommand {
type Output = MarkMigrationRolledBackOutput;

async fn execute<C: MigrationConnector>(input: &Self::Input, engine: &MigrationApi<C>) -> CoreResult<Self::Output> {
// We should take a lock on the migrations table.

let connector = engine.connector();
let persistence = engine.connector().migration_persistence();

connector.acquire_lock().await?;

let all_migrations = persistence.list_migrations().await?.map_err(|_err| {
CoreError::Generic(anyhow::anyhow!(
"Invariant violation: called markMigrationRolledBack on a database without migrations table."
Expand Down

0 comments on commit 86f814e

Please sign in to comment.