Skip to content

Commit

Permalink
Merge pull request #5 from WatchBeam/locks
Browse files Browse the repository at this point in the history
Add distributed locking capability
  • Loading branch information
connor4312 committed Apr 15, 2017
2 parents accd978 + 2e21512 commit 8615089
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 41 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"typings": "lib/src/index.d.ts",
"scripts": {
"test": "npm-run-all --parallel test:lint test:unit",
"test:unit": "mocha --compilers ts:ts-node/register --timeout 20000 -r test/_setup.ts test/*.test.ts",
"test:unit": "TS_NODE_COMPILER_OPTIONS='{\"target\":\"es6\"}' mocha --compilers ts:ts-node/register --timeout 20000 -r test/_setup.ts test/*.test.ts",
"test:lint": "tslint --type-check --project tsconfig.json '{src,test}/**/*.ts'",
"update-proto": "node ./bin/update-proto ./proto && node bin/generate-methods.js ./proto/rpc.proto > src/rpc.ts",
"build:doc": "rm -rf docs && typedoc --exclude \"**/test/*\" --excludePrivate --out ./docs ./src/index.ts && node bin/tame-typedoc",
Expand Down
9 changes: 7 additions & 2 deletions src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,20 @@ export class ComparatorBuilder {
* Adds a new clause to the transaction.
*/
public and(key: string | Buffer, column: keyof typeof compareTarget,
cmp: keyof typeof comparator, value: string | Buffer): this {
cmp: keyof typeof comparator, value: string | Buffer | number): this {
assertWithin(compareTarget, column, 'comparison target in client.and(...)');
assertWithin(comparator, cmp, 'comparator in client.and(...)');

if (column === 'value') {
value = toBuffer(<string | Buffer> value);
}

this.request.compare = this.request.compare || [];
this.request.compare.push({
key: toBuffer(key),
result: comparator[cmp],
target: compareTarget[column].value,
[compareTarget[column].key]: toBuffer(value),
[compareTarget[column].key]: typeof value === 'number' ? value : toBuffer(value),
});
return this;
}
Expand Down
48 changes: 11 additions & 37 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,33 @@
* A GRPCGenericError is rejected via the connection when some error occurs
* that we can't be more specific about.
*/
export class GRPCGenericError extends Error {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCGenericError.prototype);
}
}
export class GRPCGenericError extends Error {}

/**
* GRPCConnectFailed is thrown when connecting to GRPC fails.
*/
export class GRPCConnectFailedError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCConnectFailedError.prototype);
}
}
export class GRPCConnectFailedError extends GRPCGenericError {}

/**
* GRPCProtocolError is thrown when a protocol error occurs on the other end,
* indicating that the external implementation is incorrect or incompatible.
*/
export class GRPCProtocolError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCProtocolError.prototype);
}
}
export class GRPCProtocolError extends GRPCGenericError {}

/**
* GRPCInternalError is thrown when a internal error occurs on either end.
*/
export class GRPCInternalError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCInternalError.prototype);
}
}
export class GRPCInternalError extends GRPCGenericError {}

/**
* GRPCCancelledError is emitted when an ongoing call is cancelled.
*/
export class GRPCCancelledError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCCancelledError.prototype);
}
}
export class GRPCCancelledError extends GRPCGenericError {}

/**
* EtcdError is an application error returned by etcd.
*/
export class EtcdError extends Error {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, EtcdError.prototype);
}
}
export class EtcdError extends Error {}

/**
* EtcdLeaseTimeoutError is thrown when trying to renew a lease that's
Expand All @@ -68,10 +38,14 @@ export class EtcdError extends Error {
export class EtcdLeaseInvalidError extends Error {
constructor(leaseID: string) {
super(`Lease ${leaseID} is expired or revoked`);
Object.setPrototypeOf(this, EtcdLeaseInvalidError.prototype);
}
}

/**
* EtcdLockFailedError is thrown when we fail to aquire a lock.
*/
export class EtcdLockFailedError extends Error {}

interface IErrorCtor {
new (message: string): Error;
}
Expand Down
11 changes: 10 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Builder from './builder';
import { ConnectionPool } from './connection-pool';
import { Lease } from './lease';
import { Lock } from './lock';
import { IOptions } from './options';
import * as RPC from './rpc';

Expand Down Expand Up @@ -76,13 +77,21 @@ export class Etcd3 {
return new Lease(this.pool, ttl);
}

