Skip to content

Commit

Permalink
Implement advisory locking
Browse files Browse the repository at this point in the history
closes #1118
  • Loading branch information
tomhoule committed Jan 20, 2021
1 parent 359e744 commit 7efb442
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 5 deletions.
3 changes: 3 additions & 0 deletions migration-engine/connectors/migration-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub trait MigrationConnector: Send + Sync + 'static {
/// For example, in the SQL connector, a step would represent an SQL statement like `CREATE TABLE`.
type DatabaseMigration: DatabaseMigrationMarker + Send + Sync + 'static;

/// If possible on the target connector, acquire an advisory lock, so multiple instances of migrate do not run concurrently.
async fn acquire_lock(&self) -> ConnectorResult<()>;

/// A string that should identify what database backend is being used. Note that this is not necessarily
/// the connector name. The SQL connector for example can return "postgresql", "mysql" or "sqlite".
fn connector_type(&self) -> &'static str;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub(crate) fn from_connection_info(
pub(crate) trait SqlFlavour:
DestructiveChangeCheckerFlavour + SqlRenderer + SqlSchemaDifferFlavour + SqlSchemaCalculatorFlavour + Debug
{
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()>;

fn check_database_version_compatibility(
&self,
_datamodel: &Datamodel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ impl MssqlFlavour {

#[async_trait::async_trait]
impl SqlFlavour for MssqlFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// see https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-ver15
Ok(connection
.raw_cmd("sp_getapplock @Resource = 'prisma_migrate', @LockMode = 'Exclusive'")
.await?)
}

fn imperative_migrations_table(&self) -> Table<'_> {
(self.schema_name(), self.imperative_migrations_table_name()).into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ impl MysqlFlavour {

#[async_trait::async_trait]
impl SqlFlavour for MysqlFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html
Ok(connection.raw_cmd("SELECT GET_LOCK('prisma_migrate', 60)").await?)
}

fn check_database_version_compatibility(
&self,
datamodel: &Datamodel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ impl PostgresFlavour {

#[async_trait::async_trait]
impl SqlFlavour for PostgresFlavour {
async fn acquire_lock(&self, connection: &Connection) -> ConnectorResult<()> {
// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
Ok(connection.raw_cmd("SELECT pg_advisory_lock(72707369)").await?)
}

#[tracing::instrument(skip(database_str))]
async fn create_database(&self, database_str: &str) -> ConnectorResult<String> {
let mut url = Url::parse(database_str).map_err(|err| ConnectorError::url_parse_error(err, database_str))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub(crate) struct SqliteFlavour {

#[async_trait::async_trait]
impl SqlFlavour for SqliteFlavour {
async fn acquire_lock(&self, _connection: &Connection) -> ConnectorResult<()> {
Ok(())
}

async fn create_database(&self, database_str: &str) -> ConnectorResult<String> {
use anyhow::Context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ impl MigrationConnector for SqlMigrationConnector {
self.connection.connection_info().sql_family().as_str()
}

async fn acquire_lock(&self) -> ConnectorResult<()> {
self.flavour().acquire_lock(self.conn()).await
}

async fn version(&self) -> ConnectorResult<String> {
Ok(self
.connection
Expand Down
2 changes: 2 additions & 0 deletions migration-engine/core/src/commands/apply_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ impl<'a> MigrationCommand for ApplyMigrationsCommand {
//Validate Provider
migration_connector::error_on_changed_provider(&input.migrations_directory_path, connector.connector_type())?;

connector.acquire_lock().await?;

migration_persistence.initialize().await?;

let migrations_from_filesystem =
Expand Down
6 changes: 4 additions & 2 deletions migration-engine/core/src/commands/mark_migration_applied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ impl MigrationCommand for MarkMigrationAppliedCommand {
type Output = MarkMigrationAppliedOutput;

async fn execute<C: MigrationConnector>(input: &Self::Input, engine: &MigrationApi<C>) -> CoreResult<Self::Output> {
// We should take a lock on the migrations table.
let connector = engine.connector();
let persistence = engine.connector().migration_persistence();

//Validate Provider
migration_connector::error_on_changed_provider(
&input.migrations_directory_path,
engine.connector().connector_type(),
)?;
let persistence = engine.connector().migration_persistence();

connector.acquire_lock().await?;

let migration_directory =
MigrationDirectory::new(Path::new(&input.migrations_directory_path).join(&input.migration_name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ impl MigrationCommand for MarkMigrationRolledBackCommand {
type Output = MarkMigrationRolledBackOutput;

async fn execute<C: MigrationConnector>(input: &Self::Input, engine: &MigrationApi<C>) -> CoreResult<Self::Output> {
// We should take a lock on the migrations table.
let persistence = engine.connector().migration_persistence();

//todo the input currently does not take the migration directory as input. therefore no error atm, but I think the behaviour
// should be consistent between mark applied and mark rolled back
let connector = engine.connector();
let persistence = engine.connector().migration_persistence();

connector.acquire_lock().await?;

let all_migrations = persistence.list_migrations().await?.map_err(|_err| {
CoreError::Generic(anyhow::anyhow!(
Expand Down

0 comments on commit 7efb442

Please sign in to comment.