Skip to content

Commit

Permalink
Multiple fixes and updates to rhea-promise (#44)
Browse files Browse the repository at this point in the history
* Added id to the session object and improved log statements

* add closeSync flavored methods to Link and Session

* simplify the closeSync version.

* rectify the example

* Handle connection getting disconnected while closing the link, session or connection itself.

* handle connection disconnects while creating a session, sender or receiver.

* update readme

* Added asynchronous sender

* refactor code

* updated changelog

* addressed review feedback

* remove closeSync()

* address review feedback

* Awaitable Sender

* fix casing

* Log the protocol error since rhea does not pass that error to the context in disconnected event handler

* improve logs

* updates as per code review

* improved log statement and updated min. version of rhea to 1.0.8
  • Loading branch information
amarzavery authored Jun 27, 2019
1 parent 6f1a916 commit 3fdbbd5
Show file tree
Hide file tree
Showing 19 changed files with 847 additions and 160 deletions.
22 changes: 21 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,32 @@
{
"type": "node",
"request": "launch",
"name": "examples",
"name": "awaitableSend",
"program": "${workspaceFolder}/examples/awaitableSend.ts",
"outFiles": [
"${workspaceFolder}/dist/**/*.js"
],
"envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables.
},
{
"type": "node",
"request": "launch",
"name": "send",
"program": "${workspaceFolder}/examples/send.ts",
"outFiles": [
"${workspaceFolder}/dist/**/*.js"
],
"envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables.
},
{
"type": "node",
"request": "launch",
"name": "receive",
"program": "${workspaceFolder}/examples/receive.ts",
"outFiles": [
"${workspaceFolder}/dist/**/*.js"
],
"envFile": "${workspaceFolder}/.env" // You can take a look at the sample.env file for supported environment variables.
}
]
}
83 changes: 79 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# rhea-promise

