Skip to content

Commit

Permalink
[v1.2.1-wanderlog.4] Release built version
Browse files Browse the repository at this point in the history
  • Loading branch information
pxpeterxu committed Jun 28, 2024
1 parent 899d092 commit c1217cd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 20 deletions.
47 changes: 47 additions & 0 deletions lib/__mocks__/connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"use strict";

function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); enumerableOnly && (symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; })), keys.push.apply(keys, symbols); } return keys; }
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = null != arguments[i] ? arguments[i] : {}; i % 2 ? ownKeys(Object(source), !0).forEach(function (key) { _defineProperty(target, key, source[key]); }) : Object.getOwnPropertyDescriptors ? Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)) : ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } return target; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
const connectionModule = jest.requireActual('../connection');
class PlainConnection {
constructor() {
_defineProperty(this, "listeners", {});
_defineProperty(this, "onceListeners", {});
}
connect() {}
close() {}
send() {
return true;
}
readyToSend() {
return true;
}
once(eventName, callback) {
const onceListeners = this.onceListeners[eventName] || [];
onceListeners.push(callback);
this.onceListeners[eventName] = onceListeners;
}
on(eventName, callback) {
const listeners = this.listeners[eventName] || [];
listeners.push(callback);
this.listeners[eventName] = listeners;
}
off(eventName, callback) {
const listeners = this.listeners[eventName] || [];
this.listeners[eventName] = listeners.filter(l => l !== callback);
const onceListeners = this.listeners[eventName] || [];
this.onceListeners[eventName] = onceListeners.filter(l => l !== callback);
}
emit(eventName, ...args) {
const onceListeners = this.onceListeners[eventName] || [];
const listeners = this.listeners[eventName] || [];
this.onceListeners[eventName] = [];
for (const listener of [...listeners, ...onceListeners]) {
listener(...args);
}
}
}
module.exports = _objectSpread(_objectSpread({}, connectionModule), {}, {
PlainConnection
});
15 changes: 9 additions & 6 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,22 @@ class Connection extends _events.EventEmitter {
socket.once('close', this.socketOnClose.bind(this));
}
close() {
var _this$socket3, _this$socket4;
var _this$socket3, _this$socket4, _this$socket5;
this.action = ConnectionActions.Closing;
(_this$socket3 = this.socket) === null || _this$socket3 === void 0 ? void 0 : _this$socket3.removeAllListeners();
(_this$socket4 = this.socket) === null || _this$socket4 === void 0 ? void 0 : _this$socket4.destroy();
// Try to close the socket gracefully before destroying.
(_this$socket4 = this.socket) === null || _this$socket4 === void 0 ? void 0 : _this$socket4.end();
(_this$socket5 = this.socket) === null || _this$socket5 === void 0 ? void 0 : _this$socket5.destroy();
this.socket = undefined;
this.emit(ConnectionEvents.Closed);
}
send(message, writeCallback) {
var _this$socket5;
return ((_this$socket5 = this.socket) === null || _this$socket5 === void 0 ? void 0 : _this$socket5.write(Buffer.from(message), writeCallback)) === true;
var _this$socket6;
return ((_this$socket6 = this.socket) === null || _this$socket6 === void 0 ? void 0 : _this$socket6.write(Buffer.from(message), writeCallback)) === true;
}
readyToSend() {
var _this$socket6;
return ((_this$socket6 = this.socket) === null || _this$socket6 === void 0 ? void 0 : _this$socket6.readyState) === 'open';
var _this$socket7;
return ((_this$socket7 = this.socket) === null || _this$socket7 === void 0 ? void 0 : _this$socket7.readyState) === 'open';
}
connect() {
this.action = ConnectionActions.Connecting;
Expand Down
53 changes: 40 additions & 13 deletions lib/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@ var _connection = require("./connection");
var _events = require("events");
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
const ECONNREFUSED_REGEXP = /ECONNREFUSED/;
const DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF = 100;
class Manager extends _events.EventEmitter {
constructor(options, connection) {
var _options$max_connect_, _options$timeout_conn;
super();
_defineProperty(this, "connection", void 0);
_defineProperty(this, "logQueue", void 0);
_defineProperty(this, "options", void 0);
_defineProperty(this, "retries", -1);
_defineProperty(this, "maxConnectRetries", void 0);
_defineProperty(this, "timeoutConnectRetries", void 0);
_defineProperty(this, "retryStrategy", void 0);
_defineProperty(this, "nextRetryTimeoutForExponentialBackoff", DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF);
_defineProperty(this, "retryTimeout", undefined);
_defineProperty(this, "isClosing", false);
_defineProperty(this, "connectionCallbacks", new Map());
this.options = options;
this.connection = connection;
this.logQueue = new Array();
this.logQueue = [];
this.connectionCallbacks.set(_connection.ConnectionEvents.Connected, this.onConnected.bind(this));
this.connectionCallbacks.set(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this));
this.connectionCallbacks.set(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this));
Expand All @@ -32,8 +33,22 @@ class Manager extends _events.EventEmitter {

// Connection retry attributes
this.retries = 0;
this.maxConnectRetries = (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4;
this.timeoutConnectRetries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100;
if (options.retryStrategy) {
this.retryStrategy = options.retryStrategy;
} else if (options !== null && options !== void 0 && options.max_connect_retries || options !== null && options !== void 0 && options.timeout_connect_retries) {
var _options$max_connect_, _options$timeout_conn;
this.retryStrategy = {
strategy: 'fixedDelay',
maxConnectRetries: (_options$max_connect_ = options === null || options === void 0 ? void 0 : options.max_connect_retries) !== null && _options$max_connect_ !== void 0 ? _options$max_connect_ : 4,
delayBeforeRetryMs: (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100
};
} else {
this.retryStrategy = {
strategy: 'exponentialBackoff',
maxConnectRetries: -1,
maxDelayBeforeRetryMs: 120000
};
}
}
addEventListeners() {
this.connection.once(_connection.ConnectionEvents.Connected, this.connectionCallbacks.get(_connection.ConnectionEvents.Connected));
Expand All @@ -54,6 +69,7 @@ class Manager extends _events.EventEmitter {
onConnected() {
this.emit('connected');
this.retries = 0;
this.nextRetryTimeoutForExponentialBackoff = DEFAULT_RETRY_TIMEOUT_MS_FOR_EXPONENTIAL_BACKOFF;
this.flush();
}
onConnectionClosed(error) {
Expand All @@ -67,12 +83,11 @@ class Manager extends _events.EventEmitter {
}

shouldTryToReconnect(error) {
if (this.isRetryableError(error) === true) {
if (this.maxConnectRetries < 0 || this.retries < this.maxConnectRetries) {
return true;
} else {
return false;
}
if (this.isRetryableError(error) && !this.isClosing) {
const {
maxConnectRetries
} = this.retryStrategy;
return maxConnectRetries < 0 || this.retries < maxConnectRetries;
} else {
return false;
}
Expand All @@ -96,9 +111,17 @@ class Manager extends _events.EventEmitter {
const self = this;
this.connection.once(_connection.ConnectionEvents.Closed, () => {
self.removeEventListeners();
let retryTimeoutMs;
if (self.retryStrategy.strategy === 'exponentialBackoff') {
retryTimeoutMs = self.nextRetryTimeoutForExponentialBackoff;
self.nextRetryTimeoutForExponentialBackoff *= 2;
self.nextRetryTimeoutForExponentialBackoff = Math.min(self.nextRetryTimeoutForExponentialBackoff, self.retryStrategy.maxDelayBeforeRetryMs);
} else {
retryTimeoutMs = self.retryStrategy.delayBeforeRetryMs;
}
self.retryTimeout = setTimeout(() => {
self.start();
}, self.timeoutConnectRetries);
}, retryTimeoutMs);
});
this.connection.close();
}
Expand All @@ -116,6 +139,10 @@ class Manager extends _events.EventEmitter {
}
close() {
var _this$connection2;
this.isClosing = true;
if (this.retryTimeout) {
clearTimeout(this.retryTimeout);
}
this.emit('closing');
this.flush();
this.removeEventListeners();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "winston-logstash",
"version": "1.2.1",
"version": "1.2.1-wanderlog.4",
"description": "A Logstash transport for winston",
"main": "./lib/winston-logstash",
"types": "./types/src/winston-logstash.d.ts",
Expand Down

0 comments on commit c1217cd

Please sign in to comment.