Skip to content

Commit

Permalink
feat!: Updates and adds Storage Queue functions (#154)
Browse files Browse the repository at this point in the history
* chore: updates blob storage to use blob specific Shared Key Credentials

* fix: updates queue related functions

- use storage queue specific key credentials
- updates for changed response formats

* feat: new list queue function

Can list queues from teh specific storage account

* chore: docblock for list queues function

* feat!: returns the send message response rather than console output

`sendMessageToQueue()` now returns  the response from the API rather than writing to the console.

BREAKING CHANGE

* feat: checks for required options when generating a SAS URL

* ci: unit test for sending message to queue

* ci: skipped unit tests for storage queue functions

* adds TODOs for future queue functions

* feat(queues): get queue properties

This can be used to get the queue length `approximateMessagesCount` to see if there are any messages to process

includes unit test

* feat(queue): gets messages from queue

* chore: lintfix
  • Loading branch information
beauraines authored Oct 14, 2024
1 parent 50f26b0 commit 7301e37
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 35 deletions.
162 changes: 133 additions & 29 deletions src/azure.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const dayjs = require('dayjs')
const fs = require('fs');
const { streamToBuffer } = require('./helpers.js')
const { BlobServiceClient, StorageSharedKeyCredential } = require("@azure/storage-blob");
const { QueueClient } = require("@azure/storage-queue");
const { BlobServiceClient, StorageSharedKeyCredential:BlobStorageSharedKeyCredential } = require("@azure/storage-blob");
const { QueueServiceClient ,StorageSharedKeyCredential:QueueStorageSharedKeyCredential } = require("@azure/storage-queue");
var path = require('path');

/**
Expand Down Expand Up @@ -74,44 +74,148 @@ class AzureStorage {
/**
* Sends a message to the specified storage queue. The messages are given a TTL based upon the
* class's `queueMessageTTLSeconds` value.
*
* The response includes the `expiresOn`, `messageId` and `requestId` as well as a status code in
* `_response.status`
*
* @param {string} queueUrl The URL to the storage queue
* @param {string} messageContent The message to send to the queue
*
* @returns {Object} sendMessageResponse
*/
async sendMessageToQueue(queueUrl, messageContent,) {
async sendMessageToQueue(queueName, messageContent) {
try {
const queueClient = new QueueClient(
queueUrl,
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

const queueClient = queueServiceClient.getQueueClient(queueName);

let queueOptions = {
messageTimeToLive: this.queueMessageTTLSeconds
};
let sendMessageResponse = await queueClient.sendMessage(messageContent, queueOptions);
console.log(
"Sent message successfully, service assigned message Id:", sendMessageResponse.messageId,
"service assigned request Id:", sendMessageResponse.requestId
);

let sendMessageResponse = await queueClient.sendMessage(messageContent,queueOptions);
sendMessageResponse.status = sendMessageResponse._response.status;
delete sendMessageResponse._response;
return sendMessageResponse;

} catch (error) {
console.error(error.message)
console.log(error.message)
}
}

/**
* Gets any number of messages from the queue. The messages themselves are in the property
* `receivedMessageItems[]messageText` Use the options to control the number of messages returned,
* defaults are 1 with a 30 second timeout. You will need the `messageId` and `popReceipt` to
* delete the message after processing.
*
* @param {string} queueName The name of the queue
* @param {object} options Any options such as `numberOfMessages` or `visibilityTimeout`
* @returns {object} The queue messages response
*/
async getQueueMessages(queueName,options) {
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

const queueClient = queueServiceClient.getQueueClient(queueName);
const receivedMessagesResponse = await queueClient.receiveMessages(options);
return receivedMessagesResponse;

}


/**
* Deletes a message, by `messageId` and `popReceipt` from a named queue
*
* @param {string} queueName The name of the queue that has the message
* @param {string} messageId The message id to be deleted
* @param {string} popReceipt The popReceipt of teh message to be deleted
* @returns object with a nothing really useful
*/
async deleteQueueMessage(queueName,messageId,popReceipt) {
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

const queueClient = queueServiceClient.getQueueClient(queueName);
const deleteMessageResponse = await queueClient.deleteMessage(messageId,popReceipt);
return deleteMessageResponse

}

/**
* Gets the named queues properties, which includes the approximateMessagesCount
*
* @param {string} queueName
*
* @returns {Object} the queues properties
*/
async getQueueProperties(queueName) {
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

const queueClient = queueServiceClient.getQueueClient(queueName);
const properties = await queueClient.getProperties();
return properties

}

/**
* Lists storage queues in the storage account
*
* @returns Array
*/
async listsQueues() {
try {
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

let queues = []
for await (const queue of queueServiceClient.listQueues()) {
queues.push(queue)
}

return queues;

} catch (error) {
console.log(error.message)
}
}

/**
* Gets a SAS token for the storage queue
* Gets a SAS URL for the storage queue
* .
* @param {string} queueUrl The URL to the storage queue
* @param {string} queueName The name of the storage queue
* @param {object} options Should include `permissions: "raup"` or some combination thereof Any additional options supported. https://docs.microsoft.com/en-us/rest/api/storageservices/constructing-a-service-sas
*
* @returns {string} SAS URL for the specified queue
*/
getStorageQueueSignedURL(queueUrl,options) {

const queueClient = new QueueClient(
queueUrl,
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
)

getStorageQueueSignedURL(queueName,options) {
const queueServiceClient = new QueueServiceClient(
this.host('queue',this.cloudName),
new QueueStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);

const queueClient = queueServiceClient.getQueueClient(queueName);

options = {
startsOn: dayjs().toDate(),
...options
startsOn: dayjs().toDate(),
expiresOn: dayjs().add(this.tokenExpiry,'minutes'),
...options
}

if (!options.permissions || !options.expiresOn) {
throw new Error("Must provide 'permissions' and 'expiresOn' for Queue SAS generation when 'identifier' is not provided");
}

return queueClient.generateSasUrl(options)
Expand All @@ -130,7 +234,7 @@ getStorageQueueSignedURL(queueUrl,options) {

const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);
const blockBlobClient = containerClient.getBlockBlobClient(blobName);
Expand Down Expand Up @@ -158,7 +262,7 @@ getStorageQueueSignedURL(queueUrl,options) {
async uploadBlobFromFile(containerName,file) {
const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);

Expand Down Expand Up @@ -194,7 +298,7 @@ getStorageQueueSignedURL(queueUrl,options) {
async downloadBlobToFile(containerName,blobName,file) {
const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);
const blobClient = containerClient.getBlobClient(blobName);
Expand All @@ -219,7 +323,7 @@ getStorageQueueSignedURL(queueUrl,options) {
async getBlob(containerName,blobName) {
const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);
const blobClient = containerClient.getBlobClient(blobName);
Expand All @@ -246,7 +350,7 @@ getStorageQueueSignedURL(queueUrl,options) {
async getBinaryBlob(containerName,blobName) {
const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);
const blobClient = containerClient.getBlobClient(blobName);
Expand All @@ -268,7 +372,7 @@ getStorageQueueSignedURL(queueUrl,options) {
async listBlobs(containerName) {
const blobServiceClient = new BlobServiceClient(
this.host('blob',this.cloudName),
new StorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
new BlobStorageSharedKeyCredential(this.storageAccountName, this.storageAccountKey)
);
const containerClient = blobServiceClient.getContainerClient(containerName);
let blobs = []
Expand Down
90 changes: 85 additions & 5 deletions src/azure.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ describe('Azure Storage module', () => {
expect(success)
})

it.todo('should send a message to the storage queue')

it.skip('should get a blob from azure storage', async () =>{
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
Expand All @@ -102,9 +100,6 @@ describe('Azure Storage module', () => {
expect(fileExists(file))
})




it.skip('should list blobs from azure storage', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
Expand All @@ -118,4 +113,89 @@ describe('Azure Storage module', () => {
expect(blobs.filter(b => b.name == blobName).length).toBe(1)
})

it.skip('should send a message to the storage queue', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const message = {foo:"bar"}
const queueName = 'node-helpers-testing'
let response = await azure.sendMessageToQueue(queueName,JSON.stringify(message))
expect(response.status).toBe(201)
})

it.skip('should error when generating a SAS URL for the storage queue without permissions', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing'
expect(() => azure.getStorageQueueSignedURL(queueName)).toThrow();
})

it.skip('should generate a SAS URL for the storage queue', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing'
const options = {permissions:"r"}
let response = await azure.getStorageQueueSignedURL(queueName,options)
expect(response.includes('sp=r')).toBe(true) // Read permissions
expect(response.includes('http://127.0.0.1:10001/devstoreaccount1/node-helpers-testing')).toBe(true) // Azurite URL for storage queue
// TODO compute the expected expiration time
// expect(response.includes('se=2024-10-13T23%3A59%3A02Z')).toBe(true) // Expiration time defaults to 30 minutes
})


it.skip('should list the queues in the storage account', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing';
let queues = await azure.listsQueues();
expect(queues.map(x => x.name).indexOf(queueName)).toBeGreaterThan(-1)
})

it.skip('should get the queue properties', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing';
let properties = await azure.getQueueProperties(queueName);
expect(properties.approximateMessagesCount).toBeGreaterThan(0)
})

it.skip('should get a message from the queue', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing';
let response = await azure.getQueueMessages(queueName);
const expectedMessage = {foo:"bar"}
expect(response._response.status).toBe(200)
expect(response.receivedMessageItems.length).toBeGreaterThan(0)
expect(response.receivedMessageItems[0].messageText).toBe(JSON.stringify(expectedMessage))
})

it.skip('should get multiple messages from the queue', async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing';
let response = await azure.getQueueMessages(queueName,{numberOfMessages:5});
expect(response._response.status).toBe(200)
expect(response.receivedMessageItems.length).toBeGreaterThan(1)
expect(response.receivedMessageItems.length).toBeLessThan(6)
})

it.skip('should delete a message from the queue',async () => {
const account = "devstoreaccount1";
const accountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let azure = new AzureStorage(account,accountKey,{cloudName:'Azurite'})
const queueName = 'node-helpers-testing';
expect(() => azure.deleteQueueMessage(queueName,undefined,undefined)).toThrow() // popReceipt cannot be null
let addedMessage = await azure.sendMessageToQueue(queueName,'abc123')
let response = await azure.deleteQueueMessage(queueName,addedMessage.messageId,addedMessage.popReceipt);
expect(response.errorCode).toBe(undefined)
})


})
2 changes: 1 addition & 1 deletion src/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ function getEpochMillis() {
*
* @returns
*/
// eslint-disable-next-line no-unused-vars
function sparkline(data,label,options) {

options = {
Expand Down

0 comments on commit 7301e37

Please sign in to comment.