A Promisified layer over rhea AMQP client.
A Promisified layer over [rhea](https://githhub.com/amqp/rhea) AMQP client.

## Pre-requisite ##
- **Node.js version: 6.x or higher.**
Expand Down Expand Up @@ -73,7 +73,7 @@ We believe our design enforces good practices to be followed while using the eve
Please take a look at the [sample.env](https://github.com/amqp/rhea-promise/blob/master/sample.env) file for examples on how to provide the values for different
parameters like host, username, password, port, senderAddress, receiverAddress, etc.
#### Sending a message.
#### Sending a message via `Sender`.
- Running the example from terminal: `> ts-node ./examples/send.ts`.
**NOTE:** If you are running the sample with `.env` config file, then please run the sample from the directory that contains `.env` config file.
Expand Down Expand Up @@ -130,8 +130,79 @@ async function main(): Promise<void> {
message_id: "12343434343434"
};
const delivery: Delivery = await sender.send(message);
console.log(">>>>>[%s] Delivery id: ", connection.id, delivery.id);
// Please, note that we are not awaiting on sender.send()
// You will notice that `delivery.settled` will be `false`.
const delivery: Delivery = sender.send(message);
console.log(">>>>>[%s] Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled);
await sender.close();
await connection.close();
}
main().catch((err) => console.log(err));
```
### Sending a message via `AwaitableSender`
- Running the example from terminal: `> ts-node ./examples/awaitableSend.ts`.
```typescript
import {
Connection, Message, ConnectionOptions, Delivery, AwaitableSenderOptions, AwaitableSender
} from "rhea-promise";
import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();
const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";
async function main(): Promise<void> {
const connectionOptions: ConnectionOptions = {
transport: "tls",
host: host,
hostname: host,
username: username,
password: password,
port: port,
reconnect: false
};
const connection: Connection = new Connection(connectionOptions);
const senderName = "sender-1";
const awaitableSenderOptions: AwaitableSenderOptions = {
name: senderName,
target: {
address: senderAddress
},
sendTimeoutInSeconds: 10
};
await connection.open();
// Notice that we are awaiting on the message being sent.
const sender: AwaitableSender = await connection.createAwaitableSender(
awaitableSenderOptions
);
for (let i = 0; i < 10; i++) {
const message: Message = {
body: `Hello World - ${i}`,
message_id: i
};
// Note: Here we are awaiting for the send to complete.
// You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
const delivery: Delivery = await sender.send(message);
console.log(
"[%s] await sendMessage -> Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled
);
}
await sender.close();
await connection.close();
Expand Down Expand Up @@ -222,3 +293,7 @@ npm i
```
npm run build
```
## AMQP Protocol specification
Amqp protocol specification can be found [here](http://www.amqp.org/sites/amqp.org/files/amqp.pdf).
12 changes: 12 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
### 1.0.0 - 2019-06-18
- Updated minimum version of `rhea` to `^1.0.8`.
- Added a read only property `id` to the `Session` object. The id property is created by concatenating session's local channel, remote channel and the connection id `"local-<number>_remote-<number>_<connection-id>"`, thus making it unique for that connection.
- Improved log statements by adding the session `id` and the sender, receiver `name` to help while debugging applications.
- Added `options` to `Link.close({closeSession: true | false})`, thus the user can specify whether the underlying session should be closed while closing the `Sender|Receiver`. Default is `true`.
- Improved `open` and `close` operations on `Connection`, `Session` and `Link` by creating timer in case the connection gets disconnected. Fixes [#41](https://github.com/amqp/rhea-promise/issues/41).
- The current `Sender` does not have a provision of **"awaiting"** on sending a message. The user needs to add handlers on the `Sender` for `accepted`, `rejected`, `released`, `modified` to ensure whether the message was successfully sent.
Now, we have added a new `AwaitableSender` which adds the handlers internally and provides an **awaitable** `send()` operation to the customer. Fixes [#45](https://github.com/amqp/rhea-promise/issues/45).
- Exporting new Errors:
- `InsufficientCreditError`: Defines the error that occurs when the Sender does not have enough credit.
- `SendOperationFailedError`: Defines the error that occurs when the Sender fails to send a message.

### 0.2.0 - 2019-05-17
- Updated `OperationTimeoutError` to be a non-AMQP Error as pointed out in [#42](https://github.com/amqp/rhea-promise/issues/42). Fixed in [PR](https://github.com/amqp/rhea-promise/pull/43).

Expand Down
67 changes: 67 additions & 0 deletions examples/awaitableSend.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache License. See License in the project root for license information.

import {
Connection,
Message,
ConnectionOptions,
Delivery,
AwaitableSenderOptions,
AwaitableSender
} from "../lib";

import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
dotenv.config();

const host = process.env.AMQP_HOST || "host";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
const port = parseInt(process.env.AMQP_PORT || "5671");
const senderAddress = process.env.SENDER_ADDRESS || "address";

async function main(): Promise<void> {
const connectionOptions: ConnectionOptions = {
transport: "tls",
host: host,
hostname: host,
username: username,
password: password,
port: port,
reconnect: false
};
const connection: Connection = new Connection(connectionOptions);
const senderName = "sender-1";
const senderOptions: AwaitableSenderOptions = {
name: senderName,
target: {
address: senderAddress
},
sendTimeoutInSeconds: 10
};

await connection.open();
const sender: AwaitableSender = await connection.createAwaitableSender(
senderOptions
);

for (let i = 0; i < 10; i++) {
const message: Message = {
body: `Hello World - ${i}`,
message_id: i
};
// Please, note that we are awaiting on sender.send() to complete.
// You will notice that `delivery.settled` will be `true`, irrespective of whether the promise resolves or rejects.
const delivery: Delivery = await sender.send(message);
console.log(
"[%s] await sendMessage -> Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled
);
}

await sender.close();
await connection.close();
}

main().catch((err) => console.log(err));
35 changes: 28 additions & 7 deletions examples/send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
// Licensed under the Apache License. See License in the project root for license information.

import {
Connection, Sender, EventContext, Message, ConnectionOptions, Delivery, SenderOptions
Connection,
Sender,
EventContext,
Message,
ConnectionOptions,
Delivery,
SenderOptions
} from "../lib";

import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
Expand Down Expand Up @@ -34,15 +40,23 @@ async function main(): Promise<void> {
onError: (context: EventContext) => {
const senderError = context.sender && context.sender.error;
if (senderError) {
console.log(">>>>> [%s] An error occurred for sender '%s': %O.",
connection.id, senderName, senderError);
console.log(
">>>>> [%s] An error occurred for sender '%s': %O.",
connection.id,
senderName,
senderError
);
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(">>>>> [%s] An error occurred for session of sender '%s': %O.",
connection.id, senderName, sessionError);
console.log(
">>>>> [%s] An error occurred for session of sender '%s': %O.",
connection.id,
senderName,
sessionError
);
}
}
};
Expand All @@ -54,8 +68,15 @@ async function main(): Promise<void> {
message_id: "12343434343434"
};

const delivery: Delivery = await sender.send(message);
console.log(">>>>>[%s] Delivery id: ", connection.id, delivery.id);
// Please, note that we are not awaiting on sender.send()
// You will notice that `delivery.settled` will be `false`.
const delivery: Delivery = sender.send(message);
console.log(
">>>>>[%s] send -> Delivery id: %d, settled: %s",
connection.id,
delivery.id,
delivery.settled
);

await sender.close();
await connection.close();
Expand Down
Loading

0 comments on commit 3fdbbd5

Please sign in to comment.