diff --git a/.vscode/launch.json b/.vscode/launch.json index 1ab3c5d..a113001 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,12 +7,32 @@ { "type": "node", "request": "launch", - "name": "examples", + "name": "awaitableSend", + "program": "${workspaceFolder}/examples/awaitableSend.ts", + "outFiles": [ + "${workspaceFolder}/dist/**/*.js" + ], + "envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables. + }, + { + "type": "node", + "request": "launch", + "name": "send", "program": "${workspaceFolder}/examples/send.ts", "outFiles": [ "${workspaceFolder}/dist/**/*.js" ], "envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables. + }, + { + "type": "node", + "request": "launch", + "name": "receive", + "program": "${workspaceFolder}/examples/receive.ts", + "outFiles": [ + "${workspaceFolder}/dist/**/*.js" + ], + "envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables. } ] } \ No newline at end of file diff --git a/README.md b/README.md index b05644a..da79347 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # rhea-promise -A Promisified layer over rhea AMQP client. +A Promisified layer over [rhea](https://githhub.com/amqp/rhea) AMQP client. ## Pre-requisite ## - **Node.js version: 6.x or higher.** @@ -73,7 +73,7 @@ We believe our design enforces good practices to be followed while using the eve Please take a look at the [sample.env](https://github.com/amqp/rhea-promise/blob/master/sample.env) file for examples on how to provide the values for different parameters like host, username, password, port, senderAddress, receiverAddress, etc. -#### Sending a message. +#### Sending a message via `Sender`. - Running the example from terminal: `> ts-node ./examples/send.ts`. **NOTE:** If you are running the sample with `.env` config file, then please run the sample from the directory that contains `.env` config file. @@ -130,8 +130,79 @@ async function main(): Promise { message_id: "12343434343434" }; - const delivery: Delivery = await sender.send(message); - console.log(">>>>>[%s] Delivery id: ", connection.id, delivery.id); + // Please, note that we are not awaiting on sender.send() + // You will notice that `delivery.settled` will be `false`. + const delivery: Delivery = sender.send(message); + console.log(">>>>>[%s] Delivery id: %d, settled: %s", + connection.id, + delivery.id, + delivery.settled); + + await sender.close(); + await connection.close(); +} + +main().catch((err) => console.log(err)); +``` + +### Sending a message via `AwaitableSender` +- Running the example from terminal: `> ts-node ./examples/awaitableSend.ts`. + +```typescript +import { + Connection, Message, ConnectionOptions, Delivery, AwaitableSenderOptions, AwaitableSender +} from "rhea-promise"; + +import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file +dotenv.config(); + +const host = process.env.AMQP_HOST || "host"; +const username = process.env.AMQP_USERNAME || "sharedAccessKeyName"; +const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue"; +const port = parseInt(process.env.AMQP_PORT || "5671"); +const senderAddress = process.env.SENDER_ADDRESS || "address"; + +async function main(): Promise { + const connectionOptions: ConnectionOptions = { + transport: "tls", + host: host, + hostname: host, + username: username, + password: password, + port: port, + reconnect: false + }; + const connection: Connection = new Connection(connectionOptions); + const senderName = "sender-1"; + const awaitableSenderOptions: AwaitableSenderOptions = { + name: senderName, + target: { + address: senderAddress + }, + sendTimeoutInSeconds: 10 + }; + + await connection.open(); + // Notice that we are awaiting on the message being sent. + const sender: AwaitableSender = await connection.createAwaitableSender( + awaitableSenderOptions + ); + + for (let i = 0; i < 10; i++) { + const message: Message = { + body: `Hello World - ${i}`, + message_id: i + }; + // Note: Here we are awaiting for the send to complete. + // You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects. + const delivery: Delivery = await sender.send(message); + console.log( + "[%s] await sendMessage -> Delivery id: %d, settled: %s", + connection.id, + delivery.id, + delivery.settled + ); + } await sender.close(); await connection.close(); @@ -222,3 +293,7 @@ npm i ``` npm run build ``` + + +## AMQP Protocol specification +Amqp protocol specification can be found [here](http://www.amqp.org/sites/amqp.org/files/amqp.pdf). \ No newline at end of file diff --git a/changelog.md b/changelog.md index 2bc9fd3..79da6c1 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,15 @@ +### 1.0.0 - 2019-06-18 +- Updated minimum version of `rhea` to `^1.0.8`. +- Added a read only property `id` to the `Session` object. The id property is created by concatenating session's local channel, remote channel and the connection id `"local-_remote-_"`, thus making it unique for that connection. +- Improved log statements by adding the session `id` and the sender, receiver `name` to help while debugging applications. +- Added `options` to `Link.close({closeSession: true | false})`, thus the user can specify whether the underlying session should be closed while closing the `Sender|Receiver`. Default is `true`. +- Improved `open` and `close` operations on `Connection`, `Session` and `Link` by creating timer in case the connection gets disconnected. Fixes [#41](https://github.com/amqp/rhea-promise/issues/41). +- The current `Sender` does not have a provision of **"awaiting"** on sending a message. The user needs to add handlers on the `Sender` for `accepted`, `rejected`, `released`, `modified` to ensure whether the message was successfully sent. +Now, we have added a new `AwaitableSender` which adds the handlers internally and provides an **awaitable** `send()` operation to the customer. Fixes [#45](https://github.com/amqp/rhea-promise/issues/45). +- Exporting new Errors: + - `InsufficientCreditError`: Defines the error that occurs when the Sender does not have enough credit. + - `SendOperationFailedError`: Defines the error that occurs when the Sender fails to send a message. + ### 0.2.0 - 2019-05-17 - Updated `OperationTimeoutError` to be a non-AMQP Error as pointed out in [#42](https://github.com/amqp/rhea-promise/issues/42). Fixed in [PR](https://github.com/amqp/rhea-promise/pull/43). diff --git a/examples/awaitableSend.ts b/examples/awaitableSend.ts new file mode 100644 index 0000000..7ecc2b8 --- /dev/null +++ b/examples/awaitableSend.ts @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache License. See License in the project root for license information. + +import { + Connection, + Message, + ConnectionOptions, + Delivery, + AwaitableSenderOptions, + AwaitableSender +} from "../lib"; + +import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file +dotenv.config(); + +const host = process.env.AMQP_HOST || "host"; +const username = process.env.AMQP_USERNAME || "sharedAccessKeyName"; +const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue"; +const port = parseInt(process.env.AMQP_PORT || "5671"); +const senderAddress = process.env.SENDER_ADDRESS || "address"; + +async function main(): Promise { + const connectionOptions: ConnectionOptions = { + transport: "tls", + host: host, + hostname: host, + username: username, + password: password, + port: port, + reconnect: false + }; + const connection: Connection = new Connection(connectionOptions); + const senderName = "sender-1"; + const senderOptions: AwaitableSenderOptions = { + name: senderName, + target: { + address: senderAddress + }, + sendTimeoutInSeconds: 10 + }; + + await connection.open(); + const sender: AwaitableSender = await connection.createAwaitableSender( + senderOptions + ); + + for (let i = 0; i < 10; i++) { + const message: Message = { + body: `Hello World - ${i}`, + message_id: i + }; + // Please, note that we are awaiting on sender.send() to complete. + // You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects. + const delivery: Delivery = await sender.send(message); + console.log( + "[%s] await sendMessage -> Delivery id: %d, settled: %s", + connection.id, + delivery.id, + delivery.settled + ); + } + + await sender.close(); + await connection.close(); +} + +main().catch((err) => console.log(err)); diff --git a/examples/send.ts b/examples/send.ts index 155e24c..def8570 100644 --- a/examples/send.ts +++ b/examples/send.ts @@ -2,7 +2,13 @@ // Licensed under the Apache License. See License in the project root for license information. import { - Connection, Sender, EventContext, Message, ConnectionOptions, Delivery, SenderOptions + Connection, + Sender, + EventContext, + Message, + ConnectionOptions, + Delivery, + SenderOptions } from "../lib"; import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file @@ -34,15 +40,23 @@ async function main(): Promise { onError: (context: EventContext) => { const senderError = context.sender && context.sender.error; if (senderError) { - console.log(">>>>> [%s] An error occurred for sender '%s': %O.", - connection.id, senderName, senderError); + console.log( + ">>>>> [%s] An error occurred for sender '%s': %O.", + connection.id, + senderName, + senderError + ); } }, onSessionError: (context: EventContext) => { const sessionError = context.session && context.session.error; if (sessionError) { - console.log(">>>>> [%s] An error occurred for session of sender '%s': %O.", - connection.id, senderName, sessionError); + console.log( + ">>>>> [%s] An error occurred for session of sender '%s': %O.", + connection.id, + senderName, + sessionError + ); } } }; @@ -54,8 +68,15 @@ async function main(): Promise { message_id: "12343434343434" }; - const delivery: Delivery = await sender.send(message); - console.log(">>>>>[%s] Delivery id: ", connection.id, delivery.id); + // Please, note that we are not awaiting on sender.send() + // You will notice that `delivery.settled` will be `false`. + const delivery: Delivery = sender.send(message); + console.log( + ">>>>>[%s] send -> Delivery id: %d, settled: %s", + connection.id, + delivery.id, + delivery.settled + ); await sender.close(); await connection.close(); diff --git a/lib/awaitableSender.ts b/lib/awaitableSender.ts new file mode 100644 index 0000000..7d65fd3 --- /dev/null +++ b/lib/awaitableSender.ts @@ -0,0 +1,207 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache License. See License in the project root for license information. + +import { + Delivery, Message, Sender as RheaSender, SessionEvents +} from "rhea"; +import * as log from "./log"; +import { BaseSender, BaseSenderOptions } from "./sender"; +import { SenderEvents } from "rhea"; +import { OnAmqpEvent, EventContext } from "./eventContext"; +import { Session } from "./session"; +import { + OperationTimeoutError, InsufficientCreditError, SendOperationFailedError +} from "./errorDefinitions"; + +/** + * Describes the interface for the send operation Promise which contains a reference to resolve, + * reject functions and the timer for that Promise. + * @interface PromiseLike + */ +export interface PromiseLike { + resolve: (value?: any) => void; + reject: (reason?: any) => void; + timer: NodeJS.Timer; +} + +/** + * Describes the event listeners that can be added to the AwaitableSender. + * @interface Sender + */ +export declare interface AwaitableSender { + on(event: SenderEvents, listener: OnAmqpEvent): this; +} + +export interface AwaitableSenderOptions extends BaseSenderOptions { + /** + * The duration in which the promise to send the message should complete (resolve/reject). + * If it is not completed, then the Promise will be rejected after timeout occurs. + * Default: `20 seconds`. + */ + sendTimeoutInSeconds?: number; +} + +/** + * Describes the sender where one can await on the message being sent. + * @class AwaitableSender + */ +export class AwaitableSender extends BaseSender { + /** + * The duration in which the promise to send the message should complete (resolve/reject). + * If it is not completed, then the Promise will be rejected after timeout occurs. + * Default: `20 seconds`. + */ + sendTimeoutInSeconds: number; + /** + * @property {Map = new Map(); + + constructor(session: Session, sender: RheaSender, options: AwaitableSenderOptions = {}) { + super(session, sender, options); + this.sendTimeoutInSeconds = options.sendTimeoutInSeconds || 20; + /** + * The handler that will be added on the Sender for `accepted` event. If the delivery id is + * present in the disposition map then it will clear the timer and resolve the promise with the + * delivery. + * @param delivery Delivery associated with message that was sent. + */ + const onSendSuccess = (delivery: Delivery) => { + const id = delivery.id; + if (this.deliveryDispositionMap.has(delivery.id)) { + const promise = this.deliveryDispositionMap.get(id) as PromiseLike; + clearTimeout(promise.timer); + const deleteResult = this.deliveryDispositionMap.delete(id); + log.sender( + "[%s] Event: 'Accepted', Successfully deleted the delivery with id %d from " + + "the map of sender '%s' on amqp session '%s' and cleared the timer: %s.", + this.connection.id, id, this.name, this.session.id, deleteResult + ); + return promise.resolve(delivery); + } + }; + + /** + * The handler is added on the Sender for `rejected`, `released` and `modified` events. + * If the delivery is found in the disposition map then the timer will be cleared and the + * promise will be rejected with an appropriate error message. + * @param eventName Name of the event that was raised. + * @param id Delivery id. + * @param error Error from the context if any. + */ + const onSendFailure = ( + eventName: "rejected" | "released" | "modified" | "sender_error" | "session_error", + id: number, + error?: Error) => { + if (this.deliveryDispositionMap.has(id)) { + const promise = this.deliveryDispositionMap.get(id) as PromiseLike; + clearTimeout(promise.timer); + const deleteResult = this.deliveryDispositionMap.delete(id); + log.sender( + "[%s] Event: '%s', Successfully deleted the delivery with id %d from the " + + " map of sender '%s' on amqp session '%s' and cleared the timer: %s.", + this.connection.id, eventName, id, this.name, this.session.id, deleteResult + ); + const msg = `Sender '${this.name}' on amqp session '${this.session.id}', received a ` + + `'${eventName}' disposition. Hence we are rejecting the promise.`; + const err = new SendOperationFailedError(msg, eventName, error); + log.error("[%s] %s", this.connection.id, msg); + return promise.reject(err); + } + }; + + /** + * The handler that will be added on the Sender link for `sender_error` and on it's underlying + * session for `session_error` event. These events are raised when the sender link or it's + * underlying session get disconnected. + * The handler will clear the timer and reject the promise for every pending send in the map. + * @param eventName Name of the event that was raised. + * @param error Error from the context if any + */ + const onError = (eventName: "sender_error" | "session_error", error?: Error) => { + for (const id of this.deliveryDispositionMap.keys()) { + onSendFailure(eventName, id, error); + } + }; + + this.on(SenderEvents.accepted, (context: EventContext) => { + onSendSuccess(context.delivery!); + }); + this.on(SenderEvents.rejected, (context: EventContext) => { + const delivery = context.delivery!; + onSendFailure(SenderEvents.rejected, delivery.id, delivery.remote_state && delivery.remote_state.error); + }); + this.on(SenderEvents.released, (context: EventContext) => { + const delivery = context.delivery!; + onSendFailure(SenderEvents.released, delivery.id, delivery.remote_state && delivery.remote_state.error); + }); + this.on(SenderEvents.modified, (context: EventContext) => { + const delivery = context.delivery!; + onSendFailure(SenderEvents.modified, delivery.id, delivery.remote_state && delivery.remote_state.error); + }); + + // The user may have it's custom reconnect logic for bringing the sender link back online and + // retry logic for sending messages on failures hence they can provide their error handlers + // for `sender_error` and `session_error`. + // If the user did not provide its error handler for `sender_error` and `session_error`, + // then we add our handlers and make sure we clear the timer and reject the promise for sending + // messages with appropriate Error. + if (!options.onError) { + this.on(SenderEvents.senderError, (context: EventContext) => { + onError(SenderEvents.senderError, context.sender!.error as Error); + }); + } + if (!options.onSessionError) { + this.session.on(SessionEvents.sessionError, (context: EventContext) => { + onError(SessionEvents.sessionError, context.session!.error as Error); + }); + } + } + + /** + * Sends the message on which one can await to ensure that the message has been successfully + * delivered. + * @param {Message | Buffer} msg The message to be sent. For default AMQP format msg parameter + * should be of type Message interface. For a custom format, the msg parameter should be a Buffer + * and a valid value should be passed to the `format` argument. + * @param {Buffer | string} [tag] The message tag if any. + * @param {number} [format] The message format. Specify this if a message with custom format needs + * to be sent. `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the + * given message is assumed to be of type Message interface and encoded appropriately. + * @returns {Promise} Promise The delivery information about the sent message. + */ + send(msg: Message | Buffer, tag?: Buffer | string, format?: number): Promise { + return new Promise((resolve, reject) => { + log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d", + this.connection.id, this.name, this.session.id, this.credit, + this.session.outgoing.available()); + if (this.sendable()) { + const timer = setTimeout(() => { + this.deliveryDispositionMap.delete(delivery.id); + const message = `Sender '${this.name}' on amqp session ` + + `'${this.session.id}', with address '${this.address}' was not able to send the ` + + `message with delivery id ${delivery.id} right now, due to operation timeout.`; + log.error("[%s] %s", this.connection.id, message); + return reject(new OperationTimeoutError(message)); + }, this.sendTimeoutInSeconds * 1000); + + const delivery = (this._link as RheaSender).send(msg, tag, format); + this.deliveryDispositionMap.set(delivery.id, { + resolve: resolve, + reject: reject, + timer: timer + }); + } else { + // Please send the message after some time. + const msg = + `Sender "${this.name}" on amqp session "${this.session.id}", with address ` + + `${this.address} cannot send the message right now as it does not have ` + + `enough credit. Please try later.`; + log.error("[%s] %s", this.connection.id, msg); + reject(new InsufficientCreditError(msg)); + } + }); + } +} diff --git a/lib/connection.ts b/lib/connection.ts index 64bfbf6..85d5a49 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -18,7 +18,8 @@ import { import { OnAmqpEvent } from "./eventContext"; import { Entity } from "./entity"; -import { OperationTimeoutError } from "./operationTimeoutError"; +import { OperationTimeoutError } from "./errorDefinitions"; +import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender"; /** * Describes the options that can be provided while creating an AMQP sender. One can also provide @@ -29,6 +30,15 @@ export interface SenderOptionsWithSession extends SenderOptions { session?: Session; } +/** + * Describes the options that can be provided while creating an Async AMQP sender. + * One can also provide a session if it was already created. + * @interface AwaitableSenderOptionsWithSession + */ +export interface AwaitableSenderOptionsWithSession extends AwaitableSenderOptions { + session?: Session; +} + /** * Describes the options that can be provided while creating an AMQP receiver. One can also provide * a session if it was already created. @@ -324,12 +334,14 @@ export class Connection extends Entity { if (this.isOpen()) { let onClose: Func; let onError: Func; + let onDisconnected: Func; let waitTimer: any; const removeListeners = () => { clearTimeout(waitTimer); this.actionInitiated--; this._connection.removeListener(ConnectionEvents.connectionError, onError); this._connection.removeListener(ConnectionEvents.connectionClose, onClose); + this._connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onClose = (context: RheaEventContext) => { @@ -346,6 +358,14 @@ export class Connection extends Entity { return reject(context.connection.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error); + }; + const actionAfterTimeout = () => { removeListeners(); const msg: string = `Unable to close the amqp connection "${this.id}" due to operation timeout.`; @@ -356,6 +376,7 @@ export class Connection extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. this._connection.once(ConnectionEvents.connectionClose, onClose); this._connection.once(ConnectionEvents.connectionError, onError); + this._connection.once(ConnectionEvents.disconnected, onDisconnected); waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000); this._connection.close(); this.actionInitiated++; @@ -443,6 +464,7 @@ export class Connection extends Entity { session.actionInitiated++; let onOpen: Func; let onClose: Func; + let onDisconnected: Func; let waitTimer: any; const removeListeners = () => { @@ -450,11 +472,12 @@ export class Connection extends Entity { session.actionInitiated--; rheaSession.removeListener(SessionEvents.sessionOpen, onOpen); rheaSession.removeListener(SessionEvents.sessionClose, onClose); + rheaSession.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onOpen = (context: RheaEventContext) => { removeListeners(); - log.session("[%s] Resolving the promise with amqp session.", this.id); + log.session("[%s] Resolving the promise with amqp session '%s'.", this.id, session.id); return resolve(session); }; @@ -465,6 +488,16 @@ export class Connection extends Entity { return reject(context.session!.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while creating amqp session '%s': %O.", + this.id, session.id, error); + return reject(error); + }; + const actionAfterTimeout = () => { removeListeners(); const msg: string = `Unable to create the amqp session due to operation timeout.`; @@ -475,6 +508,7 @@ export class Connection extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. rheaSession.once(SessionEvents.sessionOpen, onOpen); rheaSession.once(SessionEvents.sessionClose, onClose); + rheaSession.connection.once(ConnectionEvents.disconnected, onDisconnected); log.session("[%s] Calling amqp session.begin().", this.id); waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000); rheaSession.begin(); @@ -494,6 +528,26 @@ export class Connection extends Entity { return session.createSender(options); } + /** + * Creates an awaitable amqp sender. It either uses the provided session or creates a new one. + * @param options Optional parameters to create an awaitable sender link. + * - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will + * clear the timer and reject the Promise for all the entries of inflight send operation in its + * `deliveryDispositionMap`. + * - If the user is handling the reconnection of sender link or the underlying connection in it's + * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he + * shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation. + * + * @return Promise. + */ + async createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise { + if (options && options.session && options.session.createAwaitableSender) { + return options.session.createAwaitableSender(options); + } + const session = await this.createSession(); + return session.createAwaitableSender(options); + } + /** * Creates an amqp receiver link. It either uses the provided session or creates a new one. * @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link. @@ -530,8 +584,8 @@ export class Connection extends Entity { session.createSender(senderOptions), session.createReceiver(receiverOptions) ]); - log.connection("[%s] Successfully created the sender and receiver links on the same session.", - this.id); + log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " + + "amqp session '%s'.", this.id, sender.name, receiver.name, session.id); return { session: session, sender: sender, @@ -555,6 +609,9 @@ export class Connection extends Entity { emitterType: "connection", connectionId: this.id }; + if (eventName === ConnectionEvents.protocolError) { + log.connection("[%s] ProtocolError is: %O.", this.id, context); + } emitEvent(params); }); } diff --git a/lib/errorDefinitions.ts b/lib/errorDefinitions.ts new file mode 100644 index 0000000..c533f76 --- /dev/null +++ b/lib/errorDefinitions.ts @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache License. See License in the project root for license information. + +/** + * Defines the error that occurs when an operation timeout occurs. + */ +export class OperationTimeoutError extends Error { + /** + * Describes the name of the error. + */ + readonly name: string = "OperationTimeoutError"; + + constructor(message: string) { + super(message); + } +} + +/** + * Defines the error that occurs when the Sender does not have enough credit. + */ +export class InsufficientCreditError extends Error { + /** + * Describes the name of the error. + */ + readonly name: string = "InsufficientCreditError"; + + constructor(message: string) { + super(message); + } +} + +/** + * Defines the error that occurs when the Sender fails to send a message. + */ +export class SendOperationFailedError extends Error { + /** + * Describes the name of the error. + */ + readonly name: string = "SendOperationFailedError"; + + constructor( + /** + * Provides descriptive information about the error. + */ + readonly message: string, + /** + * Provides the corresponding event associated with the `SendOperationFailedError`. + * - If the code is `"sender_error"` | `"session_error"`, then the send operation failed + * due to the sender link getting disconnected. + * - If the code is `"rejected"` | `"released"` | `"modified"`, then the send operation failed + * because the server is currently unable to accept the message being sent. Please take a look + * at the [AMQP 1.0 specification - "Section 3.4 Delivery State"](http://www.amqp.org/sites/amqp.org/files/amqp.pdf) + * for details about `"rejected"` | `"released"` | `"modified"` disposition. + */ + readonly code: "rejected" | "released" | "modified" | "sender_error" | "session_error", + /** + * Describes the underlying error that caused the send operation to fail. + */ + readonly innerError?: Error) { + super(message); + this.code = code; + this.innerError = innerError; + } +} diff --git a/lib/eventContext.ts b/lib/eventContext.ts index 77563d1..464c5b2 100644 --- a/lib/eventContext.ts +++ b/lib/eventContext.ts @@ -94,18 +94,18 @@ export module EventContext { rheaContext: RheaEventContext, emitter: Link | Session | Connection, eventName: string): EventContext { - const connectionId = (rheaContext.connection && rheaContext.connection.options) ? rheaContext.connection.options.id : ""; - log.contextTranslator("[%s] Translating the context for event: '%s'.", connectionId, eventName); + const connection: Connection = emitter instanceof Connection + ? emitter + : (emitter as Link | Session).connection; + + log.contextTranslator("[%s] Translating the context for event: '%s'.", connection.id, eventName); + // initialize the result const result: EventContext = { _context: rheaContext, ...rheaContext } as any; - const connection: Connection = emitter instanceof Connection - ? emitter - : (emitter as Link | Session).connection; - // set rhea-promise connection and container result.connection = connection; result.container = connection.container; diff --git a/lib/index.ts b/lib/index.ts index 318fc56..dcf0414 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -7,7 +7,8 @@ export { uuid_to_string, generate_uuid, string_to_uuid, LinkError, ProtocolError, LinkOptions, DeliveryAnnotations, MessageAnnotations, ReceiverEvents, SenderEvents, ConnectionEvents, SessionEvents, ContainerOptions as ContainerOptionsBase, TerminusOptions, Types, Sasl, - EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed, WebSocketImpl, WebSocketInstance + EndpointOptions, MessageUtil, TypeError, SimpleError, Source, ConnectionError, Typed, + WebSocketImpl, WebSocketInstance, TargetTerminusOptions } from "rhea"; export { EventContext, OnAmqpEvent } from "./eventContext"; @@ -18,7 +19,12 @@ export { export { Session } from "./session"; export { Receiver, ReceiverOptions } from "./receiver"; export { Sender, SenderOptions } from "./sender"; +export { AwaitableSenderOptions, AwaitableSender, PromiseLike } from "./awaitableSender"; +export { LinkCloseOptions } from "./link"; export { Func, AmqpResponseStatusCode, isAmqpError, ConnectionStringParseOptions, delay, messageHeader, messageProperties, parseConnectionString, ParsedOutput } from "./util/utils"; +export { + InsufficientCreditError, OperationTimeoutError, SendOperationFailedError +} from "./errorDefinitions"; diff --git a/lib/link.ts b/lib/link.ts index f568acb..50515f8 100644 --- a/lib/link.ts +++ b/lib/link.ts @@ -4,19 +4,32 @@ import * as log from "./log"; import { link, LinkOptions, AmqpError, Dictionary, Source, TerminusOptions, SenderEvents, ReceiverEvents, - EventContext as RheaEventContext + EventContext as RheaEventContext, ConnectionEvents } from "rhea"; import { Session } from "./session"; import { Connection } from "./connection"; import { Func, emitEvent, EmitParameters } from './util/utils'; import { Entity } from "./entity"; -import { OperationTimeoutError } from "./operationTimeoutError"; +import { OperationTimeoutError } from "./errorDefinitions"; export enum LinkType { sender = "sender", receiver = "receiver" } +/** + * @interface LinkCloseOptions + * Describes the options that can be provided while closing the link. + */ +export interface LinkCloseOptions { + /** + * Indicates whether the underlying amqp session should also be closed when the + * link is being closed. + * - **Default: `true`**. + */ + closeSession?: boolean; +} + export abstract class Link extends Entity { linkOptions?: LinkOptions; type: LinkType; @@ -208,17 +221,21 @@ export abstract class Link extends Entity { } /** - * Closes the underlying amqp link and session in rhea if open. Also removes all the event - * handlers added in the rhea-promise library on the link and it's session - * @return {Promise} Promise + * Closes the underlying amqp link and optionally the session as well in rhea if open. + * Also removes all the event handlers added in the rhea-promise library on the link + * and optionally it's session. + * @returns Promise * - **Resolves** the promise when rhea emits the "sender_close" | "receiver_close" event. * - **Rejects** the promise with an AmqpError when rhea emits the * "sender_error" | "receiver_error" event while trying to close the amqp link. */ - async close(): Promise { + async close(options?: LinkCloseOptions): Promise { + if (!options) options = {}; + if (options.closeSession == undefined) options.closeSession = true; this.removeAllListeners(); await new Promise((resolve, reject) => { - log.error("[%s] The %s is open ? -> %s", this.connection.id, this.type, this.isOpen()); + log.error("[%s] The %s '%s' on amqp session '%s' is open ? -> %s", + this.connection.id, this.type, this.name, this.session.id, this.isOpen()); if (this.isOpen()) { const errorEvent = this.type === LinkType.sender ? SenderEvents.senderError @@ -228,6 +245,7 @@ export abstract class Link extends Entity { : ReceiverEvents.receiverClose; let onError: Func; let onClose: Func; + let onDisconnected: Func; let waitTimer: any; const removeListeners = () => { @@ -235,32 +253,44 @@ export abstract class Link extends Entity { this.actionInitiated--; this._link.removeListener(errorEvent, onError); this._link.removeListener(closeEvent, onClose); + this._link.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onClose = (context: RheaEventContext) => { removeListeners(); - log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.", - this.connection.id, this.type); + log[this.type]("[%s] Resolving the promise as the %s '%s' on amqp session '%s' " + + "has been closed.", this.connection.id, this.type, this.name, this.session.id); return resolve(); }; onError = (context: RheaEventContext) => { removeListeners(); - log.error("[%s] Error occurred while closing amqp %s: %O.", - this.connection.id, this.type, context.session!.error); + log.error("[%s] Error occurred while closing %s '%s' on amqp session '%s': %O.", + this.connection.id, this.type, this.name, this.session.id, context.session!.error); return reject(context.session!.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while closing amqp %s '%s' on amqp " + + "session '%s': %O.", this.connection.id, this.type, this.name, this.session.id, error); + }; + const actionAfterTimeout = () => { removeListeners(); - const msg: string = `Unable to close the amqp %s ${this.name} due to operation timeout.`; - log.error("[%s] %s", this.connection.id, this.type, msg); + const msg: string = `Unable to close the ${this.type} '${this.name}' ` + + `on amqp session '${this.session.id}' due to operation timeout.`; + log.error("[%s] %s", this.connection.id, msg); return reject(new OperationTimeoutError(msg)); }; // listeners that we add for completing the operation are added directly to rhea's objects. this._link.once(closeEvent, onClose); this._link.once(errorEvent, onError); + this._link.connection.once(ConnectionEvents.disconnected, onDisconnected); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); this._link.close(); @@ -269,9 +299,12 @@ export abstract class Link extends Entity { return resolve(); } }); - log[this.type]("[%s] %s has been closed, now closing it's session.", - this.connection.id, this.type); - return this._session.close(); + + if (options.closeSession) { + log[this.type]("[%s] %s '%s' has been closed, now closing it's amqp session '%s'.", + this.connection.id, this.type, this.name, this.session.id); + return this._session.close(); + } } /** diff --git a/lib/operationTimeoutError.ts b/lib/operationTimeoutError.ts deleted file mode 100644 index fdbacac..0000000 --- a/lib/operationTimeoutError.ts +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the Apache License. See License in the project root for license information. - -/** - * Defines the error that occurs when an operation timeout occurs. - */ -export class OperationTimeoutError extends Error { - /** - * Describes the name of the error. - */ - readonly name: string = "OperationTimeoutError"; - - constructor(message: string) { - super(message); - } -} diff --git a/lib/sender.ts b/lib/sender.ts index 2988653..e0e1c2b 100644 --- a/lib/sender.ts +++ b/lib/sender.ts @@ -9,11 +9,38 @@ import { SenderEvents } from "rhea"; import { Link, LinkType } from './link'; import { OnAmqpEvent } from "./eventContext"; +/** + * Descibes the options that can be provided while creating an AMQP Basesender. + * @interface BaseSenderOptions + */ +export interface BaseSenderOptions extends RheaSenderOptions { + /** + * @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any + * errors that occur on the "sender_error" event. + */ + onError?: OnAmqpEvent; + /** + * @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the + * "sender_close" event. + */ + onClose?: OnAmqpEvent; + /** + * @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving + * the "session_error" event that occurs on the underlying session. + */ + onSessionError?: OnAmqpEvent; + /** + * @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the + * "session_close" event that occurs on the underlying session. + */ + onSessionClose?: OnAmqpEvent; +} + /** * Descibes the options that can be provided while creating an AMQP sender. * @interface SenderOptions */ -export interface SenderOptions extends RheaSenderOptions { +export interface SenderOptions extends BaseSenderOptions { /** * @property {OnAmqpEvent} [onAccepted] The handler that can be provided for receiving the * "accepted" event after a message is sent from the underlying rhea sender. @@ -34,26 +61,6 @@ export interface SenderOptions extends RheaSenderOptions { * "modified" event after a message is sent from the underlying rhea sender. */ onModified?: OnAmqpEvent; - /** - * @property {OnAmqpEvent} [onError] The handler that can be provided for receiving any - * errors that occur on the "sender_error" event. - */ - onError?: OnAmqpEvent; - /** - * @property {OnAmqpEvent} [onClose] The handler that can be provided for receiving the - * "sender_close" event. - */ - onClose?: OnAmqpEvent; - /** - * @property {OnAmqpEvent} [onSessionError] The handler that can be provided for receiving - * the "session_error" event that occurs on the underlying session. - */ - onSessionError?: OnAmqpEvent; - /** - * @property {OnAmqpEvent} [onSessionClose] The handler that can be provided for receiving the - * "session_close" event that occurs on the underlying session. - */ - onSessionClose?: OnAmqpEvent; } /** @@ -65,13 +72,12 @@ export declare interface Sender { } /** - * Describes the sender that wraps the rhea sender. - * @class Sender + * Describes the base sender that wraps the rhea sender. + * @class BaseSender */ -export class Sender extends Link { - senderOptions?: SenderOptions; +export class BaseSender extends Link { - constructor(session: Session, sender: RheaSender, options?: SenderOptions) { + constructor(session: Session, sender: RheaSender, options?: BaseSenderOptions) { super(LinkType.sender, session, sender, options); } @@ -86,6 +92,17 @@ export class Sender extends Link { sendable(): boolean { return (this._link as RheaSender).sendable(); } +} + +/** + * Describes the AMQP Sender. + * @class Sender + */ +export class Sender extends BaseSender { + + constructor(session: Session, sender: RheaSender, options?: SenderOptions) { + super(session, sender, options); + } /** * Sends the message diff --git a/lib/session.ts b/lib/session.ts index 683d9b7..ec66f2c 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -7,12 +7,13 @@ import { Receiver, ReceiverOptions } from "./receiver"; import { Sender, SenderOptions } from "./sender"; import { SenderEvents, ReceiverEvents, SessionEvents, AmqpError, Session as RheaSession, - EventContext as RheaEventContext + EventContext as RheaEventContext, ConnectionEvents } from "rhea"; import { Func, EmitParameters, emitEvent } from "./util/utils"; import { OnAmqpEvent } from "./eventContext"; import { Entity } from "./entity"; -import { OperationTimeoutError } from "./operationTimeoutError"; +import { OperationTimeoutError } from "./errorDefinitions"; +import { AwaitableSender, AwaitableSenderOptions } from "./awaitableSender"; /** * Describes the event listeners that can be added to the Session. @@ -22,6 +23,14 @@ export declare interface Session { on(event: SessionEvents, listener: OnAmqpEvent): this; } +/** + * @internal + */ +enum SenderType { + sender = "sender", + AwaitableSender = "AwaitableSender" +} + /** * Describes the session that wraps the rhea session. * @class Session @@ -52,6 +61,29 @@ export class Session extends Entity { return this._session.error; } + /** + * Returns the unique identifier for the session in the format: + * "local_-remote_-" or an empty string if the local channel or + * remote channel are not yet defined. + */ + get id(): string { + let result: string = ""; + const session: any = this._session; + if (session.local) { + result += `local-${session.local.channel}_`; + } + + if (session.remote) { + result += `remote-${session.remote.channel}_`; + } + + if (result) { + result += `${this._connection.id}`; + } + + return result; + } + /** * Determines whether the session and the underlying connection is open. * @returns {boolean} result `true` - is open; `false` otherwise. @@ -114,10 +146,11 @@ export class Session extends Entity { close(): Promise { this.removeAllListeners(); return new Promise((resolve, reject) => { - log.error("[%s] The session is open ? -> %s", this.connection.id, this.isOpen()); + log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen()); if (this.isOpen()) { let onError: Func; let onClose: Func; + let onDisconnected: Func; let waitTimer: any; const removeListeners = () => { @@ -125,25 +158,35 @@ export class Session extends Entity { this.actionInitiated--; this._session.removeListener(SessionEvents.sessionError, onError); this._session.removeListener(SessionEvents.sessionClose, onClose); + this._session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onClose = (context: RheaEventContext) => { removeListeners(); - log.session("[%s] Resolving the promise as the amqp session has been closed.", - this.connection.id); + log.session("[%s] Resolving the promise as the amqp session '%s' has been closed.", + this.connection.id, this.id); return resolve(); }; onError = (context: RheaEventContext) => { removeListeners(); - log.error("[%s] Error occurred while closing amqp session.", - this.connection.id, context.session!.error); + log.error("[%s] Error occurred while closing amqp session '%s'.", + this.connection.id, this.id, context.session!.error); reject(context.session!.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while closing amqp session '%s': %O.", + this.connection.id, this.id, error); + }; + const actionAfterTimeout = () => { removeListeners(); - const msg: string = `Unable to close the amqp session due to operation timeout.`; + const msg: string = `Unable to close the amqp session ${this.id} due to operation timeout.`; log.error("[%s] %s", this.connection.id, msg); reject(new OperationTimeoutError(msg)); }; @@ -151,7 +194,8 @@ export class Session extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. this._session.once(SessionEvents.sessionClose, onClose); this._session.once(SessionEvents.sessionError, onError); - log.session("[%s] Calling session.close()", this.connection.id); + this._session.connection.once(ConnectionEvents.disconnected, onDisconnected); + log.session("[%s] Calling session.close() for amqp session '%s'.", this.connection.id, this.id); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); this._session.close(); this.actionInitiated++; @@ -163,9 +207,9 @@ export class Session extends Entity { /** * Creates an amqp receiver on this session. - * @param {Session} session The amqp session object on which the receiver link needs to be established. - * @param {ReceiverOptions} [options] Options that can be provided while creating an amqp receiver. - * @return {Promise} Promise + * @param session The amqp session object on which the receiver link needs to be established. + * @param options Options that can be provided while creating an amqp receiver. + * @return Promise * - **Resolves** the promise with the Receiver object when rhea emits the "receiver_open" event. * - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while trying * to create an amqp receiver or the operation timeout occurs. @@ -196,20 +240,21 @@ export class Session extends Entity { // to our (rhea-promise) object. if (options && options.onSessionError) { this.on(SessionEvents.sessionError, options.onSessionError); - log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " + - "while creating the 'receiver'.", this.connection.id, SessionEvents.sessionError); + log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " + + "while creating the 'receiver'.", this.connection.id, SessionEvents.sessionError, this.id); } if (options && options.onSessionClose) { this.on(SessionEvents.sessionClose, options.onSessionClose); - log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " + - " while creating the 'receiver'.", this.connection.id, SessionEvents.sessionClose); + log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " + + " while creating the 'receiver'.", this.connection.id, SessionEvents.sessionClose, this.id); } const rheaReceiver = this._session.attach_receiver(options); const receiver = new Receiver(this, rheaReceiver, options); receiver.actionInitiated++; let onOpen: Func; let onClose: Func; + let onDisconnected: Func; let waitTimer: any; if (options && options.onMessage) { @@ -240,26 +285,38 @@ export class Session extends Entity { receiver.actionInitiated--; rheaReceiver.removeListener(ReceiverEvents.receiverOpen, onOpen); rheaReceiver.removeListener(ReceiverEvents.receiverClose, onClose); + rheaReceiver.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onOpen = (context: RheaEventContext) => { removeListeners(); - log.receiver("[%s] Resolving the promise with amqp receiver '%s'.", - this.connection.id, receiver.name); + log.receiver("[%s] Resolving the promise with amqp receiver '%s' on amqp session '%s'.", + this.connection.id, receiver.name, this.id); return resolve(receiver); }; onClose = (context: RheaEventContext) => { removeListeners(); - log.error("[%s] Error occurred while creating a receiver over amqp connection: %O.", - this.connection.id, context.receiver!.error); + log.error("[%s] Error occurred while creating the amqp receiver '%s' on amqp session " + + "'%s' over amqp connection: %O.", + this.connection.id, receiver.name, this.id, context.receiver!.error); return reject(context.receiver!.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while creating amqp receiver '%s' on amqp " + + "session '%s': %O.", this.connection.id, receiver.name, this.id, error); + return reject(error); + }; + const actionAfterTimeout = () => { removeListeners(); - const msg: string = `Unable to create the amqp receiver ${receiver.name} due to ` + - `operation timeout.`; + const msg: string = `Unable to create the amqp receiver '${receiver.name}' on amqp ` + + `session '${this.id}' due to operation timeout.`; log.error("[%s] %s", this.connection.id, msg); return reject(new OperationTimeoutError(msg)); }; @@ -267,38 +324,76 @@ export class Session extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. rheaReceiver.once(ReceiverEvents.receiverOpen, onOpen); rheaReceiver.once(ReceiverEvents.receiverClose, onClose); + rheaReceiver.session.connection.on(ConnectionEvents.disconnected, onDisconnected); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); }); } /** * Creates an amqp sender on this session. - * @param {SenderOptions} [options] Options that can be provided while creating an amqp sender. - * @return {Promise} Promise + * @param options Options that can be provided while creating an amqp sender. + * @return Promise * - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event. * - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying * to create an amqp sender or the operation timeout occurs. */ createSender(options?: SenderOptions): Promise { + return this._createSender(SenderType.sender, options) as Promise; + } + + /** + * Creates an awaitable amqp sender on this session. + * @param options Options that can be provided while creating an async amqp sender. + * - If `onError` and `onSessionError` handlers are not provided then the `AwaitableSender` will + * clear the timer and reject the Promise for all the entries of inflight send operation in its + * `deliveryDispositionMap`. + * - If the user is handling the reconnection of sender link or the underlying connection in it's + * app, then the `onError` and `onSessionError` handlers must be provided by the user and (s)he + * shall be responsible of clearing the `deliveryDispotionMap` of inflight `send()` operation. + * + * @return Promise + * - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event. + * - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while trying + * to create an amqp sender or the operation timeout occurs. + */ + createAwaitableSender(options?: AwaitableSenderOptions): Promise { + return this._createSender(SenderType.AwaitableSender, options) as Promise; + } + + /** + * Creates the Sender based on the provided type. + * @internal + * @param type The type of sender + * @param options Options to be provided while creating the sender. + */ + private _createSender( + type: SenderType, + options?: SenderOptions | AwaitableSenderOptions): Promise { return new Promise((resolve, reject) => { // Register session handlers for session_error and session_close if provided. if (options && options.onSessionError) { this.on(SessionEvents.sessionError, options.onSessionError); - log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " + - "while creating the sender.", this.connection.id, SessionEvents.sessionError); + log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " + + "while creating the sender.", this.connection.id, SessionEvents.sessionError, this.id); } if (options && options.onSessionClose) { this.on(SessionEvents.sessionClose, options.onSessionClose); - log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " + - "while creating the sender.", this.connection.id, SessionEvents.sessionClose); + log.session("[%s] Added event handler for event '%s' on rhea-promise 'session: %s', " + + "while creating the sender.", this.connection.id, SessionEvents.sessionClose, this.id); } const rheaSender = this._session.attach_sender(options); - const sender = new Sender(this, rheaSender, options); + let sender: Sender | AwaitableSender; + if (type === SenderType.sender) { + sender = new Sender(this, rheaSender, options); + } else { + sender = new AwaitableSender(this, rheaSender, options); + } sender.actionInitiated++; let onSendable: Func; let onClose: Func; + let onDisconnected: Func; let waitTimer: any; // listeners provided by the user in the options object should be added @@ -310,17 +405,19 @@ export class Session extends Entity { if (options.onClose) { sender.on(SenderEvents.senderClose, options.onClose); } - if (options.onAccepted) { - sender.on(SenderEvents.accepted, options.onAccepted); - } - if (options.onRejected) { - sender.on(SenderEvents.rejected, options.onRejected); - } - if (options.onReleased) { - sender.on(SenderEvents.released, options.onReleased); - } - if (options.onModified) { - sender.on(SenderEvents.modified, options.onModified); + if (type === SenderType.sender) { + if ((options as SenderOptions).onAccepted) { + sender.on(SenderEvents.accepted, (options as SenderOptions).onAccepted!); + } + if ((options as SenderOptions).onRejected) { + sender.on(SenderEvents.rejected, (options as SenderOptions).onRejected!); + } + if ((options as SenderOptions).onReleased) { + sender.on(SenderEvents.released, (options as SenderOptions).onReleased!); + } + if ((options as SenderOptions).onModified) { + sender.on(SenderEvents.modified, (options as SenderOptions).onModified!); + } } } @@ -329,26 +426,38 @@ export class Session extends Entity { sender.actionInitiated--; rheaSender.removeListener(SenderEvents.senderOpen, onSendable); rheaSender.removeListener(SenderEvents.senderClose, onClose); + rheaSender.session.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); }; onSendable = (context: RheaEventContext) => { removeListeners(); - log.sender("[%s] Resolving the promise with amqp sender '%s'.", - this.connection.id, sender.name); + log.sender("[%s] Resolving the promise with amqp sender '%s' on amqp session '%s'.", + this.connection.id, sender.name, this.id); return resolve(sender); }; onClose = (context: RheaEventContext) => { removeListeners(); - log.error("[%s] Error occurred while creating a sender over amqp connection: %O.", - this.connection.id, context.sender!.error); + log.error("[%s] Error occurred while creating the amqp sender '%s' on amqp session '%s' " + + "over amqp connection: %O.", + this.connection.id, sender.name, this.id, context.sender!.error); return reject(context.sender!.error); }; + onDisconnected = (context: RheaEventContext) => { + removeListeners(); + const error = context.connection && context.connection.error + ? context.connection.error + : context.error; + log.error("[%s] Connection got disconnected while creating amqp sender '%s' on amqp " + + "session '%s': %O.", this.connection.id, sender.name, this.id, error); + return reject(error); + }; + const actionAfterTimeout = () => { removeListeners(); - const msg: string = `Unable to create the amqp sender ${sender.name} due to ` + - `operation timeout.`; + const msg: string = `Unable to create the amqp sender '${sender.name}' on amqp session ` + + `'${this.id}' due to operation timeout.`; log.error("[%s] %s", this.connection.id, msg); return reject(new OperationTimeoutError(msg)); }; @@ -356,6 +465,7 @@ export class Session extends Entity { // listeners that we add for completing the operation are added directly to rhea's objects. rheaSender.once(SenderEvents.sendable, onSendable); rheaSender.once(SenderEvents.senderClose, onClose); + rheaSender.session.connection.on(ConnectionEvents.disconnected, onDisconnected); waitTimer = setTimeout(actionAfterTimeout, this.connection.options!.operationTimeoutInSeconds! * 1000); }); } @@ -429,8 +539,9 @@ export class Session extends Entity { emitEvent(params); }); if (typeof this._session.eventNames === "function") { - log.eventHandler("[%s] rhea-promise 'session' object is listening for events: %o " + - "emitted by rhea's 'session' object.", this.connection.id, this._session.eventNames()); + log.eventHandler("[%s] rhea-promise 'session' object '%s' is listening for events: %o " + + "emitted by rhea's 'session' object.", + this.connection.id, this.id, this._session.eventNames()); } } } diff --git a/lib/util/utils.ts b/lib/util/utils.ts index f6e83f5..268e980 100644 --- a/lib/util/utils.ts +++ b/lib/util/utils.ts @@ -178,8 +178,10 @@ export interface EmitParameters { */ export function emitEvent(params: EmitParameters): void { const emit = () => { - log[params.emitterType]("[%s] %s got event: '%s'. Re-emitting the translated context.", - params.connectionId, params.emitterType, params.eventName); + const id = params.emitter && + ((params.emitter as Connection | Session).id || (params.emitter as Link).name); + log[params.emitterType]("[%s] %s '%s' got event: '%s'. Re-emitting the translated context.", + params.connectionId, params.emitterType, id, params.eventName); params.emitter.emit(params.eventName, EventContext.translate(params.rheaContext, params.emitter, params.eventName)); }; diff --git a/package-lock.json b/package-lock.json index 031f08f..c2c53ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "rhea-promise", - "version": "0.2.0", + "version": "1.0.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -144,9 +144,9 @@ } }, "diff": { - "version": "3.5.0", - "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", - "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.1.tgz", + "integrity": "sha512-s2+XdvhPCOF01LRQBC8hf4vhbVmI2CGS5aZnxLJlT5FtdhPCDFq80q++zK2KlrVorVDdL5BOGZ/VfLrVtYNF+Q==", "dev": true }, "dotenv": { @@ -297,9 +297,9 @@ } }, "rhea": { - "version": "1.0.7", - "resolved": "https://registry.npmjs.org/rhea/-/rhea-1.0.7.tgz", - "integrity": "sha512-AQOnM8OjGZyTP1TFsP7IY6O0+obYmkF2OxuRbj0O5eUzNUA5w3jeMOwa7aD1MnjwE5RIZjwdNdIfSzoWk8ANag==", + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/rhea/-/rhea-1.0.8.tgz", + "integrity": "sha512-TNv6rD/74h2QiXrUpFHw6fzGNuOpVOcjPRGtlKC5eIy5NLfvb2RiZGZs7lpg4al22p5pAAQjLZuPTDipReUzPA==", "requires": { "debug": "0.8.0 - 3.5.0" } @@ -351,27 +351,27 @@ } }, "ts-node": { - "version": "8.1.0", - "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-8.1.0.tgz", - "integrity": "sha512-34jpuOrxDuf+O6iW1JpgTRDFynUZ1iEqtYruBqh35gICNjN8x+LpVcPAcwzLPi9VU6mdA3ym+x233nZmZp445A==", + "version": "8.2.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-8.2.0.tgz", + "integrity": "sha512-m8XQwUurkbYqXrKqr3WHCW310utRNvV5OnRVeISeea7LoCWVcdfeB/Ntl8JYWFh+WRoUAdBgESrzKochQt7sMw==", "dev": true, "requires": { "arg": "^4.1.0", - "diff": "^3.1.0", + "diff": "^4.0.1", "make-error": "^1.1.1", "source-map-support": "^0.5.6", "yn": "^3.0.0" } }, "tslib": { - "version": "1.9.3", - "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.9.3.tgz", - "integrity": "sha512-4krF8scpejhaOgqzBEcGM7yDIEfi0/8+8zDRZhNZZ2kjmHJ4hv3zCbQWxoJGz1iw5U0Jl0nma13xzHXcncMavQ==" + "version": "1.10.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.10.0.tgz", + "integrity": "sha512-qOebF53frne81cf0S9B41ByenJ3/IuH8yJKngAX35CmiZySA0khhkovshKK+jGCaMnVomla7gVlIcc3EvKPbTQ==" }, "tslint": { - "version": "5.16.0", - "resolved": "https://registry.npmjs.org/tslint/-/tslint-5.16.0.tgz", - "integrity": "sha512-UxG2yNxJ5pgGwmMzPMYh/CCnCnh0HfPgtlVRDs1ykZklufFBL1ZoTlWFRz2NQjcoEiDoRp+JyT0lhBbbH/obyA==", + "version": "5.17.0", + "resolved": "https://registry.npmjs.org/tslint/-/tslint-5.17.0.tgz", + "integrity": "sha512-pflx87WfVoYepTet3xLfDOLDm9Jqi61UXIKePOuca0qoAZyrGWonDG9VTbji58Fy+8gciUn8Bt7y69+KEVjc/w==", "dev": true, "requires": { "@babel/code-frame": "^7.0.0", @@ -380,13 +380,21 @@ "commander": "^2.12.1", "diff": "^3.2.0", "glob": "^7.1.1", - "js-yaml": "^3.13.0", + "js-yaml": "^3.13.1", "minimatch": "^3.0.4", "mkdirp": "^0.5.1", "resolve": "^1.3.2", "semver": "^5.3.0", "tslib": "^1.8.0", "tsutils": "^2.29.0" + }, + "dependencies": { + "diff": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", + "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", + "dev": true + } } }, "tsutils": { @@ -399,9 +407,9 @@ } }, "typescript": { - "version": "3.4.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.4.5.tgz", - "integrity": "sha512-YycBxUb49UUhdNMU5aJ7z5Ej2XGmaIBL0x34vZ82fn3hGvD+bgrMrVDpatgz2f7YxUMJxMkbWxJZeAvDxVe7Vw==", + "version": "3.5.1", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.5.1.tgz", + "integrity": "sha512-64HkdiRv1yYZsSe4xC1WVgamNigVYjlssIoaH2HcZF0+ijsk5YK2g0G34w9wJkze8+5ow4STd22AynfO6ZYYLw==", "dev": true }, "wrappy": { diff --git a/package.json b/package.json index 8c99df0..1503fe7 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,14 @@ { "name": "rhea-promise", - "version": "0.2.0", + "version": "1.0.0", "description": "A Promisified layer over rhea AMQP client", "license": "Apache-2.0", "main": "./dist/lib/index.js", "types": "./typings/lib/index.d.ts", "dependencies": { "debug": "^3.1.0", - "rhea": "^1.0.7", - "tslib": "^1.9.3" + "rhea": "^1.0.8", + "tslib": "^1.10.0" }, "keywords": [ "amqp", @@ -22,9 +22,9 @@ "@types/node": "^8.0.37", "@types/dotenv": "^6.1.1", "rimraf": "^2.6.3", - "ts-node": "^8.1.0", - "tslint": "^5.16.0", - "typescript": "^3.4.5", + "ts-node": "^8.2.0", + "tslint": "^5.17.0", + "typescript": "^3.5.1", "dotenv": "^8.0.0" }, "scripts": { diff --git a/sample.env b/sample.env index 6b0e9f4..8db1bc5 100644 --- a/sample.env +++ b/sample.env @@ -1,12 +1,16 @@ AMQP_HOST="name of the host,ex: foo.servicebus.windows.net" AMQP_USERNAME="username, ex: RootManagedSharedAccessKey" AMQP_PASSWORD="sharedAccessKeyValue" -AMQP_PORT=5672 +AMQP_PORT=5671 # - For Azure EventHub # - EventHub Receiver: "/ConsumerGroups//Partitions/" +# - For Azure ServiceBus +# - "" RECEIVER_ADDRESS="address of the receiver" # - For Azure EventHub # - PartitionedSender: "/Partitions/" # - Normal Sender: "" +# - For Azure ServiceBus +# - "" SENDER_ADDRESS="address of the sender" DEBUG=rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer \ No newline at end of file diff --git a/tslint.json b/tslint.json index be926c3..9eaa641 100644 --- a/tslint.json +++ b/tslint.json @@ -41,7 +41,6 @@ "no-unsafe-finally": true, "no-unused-expression": true, "no-unused-variable": false, - "no-use-before-declare": true, "no-var-keyword": true, "no-floating-promises": true, "no-return-await": true,