Skip to content

Commit

Permalink
fix(waku_archive): only allow a single instance to execute migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed May 31, 2024
1 parent a65b13f commit 0937e9b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
10 changes: 10 additions & 0 deletions waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ proc migrate*(
# Load migration scripts
let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion)

# Lock the db
(await driver.acquireDatabaseLock()).isOkOr:
error "failed to acquire lock", error = error
return err("failed to lock the db")

defer:
(await driver.releaseDatabaseLock()).isOkOr:
error "failed to release lock", error = error
return err("failed to unlock the db.")

# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
Expand Down
36 changes: 36 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,42 @@ proc sleep*(

return ok()

proc acquireDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
let locked = (
await s.getStr(
fmt"""
SELECT pg_try_advisory_lock({lockId})
"""
)
).valueOr:
return err("error acquiring a lock: " & error)

if locked == "f":
return err("another waku instance is currently executing a migration")

return ok()

proc releaseDatabaseLock*(
s: PostgresDriver, lockId: int = 841886
): Future[ArchiveDriverResult[void]] {.async.} =
## Acquire an advisory lock (useful to avoid more than one application running migrations at the same time)
let unlocked = (
await s.getStr(
fmt"""
SELECT pg_advisory_unlock({lockId})
"""
)
).valueOr:
return err("error releasing a lock: " & error)

if unlocked == "f":
return err("could not release advisory lock")

return ok()

proc performWriteQuery*(
s: PostgresDriver, query: string
): Future[ArchiveDriverResult[void]] {.async.} =
Expand Down

0 comments on commit 0937e9b

Please sign in to comment.