Skip to content

Commit

Permalink
Port locking functionality from LineUpr project
Browse files Browse the repository at this point in the history
See #30
  • Loading branch information
qqilihq committed Mar 29, 2022
1 parent 0fa56d9 commit 7a42921
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 15 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.1.0] – 2022-03-29

### Added
- Add lock mechanism to prevent running migrations simulataneously in clustered environments.

## [3.0.0] – 2021-10-03

### Changed
Expand Down
48 changes: 45 additions & 3 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
import { MongoClient, Db } from 'mongodb';
import { promisify } from 'util';

interface Options {
uri: string;
collectionName?: string;
/** Optionally specify a collection to use for locking. This is intended for
* clusters with multiple nodes to ensure that not more than one migration
* can run at any given time. */
lockCollectionName?: string;
}

export class MongoStateStore {
private readonly collectionName: string;
private readonly mongodbHost: string;
private readonly lockCollectionName?: string;

constructor(objectOrHost: Options | string) {
this.mongodbHost = typeof objectOrHost === 'string' ? objectOrHost : objectOrHost.uri;
this.collectionName = (objectOrHost as Options).collectionName ?? 'migrations';
this.lockCollectionName = typeof objectOrHost !== 'string' ? objectOrHost.lockCollectionName : undefined;
}

load(fn: (err?: any, set?: any) => void): void {
this.doWithErrorHandling(fn, async db => {
await this.acquireLock(db);
const result = await db.collection(this.collectionName).find({}).toArray();
if (result.length > 1) {
throw new Error(`Expected exactly one result, but got ${result.length}`);
Expand All @@ -30,9 +38,13 @@ export class MongoStateStore {

save(set: any, fn: (err?: any) => void): void {
const { migrations, lastRun } = set;
this.doWithErrorHandling(fn, db =>
db.collection(this.collectionName).replaceOne({}, { migrations, lastRun }, { upsert: true })
);
this.doWithErrorHandling(fn, async db => {
try {
await db.collection(this.collectionName).replaceOne({}, { migrations, lastRun }, { upsert: true });
} finally {
await this.releaseLock(db);
}
});
}

private doWithErrorHandling(fn: (err?: any, set?: any) => void, actionCallback: (db: Db) => Promise<any>): void {
Expand All @@ -58,4 +70,34 @@ export class MongoStateStore {
// ignore (handled via fn above)
});
}

// locking

private async acquireLock(db: Db): Promise<void> {
if (typeof this.lockCollectionName !== 'string') return; // nothing to lock
const collection = db.collection(this.lockCollectionName);
// index is needed for atomicity:
// https://docs.mongodb.com/manual/reference/method/db.collection.update/#use-unique-indexes
// https://groups.google.com/forum/#!topic/mongodb-user/-fucdS-7kIU
// https://stackoverflow.com/questions/33346175/mongodb-upsert-operation-seems-not-atomic-which-throws-duplicatekeyexception/34784533
await collection.createIndex({ lock: 1 }, { unique: true });
let showMessage = true;
for (;;) {
const result = await collection.updateOne({ lock: 'lock' }, { $set: { lock: 'lock' } }, { upsert: true });
const lockAcquired = result.upsertedCount > 0;
if (lockAcquired) {
break;
}
if (showMessage) {
console.log('Waiting for migration lock release …');
showMessage = false;
}
await promisify(setTimeout)(100);
}
}

private async releaseLock(db: Db): Promise<void> {
if (typeof this.lockCollectionName !== 'string') return; // nothing to release
await db.collection(this.lockCollectionName).deleteOne({ lock: 'lock' });
}
}
95 changes: 83 additions & 12 deletions test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { MongoStateStore } from '../lib/index';
import { MongoClient } from 'mongodb';
import { promisify } from 'util';

declare global {
// eslint-disable-next-line @typescript-eslint/no-namespace
Expand All @@ -11,20 +12,21 @@ declare global {
}
}

describe('migrate MongoDB state store', () => {
const migrationDoc = {
migrations: [
{
title: '1587915479438-my-migration.js',
description: null,
timestamp: 1587919095301.0
}
],
lastRun: '1587915479438-my-migration.js'
};
const migrationDoc = {
migrations: [
{
title: '1587915479438-my-migration.js',
description: null,
timestamp: 1587919095301.0
}
],
lastRun: '1587915479438-my-migration.js'
};

const mongoUrl = `${global.__MONGO_URI__}${global.__MONGO_DB_NAME__}`;

describe('migrate MongoDB state store', () => {
const defaultCollectionName = 'migrations';
const mongoUrl = `${global.__MONGO_URI__}${global.__MONGO_DB_NAME__}`;

let client: MongoClient;

Expand Down Expand Up @@ -168,3 +170,72 @@ describe('migrate MongoDB state store', () => {
});
});
});

describe('migrate MongoDB state store with locking', () => {
const collectionName = 'migrations';
const lockCollectionName = 'migrationlock';

let client: MongoClient;

beforeAll(async () => {
client = await MongoClient.connect(mongoUrl);
});

beforeEach(async () => {
await client.db().collection(collectionName).deleteMany({});
await client.db().collection(lockCollectionName).deleteMany({});
});

afterAll(async () => {
await client.close();
});

it('creates lock entry in database upon load', async () => {
const stateStore = new MongoStateStore({
uri: mongoUrl,
collectionName: collectionName,
lockCollectionName: lockCollectionName
});
await promisify(callback => stateStore.load(callback))();
// lockCollection should have exactly one entry
const numDocsInLockCollection = await client.db().collection(lockCollectionName).countDocuments();
expect(numDocsInLockCollection).toEqual(1);
});

it('deletes lock entry in database upon save', async () => {
const stateStore = new MongoStateStore({
uri: mongoUrl,
collectionName: collectionName,
lockCollectionName: lockCollectionName
});
await client.db().collection(lockCollectionName).insertOne({ lock: 'lock' });
await promisify<void>(callback => stateStore.save(migrationDoc, callback))();
// lockCollection should have no entry
const numDocsInLockCollection = await client.db().collection(lockCollectionName).countDocuments();
expect(numDocsInLockCollection).toEqual(0);
});

it('prevents executing two migrations at once', async () => {
const stateStore = new MongoStateStore({
uri: mongoUrl,
collectionName: collectionName,
lockCollectionName: lockCollectionName
});
let executing = 0;
const promises: Promise<void>[] = [];
// simulate ten nodes
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
await promisify(callback => stateStore.load(callback))();
executing++;
expect(executing).toEqual(1);
await promisify(setTimeout)(100);
await promisify<void>(callback => stateStore.save(migrationDoc, callback))();
executing--;
})()
);
}
await Promise.all(promises);
});
});

0 comments on commit 7a42921

Please sign in to comment.