Skip to content

Commit

Permalink
[1.2.1-wanderlog.1] Build exponential backoff retry strategy code
Browse files Browse the repository at this point in the history
  • Loading branch information
hsource committed Oct 3, 2023
1 parent 342af6f commit 92b9424
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
48 changes: 35 additions & 13 deletions lib/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ var _connection = require("./connection");
var _events = require("events");
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
const ECONNREFUSED_REGEXP = /ECONNREFUSED/;
const DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF = 100;
class Manager extends _events.EventEmitter {
constructor(options, connection) {
var _options$max_connect_, _options$timeout_conn;
super();
_defineProperty(this, "connection", void 0);
_defineProperty(this, "logQueue", void 0);
_defineProperty(this, "options", void 0);
_defineProperty(this, "retries", -1);
_defineProperty(this, "maxConnectRetries", void 0);
_defineProperty(this, "timeoutConnectRetries", void 0);
_defineProperty(this, "retryStrategy", void 0);
_defineProperty(this, "nextRetryTimeoutForExponentialBackoff", DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF);
_defineProperty(this, "retryTimeout", undefined);
_defineProperty(this, "connectionCallbacks", new Map());
this.options = options;
this.connection = connection;
this.logQueue = new Array();
this.logQueue = [];
this.connectionCallbacks.set(_connection.ConnectionEvents.Connected, this.onConnected.bind(this));
this.connectionCallbacks.set(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this));
this.connectionCallbacks.set(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this));
Expand All @@ -32,8 +32,22 @@ class Manager extends _events.EventEmitter {

// Connection retry attributes
this.retries = 0;
this.maxConnectRetries = (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4;
this.timeoutConnectRetries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100;
if (options.retryStrategy) {
this.retryStrategy = options.retryStrategy;
} else if (options !== null && options !== void 0 && options.max_connect_retries || options !== null && options !== void 0 && options.timeout_connect_retries) {
var _options$max_connect_, _options$timeout_conn;
this.retryStrategy = {
strategy: 'fixedDelay',
maxConnectRetries: (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4,
delayBeforeRetryMs: (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100
};
} else {
this.retryStrategy = {
strategy: 'exponentialBackoff',
maxConnectRetries: -1,
maxDelayBeforeRetryMs: 120000
};
}
}
addEventListeners() {
this.connection.once(_connection.ConnectionEvents.Connected, this.connectionCallbacks.get(_connection.ConnectionEvents.Connected));
Expand All @@ -54,6 +68,7 @@ class Manager extends _events.EventEmitter {
onConnected() {
this.emit('connected');
this.retries = 0;
this.nextRetryTimeoutForExponentialBackoff = DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF;
this.flush();
}
onConnectionClosed(error) {
Expand All @@ -67,12 +82,11 @@ class Manager extends _events.EventEmitter {
}

shouldTryToReconnect(error) {
if (this.isRetryableError(error) === true) {
if (this.maxConnectRetries < 0 || this.retries < this.maxConnectRetries) {
return true;
} else {
return false;
}
if (this.isRetryableError(error)) {
const {
maxConnectRetries
} = this.retryStrategy;
return maxConnectRetries < 0 || this.retries < maxConnectRetries;
} else {
return false;
}
Expand All @@ -96,9 +110,17 @@ class Manager extends _events.EventEmitter {
const self = this;
this.connection.once(_connection.ConnectionEvents.Closed, () => {
self.removeEventListeners();
let retryTimeoutMs;
if (self.retryStrategy.strategy === 'exponentialBackoff') {
retryTimeoutMs = self.nextRetryTimeoutForExponentialBackoff;
self.nextRetryTimeoutForExponentialBackoff *= 2;
self.nextRetryTimeoutForExponentialBackoff = Math.min(self.nextRetryTimeoutForExponentialBackoff, self.retryStrategy.maxDelayBeforeRetryMs);
} else {
retryTimeoutMs = self.retryStrategy.delayBeforeRetryMs;
}
self.retryTimeout = setTimeout(() => {
self.start();
}, self.timeoutConnectRetries);
}, retryTimeoutMs);
});
this.connection.close();
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "winston-logstash",
"version": "1.2.1",
"version": "1.2.1-wanderlog.1",
"description": "A Logstash transport for winston",
"main": "./lib/winston-logstash",
"homepage": "https://github.com/jaakkos/winston-logstash",
Expand Down

0 comments on commit 92b9424

Please sign in to comment.