From 28c0f1e92f093c68fcf3af41892688da51163b38 Mon Sep 17 00:00:00 2001 From: James Sigurdarson Date: Tue, 29 Jun 2021 13:08:44 +0000 Subject: [PATCH] add shutdown delay options (#8) --- src/client.ts | 63 ++++++++++++++++++++++++++++++++++++++++++++- src/index.ts | 1 + test/test.client.ts | 17 ++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 362cc97..fcef5bc 100644 --- a/src/client.ts +++ b/src/client.ts @@ -77,6 +77,33 @@ export type SendQueueLimit = { length: number; }; +export type DisconnectOptions = { + /** + * The amount of time to wait between flush attempts on disconnection (after making at least one attempt) + * + * Useful to wait if the fluent server is unavailable right when we're disconnecting + * + * Defaults to 0 + */ + flushDelay: number; + /** + * The number of times to attempt a flush on disconnection + * + * Useful to empty the send queue before disconnecting + * + * Defaults to 1 + */ + maxFlushAttempts: number; + /** + * The amount of time to wait before disconnecting the socket on disconnection + * + * Useful to wait for acknowledgements on final flush + * + * Defaults to 0 + */ + socketDisconnectDelay: number; +}; + /** * The constructor options passed to the client */ @@ -160,6 +187,14 @@ export type FluentClientOptions = { * See subtype for defaults */ eventRetry?: Partial; + /** + * Options to control disconnection behavior + * + * How many times to try to flush before disconnecting, wait times, etc + * + * See subtype for defaults + */ + disconnect?: Partial; }; /** @@ -187,6 +222,7 @@ export class FluentClient { private nextFlushTimeoutId: null | NodeJS.Timeout = null; private flushing = false; private willFlushNextTick: Promise | null = null; + private disconnectOptions: DisconnectOptions; /** * Creates a new FluentClient @@ -245,6 +281,13 @@ export class FluentClient { this.sendQueueNotFlushableLimitDelay = options.sendQueueNotFlushableLimitDelay || 0; + this.disconnectOptions = { + flushDelay: 0, + maxFlushAttempts: 1, + socketDisconnectDelay: 0, + ...(options.disconnect || {}), + }; + this.socket = this.createSocket(options.security, options.socket); this.socket.on("writable", () => this.handleWritable()); @@ -449,8 +492,26 @@ export class FluentClient { */ public async disconnect(): Promise { try { - await this.flush(); + let flushCount = 0; + while (flushCount < this.disconnectOptions.maxFlushAttempts) { + // Only delay after making one flush attempt + if (flushCount > 0 && this.disconnectOptions.flushDelay > 0) { + await new Promise(r => + setTimeout(r, this.disconnectOptions.flushDelay) + ); + } + // Exit if flush returns false - queue is empty + if (!(await this.flush())) { + break; + } + flushCount += 1; + } } finally { + if (this.disconnectOptions.socketDisconnectDelay > 0) { + await new Promise(r => + setTimeout(r, this.disconnectOptions.socketDisconnectDelay) + ); + } try { await this.socket.disconnect(); } finally { diff --git a/src/index.ts b/src/index.ts index 515d2a4..450cb9c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ export type { AckOptions, EventModes, SendQueueLimit, + DisconnectOptions, } from "./client"; export type {FluentServerOptions, FluentServerSecurityOptions} from "./server"; diff --git a/test/test.client.ts b/test/test.client.ts index b835577..ea86614 100644 --- a/test/test.client.ts +++ b/test/test.client.ts @@ -456,6 +456,23 @@ describe("FluentClient", () => { socket.emit("error", new Error("test")); }); + it("should flush on disconnect", async () => { + const {client, socket} = createFluentClient("test"); + socket.isWritable = false; + const spy = sinon.spy(client, "flush"); + + const firstEvent = client.emit("a", {event: "foo bar"}); + + await new Promise(r => setTimeout(r, 100)); + sinon.assert.notCalled(spy); + + socket.isWritable = true; + await client.disconnect(); + await expect(firstEvent).to.eventually.be.fulfilled; + + sinon.assert.calledOnce(spy); + }); + it("should reject pending events after shutdown", async () => { const {client, socket} = createFluentClient("test"); socket.isWritable = false;