/**
* `lock()` is a helper to provide distributed locking capability. See
* the documentation on the Lock class for more information and examples.
*/
public lock(key: string | Buffer): Lock {
return new Lock(this.pool, key);
}

/**
* `if()` starts a new etcd transaction, which allows you to execute complex
* statements atomically. See documentation on the ComparatorBuilder for
* more information.
*/
public if(key: string | Buffer, column: keyof typeof Builder.compareTarget,
cmp: keyof typeof Builder.comparator, value: string | Buffer): Builder.ComparatorBuilder {
cmp: keyof typeof Builder.comparator, value: string | Buffer | number): Builder.ComparatorBuilder {
return new Builder.ComparatorBuilder(this.kv).and(key, column, cmp, value);
}

Expand Down
95 changes: 95 additions & 0 deletions src/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { EtcdLockFailedError } from './';
import { ComparatorBuilder, PutBuilder } from './builder';
import { ConnectionPool } from './connection-pool';
import { Lease } from './lease';
import * as RPC from './rpc';

/**
* A Lock can be used for distributed locking to create atomic operations
* across multiple systems. An EtcdLockFailedError is thrown if the lock
* can't be acquired.
*
* Under the hood, the Lock uses a lease on a key which is revoked when the
* the lock is released. If the server the lock is running on dies, or the
* network is disconnected, etcd will time out the lock.
*
* Bear in mind that this means that in certain rare situations (a network
* disconnect or wholesale etcd failure), the caller may lose the lock while
* operations may still be running.
*
* A quick example:
*
* ```
* const { Etcd3 } = require('etcd3');
* const client = new Etcd3();
*
* client.lock('my_resource').do(() => {
* // The lock will automatically be released when this promise returns
* return doMyAtomicAction();
* });
* ```
*/
export class Lock {

private leaseTTL = 30;
private lease: Lease | null;

constructor(private pool: ConnectionPool, private key: string | Buffer) {}

/**
* Sets the TTL of the lease underlying the lock. The lease TTL defaults
* to 30 seconds.
*/
public ttl(seconds: number): this {
if (!this.lease) {
throw new Error('Cannot set a lock TTL after acquiring the lock');
}

this.leaseTTL = seconds;
return this;
}

/**
* Acquire attempts to acquire the lock, rejecting if it's unable to.
*/
public acquire(): Promise<void> {
const lease = this.lease = new Lease(this.pool, this.leaseTTL);
const kv = new RPC.KVClient(this.pool);

return lease.grant().then(leaseID => {
return new ComparatorBuilder(kv)
.and(this.key, 'createdAt', '==', 0)
.then(new PutBuilder(kv, this.key).value('').lease(leaseID))
.commit()
.then(res => {
if (!res.succeeded) {
this.release();
throw new EtcdLockFailedError(`Failed to acquire a lock on ${this.key}`);
}
});
});
}

/**
* Release frees the lock.
*/
public release(): Promise<void> {
if (!this.lease) {
throw new Error('Attempted to release a lock which was not acquired');
}

return this.lease.revoke();
}

/**
* `do()` wraps the inner function. It acquires the lock before running
* the function, and releases the lock after any promise the function
* returns resolves or throws.
*/
public do<T>(fn: () => T | Promise<T>): Promise<T> {
return this.acquire()
.then(fn)
.then(value => this.release().then(() => value))
.catch(err => this.release().then(() => { throw err; }));
}
}
31 changes: 31 additions & 0 deletions test/kv.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from 'sinon';
import {
Etcd3,
EtcdLeaseInvalidError,
EtcdLockFailedError,
GRPCConnectFailedError,
Lease,
} from '../src';
Expand Down Expand Up @@ -258,5 +259,35 @@ describe('connection pool', () => {
expect(await client.get('foo1').string()).to.equal('bar3');
});
});

describe('lock()', () => {
const assertCantLock = () => {
return client.lock('resource')
.acquire()
.then(() => { throw new Error('expected to throw'); })
.catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError));
};

const assertAbleToLock = async () => {
const lock = client.lock('resource');
await lock.acquire();
await lock.release();
};

it('locks exclusively around a resource', async () => {
const lock1 = client.lock('resource');
await lock1.acquire();

await assertCantLock();
await lock1.release();

await assertAbleToLock();
});

it('provides locking around functions', async () => {
await client.lock('resource').do(assertCantLock);
await assertAbleToLock();
});
});
});
});

0 comments on commit 8615089

Please sign in to comment.