Skip to content

Commit

Permalink
feat(Azure): Adds support for Azure Service Bus worker queues as an A…
Browse files Browse the repository at this point in the history
…zure alternative to Redis and AWS SQS. (#1406 - [LL-37](https://learningpool.atlassian.net/browse/LL-37))
  • Loading branch information
cbishopvelti authored and ryasmi committed Oct 11, 2019
1 parent 990c8f7 commit b747ffd
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 15 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ GOOGLE_ENABLED=false
# Queues #
##########

# Queue management system (REDIS|SQS|PUBSUB)
# Queue management system (REDIS|SQS|PUBSUB|SERVICE_BUS)
QUEUE_PROVIDER=REDIS

# Namespace for queues
Expand All @@ -181,6 +181,9 @@ QUEUE_NAMESPACE=DEV
#PUBSUB_GOOGLE_CLOUD_PROJECT_ID=
#PUBSUB_GOOGLE_CLOUD_SUBSCRIPTION_NAME=ll

# Azure service bus endpoint, required if QUEUE_PROVIDER=SERVICE_BUS
# SERVICE_BUS_ENDPOINT='Endpoint=sb://[namespace].servicebus.windows.net/;SharedAccessKeyName=[keyName];SharedAccessKey=[key]'

################
# File storage #
################
Expand Down
2 changes: 2 additions & 0 deletions lib/services/queue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as bullProvider from './bull';
import * as sqsProvider from './sqs';
import * as localProvider from './local';
import * as pubsubProvider from './pubsub';
import * as serviceBusProvider from './serviceBus';

const defaultCallback = (err) => {
if (err) logger.error('QUEUE DEFAULT ERROR', err);
Expand All @@ -14,6 +15,7 @@ const getProvider = ({ queueProvider = process.env.QUEUE_PROVIDER }, done) => {
switch (queueProvider) {
case 'PUBSUB': return done(null, pubsubProvider);
case 'SQS': return done(null, sqsProvider);
case 'SERVICE_BUS': return done(null, serviceBusProvider);
case 'REDIS':
case 'BULL':
return done(null, bullProvider);
Expand Down
95 changes: 95 additions & 0 deletions lib/services/queue/serviceBus/index-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { expect } from 'chai';
import { promisify, delay } from 'bluebird';
import { subscribe, publish } from './index';

describe('serviceBus', () => {
it('should subscribe to queue', async () => {
let called = false;
let handler;
const handledPromise = new Promise((resolve) => {
handler = (data, done) => {
called = true;
expect(data.test).to.equal('data');
resolve();
done();
};
});

await promisify(subscribe)({
queueName: 'test-queue',
handler
});

await promisify(publish)({
queueName: 'test-queue',
payload: {
test: 'data'
}
});

await handledPromise;
expect(called).to.equal(true);
}).timeout(5000);

it('should dead letter on error', async () => {
let called = false;

let handler;
const handledPromise = new Promise((resolve) => {

handler = (data, done) => {
called = true;
expect(data.test).to.equal('data');
const errMsg = 'An error';
resolve();
done(errMsg);
};
});

await promisify(subscribe)({
queueName: 'test-queue-2',
handler,
deadLetter: 'test-queue-2-deadletter'
});

await promisify(publish)({
queueName: 'test-queue-2',
payload: {
test: 'data'
}
});

await handledPromise;

expect(called).to.equal(true);
});

it.skip('should review the lock', async () => {
let called = 0;

let handler;
const donePromise = new Promise((resolve) => {
handler = async (data, done) => {
called += 1;
await delay(6 * 60 * 1000);
resolve();
done();
};
});

await promisify(subscribe)({
queueName: 'test-queue-3',
handler
});

await promisify(publish)({
queueName: 'test-queue-3',
payload: {
test: 'data'
}
});

await donePromise;
expect(called).to.equal(1);
})/*.timeout(7 * 60 * 1000)*/;
});
149 changes: 149 additions & 0 deletions lib/services/queue/serviceBus/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import { memoize, isString } from 'lodash';
import logger from 'lib/logger';
import { ServiceBusClient, ReceiveMode } from '@azure/service-bus';
import azure from 'azure-sb';
import { promisify, map as bmap, delay, race } from 'bluebird';
import moment from 'moment';

let consumers = [];

export const unsubscribeAll = async () => {
await bmap(consumers, async (consumer) => {
await consumer.close();
});
consumers = [];
};

const connectionString = process.env.SERVICE_BUS_ENDPOINT;

const serviceBusClient = memoize(() => ServiceBusClient.createFromConnectionString(connectionString));

const sbService = memoize((() => azure.createServiceBusService(connectionString)));

const getQueueClient = memoize(async (queueName, {
visibilityTimeout
} = {}) => {
const lockDurationSeconds = (visibilityTimeout > 300 ? 300 : visibilityTimeout);
const lockDuration = moment.duration(lockDurationSeconds, 'seconds').toISOString();

const options = (visibilityTimeout ?
{ LockDuration: lockDuration } : {}
);

const service = sbService();
await promisify(service.createQueueIfNotExists, {
context: service
})(queueName, options);

return serviceBusClient().createQueueClient(queueName);
}, queueName => queueName);

const getSender = memoize(async (queueName, {
visibilityTimeout
} = {}) => {
const queueClient = await getQueueClient(queueName, {
visibilityTimeout
});

const sender = queueClient.createSender();
return sender;
}, queueName => queueName);

export const subscribe = async ({
queueName,
handler,
onProccessed = () => {},
deadLetter,
retryDelay,
visibilityTimeout,
}, done) => {
const queueClient = await getQueueClient(queueName, {
visibilityTimeout
});

const receiver = queueClient.createReceiver(ReceiveMode.peekLock);
receiver.registerMessageHandler(async (brokeredMessage) => {
let payload;
try {
if (isString(brokeredMessage.body)) {
payload = JSON.parse(brokeredMessage.body);
} else {
payload = brokeredMessage.body;
}
} catch (err) {
logger.error(err);
await brokeredMessage.deadLetter({
deadLetterErrorDescription: JSON.stringify(err, null, 2),
deadletterReason: 'Message was not valid JSON'
});
}
try {
const renewLock = async () => {
let visibilityTimeoutUpdated = visibilityTimeout;
while (visibilityTimeoutUpdated > 0) {
const nextDelay = visibilityTimeoutUpdated < 250 ? visibilityTimeoutUpdated : 250;
await delay(nextDelay);
visibilityTimeoutUpdated -= nextDelay;
if (visibilityTimeoutUpdated > 0) {
receiver.renewMessageLock(payload);
}
}
};
const renewLockPromise = renewLock();

await race(promisify(handler)(payload), renewLockPromise);
} catch (error) {
if (deadLetter) {
const sender = await getSender(deadLetter);
await sender.send({
body: {
...payload,
error
}
});
}
}

await brokeredMessage.complete();
onProccessed(brokeredMessage.body);
}, async (err) => {
logger.error(`Error processing message for ${queueName}`, err);
});

consumers.push(receiver);

done();
};

// =================================================================================================

export const publish = async ({
queueName,
payload,
retryDelay,
visibilityTimeout
}, done) => {
const sender = await getSender(queueName, {
visibilityTimeout
});

let stringPayload;
try {
stringPayload = JSON.stringify(payload);
} catch (err) {
done(err);
throw err;
}

if (!retryDelay) {
await sender.send({
body: stringPayload
});
} else {
await sender.schedualMessage(moment().add(retryDelay, 'seconds').toDate(), {
body: stringPayload
});
}

done();
};
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"semantic-release": "ht2-release-private-circleci"
},
"dependencies": {
"@azure/service-bus": "^1.0.2",
"@azure/storage-blob": "^10.3.0",
"@google-cloud/pubsub": "^0.16.1",
"@google-cloud/storage": "^1.5.2",
Expand All @@ -57,6 +58,7 @@
"async": "^2.0.0-rc.3",
"aws-sdk": "^2.77.0",
"axios": "^0.18.0",
"azure-sb": "^0.11.0",
"babel-runtime": "6.22.0",
"bcrypt": "^2",
"bluebird": "^3.4.6",
Expand Down
Loading

0 comments on commit b747ffd

Please sign in to comment.