Skip to content

Commit

Permalink
add shutdown delay options (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiees2 authored Jun 29, 2021
1 parent 0b9de84 commit 28c0f1e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
63 changes: 62 additions & 1 deletion src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ export type SendQueueLimit = {
length: number;
};

export type DisconnectOptions = {
/**
* The amount of time to wait between flush attempts on disconnection (after making at least one attempt)
*
* Useful to wait if the fluent server is unavailable right when we're disconnecting
*
* Defaults to 0
*/
flushDelay: number;
/**
* The number of times to attempt a flush on disconnection
*
* Useful to empty the send queue before disconnecting
*
* Defaults to 1
*/
maxFlushAttempts: number;
/**
* The amount of time to wait before disconnecting the socket on disconnection
*
* Useful to wait for acknowledgements on final flush
*
* Defaults to 0
*/
socketDisconnectDelay: number;
};

/**
* The constructor options passed to the client
*/
Expand Down Expand Up @@ -160,6 +187,14 @@ export type FluentClientOptions = {
* See subtype for defaults
*/
eventRetry?: Partial<EventRetryOptions>;
/**
* Options to control disconnection behavior
*
* How many times to try to flush before disconnecting, wait times, etc
*
* See subtype for defaults
*/
disconnect?: Partial<DisconnectOptions>;
};

/**
Expand Down Expand Up @@ -187,6 +222,7 @@ export class FluentClient {
private nextFlushTimeoutId: null | NodeJS.Timeout = null;
private flushing = false;
private willFlushNextTick: Promise<boolean> | null = null;
private disconnectOptions: DisconnectOptions;

/**
* Creates a new FluentClient
Expand Down Expand Up @@ -245,6 +281,13 @@ export class FluentClient {
this.sendQueueNotFlushableLimitDelay =
options.sendQueueNotFlushableLimitDelay || 0;

this.disconnectOptions = {
flushDelay: 0,
maxFlushAttempts: 1,
socketDisconnectDelay: 0,
...(options.disconnect || {}),
};

this.socket = this.createSocket(options.security, options.socket);

this.socket.on("writable", () => this.handleWritable());
Expand Down Expand Up @@ -449,8 +492,26 @@ export class FluentClient {
*/
public async disconnect(): Promise<void> {
try {
await this.flush();
let flushCount = 0;
while (flushCount < this.disconnectOptions.maxFlushAttempts) {
// Only delay after making one flush attempt
if (flushCount > 0 && this.disconnectOptions.flushDelay > 0) {
await new Promise(r =>
setTimeout(r, this.disconnectOptions.flushDelay)
);
}
// Exit if flush returns false - queue is empty
if (!(await this.flush())) {
break;
}
flushCount += 1;
}
} finally {
if (this.disconnectOptions.socketDisconnectDelay > 0) {
await new Promise(r =>
setTimeout(r, this.disconnectOptions.socketDisconnectDelay)
);
}
try {
await this.socket.disconnect();
} finally {
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type {
AckOptions,
EventModes,
SendQueueLimit,
DisconnectOptions,
} from "./client";

export type {FluentServerOptions, FluentServerSecurityOptions} from "./server";
Expand Down
17 changes: 17 additions & 0 deletions test/test.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,23 @@ describe("FluentClient", () => {
socket.emit("error", new Error("test"));
});

it("should flush on disconnect", async () => {
const {client, socket} = createFluentClient("test");
socket.isWritable = false;
const spy = sinon.spy(client, "flush");

const firstEvent = client.emit("a", {event: "foo bar"});

await new Promise(r => setTimeout(r, 100));
sinon.assert.notCalled(spy);

socket.isWritable = true;
await client.disconnect();
await expect(firstEvent).to.eventually.be.fulfilled;

sinon.assert.calledOnce(spy);
});

it("should reject pending events after shutdown", async () => {
const {client, socket} = createFluentClient("test");
socket.isWritable = false;
Expand Down

0 comments on commit 28c0f1e

Please sign in to comment.