From 0240a4f7212655cceb14af2ec88a830c3ceccec8 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 12:44:59 -0400 Subject: [PATCH 01/20] refactor sentinel iterator for type --- .../SentinelConnector/SentinelIterator.ts | 41 +++---- lib/connectors/SentinelConnector/index.ts | 107 +++++++++--------- 2 files changed, 75 insertions(+), 73 deletions(-) diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts index 6cfdaf19..7d7d63a7 100644 --- a/lib/connectors/SentinelConnector/SentinelIterator.ts +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -5,36 +5,37 @@ function isSentinelEql (a: Partial, b: Partial> { private cursor: number = 0 + private sentinels: Partial[] - constructor (private sentinels: Partial[]) {} - - hasNext (): boolean { - return this.cursor < this.sentinels.length + constructor (sentinels: Partial[]) { + this.sentinels = [...sentinels]; } - next (): Partial | null { - return this.hasNext() ? this.sentinels[this.cursor++] : null + next () { + return this.cursor < this.sentinels.length + ? { value: this.sentinels[this.cursor++], done: false } + : { value: undefined, done: true }; } - reset (moveCurrentEndpointToFirst: boolean): void { - if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) { - const remains = this.sentinels.slice(this.cursor - 1) - this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1)) + reset (moveCurrentEndpointToFirst: true): SentinelIterator + reset (moveCurrentEndpointToFirst?: false): void + reset (moveCurrentEndpointToFirst?: boolean) { + if (moveCurrentEndpointToFirst) { + if (this.sentinels.length > 1 && this.cursor !== 1) { + this.cursor = 0 + return this + } + return new SentinelIterator([...this.sentinels.slice(this.cursor - 1), ...this.sentinels.slice(0, this.cursor - 1)]) } this.cursor = 0 } - add (sentinel: ISentinelAddress): boolean { - for (let i = 0; i < this.sentinels.length; i++) { - if (isSentinelEql(sentinel, this.sentinels[i])) { - return false - } - } - - this.sentinels.push(sentinel) - return true + add (sentinel: ISentinelAddress) { + return this.sentinels.some(isSentinelEql.bind(null, sentinel)) + ? null + : this.sentinels.push(sentinel); } toString (): string { diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 3ca9af78..9eedb16f 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -38,6 +38,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { } export default class SentinelConnector extends AbstractConnector { + private lastError: Error private retryAttempts: number private sentinelIterator: SentinelIterator @@ -71,71 +72,71 @@ export default class SentinelConnector extends AbstractConnector { public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { this.connecting = true this.retryAttempts = 0 + this.lastError = null - let lastError - const _this = this - connectToNext() + this.connectToNext(callback, eventEmitter) + } - function connectToNext() { - if (!_this.sentinelIterator.hasNext()) { - _this.sentinelIterator.reset(false) - const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function' - ? _this.options.sentinelRetryStrategy(++_this.retryAttempts) - : null + private connectToNext(callback: NodeCallback, eventEmitter: ErrorEmitter) { + const endpoint = this.sentinelIterator.next(); - let errorMsg = typeof retryDelay !== 'number' - ? 'All sentinels are unreachable and retry is disabled.' - : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` + if (endpoint.done) { + this.sentinelIterator.reset(false) + const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' + ? this.options.sentinelRetryStrategy(++this.retryAttempts) + : null - if (lastError) { - errorMsg += ` Last error: ${lastError.message}` - } + let errorMsg = typeof retryDelay !== 'number' + ? 'All sentinels are unreachable and retry is disabled.' + : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - debug(errorMsg) + if (this.lastError) { + errorMsg += ` Last error: ${this.lastError.message}` + } - const error = new Error(errorMsg) - if (typeof retryDelay === 'number') { - setTimeout(connectToNext, retryDelay) - eventEmitter('error', error) - } else { - callback(error) - } - return + debug(errorMsg) + + const error = new Error(errorMsg) + if (typeof retryDelay === 'number') { + setTimeout(this.connectToNext.bind(this, callback, eventEmitter), retryDelay) + eventEmitter('error', error) + } else { + callback(error) } + return + } - const endpoint = _this.sentinelIterator.next() - _this.resolve(endpoint, function (err, resolved) { - if (!_this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - if (resolved) { - debug('resolved: %s:%s', resolved.host, resolved.port) - if (_this.options.enableTLSForSentinelMode && _this.options.tls) { - Object.assign(resolved, _this.options.tls) - _this.stream = createTLSConnection(resolved) - } else { - _this.stream = createConnection(resolved) - } - _this.sentinelIterator.reset(true) - callback(null, _this.stream) + this.resolve(endpoint.value, function (err, resolved) { + if (!this.connecting) { + callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return + } + if (resolved) { + debug('resolved: %s:%s', resolved.host, resolved.port) + if (this.options.enableTLSForSentinelMode && this.options.tls) { + Object.assign(resolved, this.options.tls) + this.stream = createTLSConnection(resolved) } else { - const endpointAddress = endpoint.host + ':' + endpoint.port - const errorMsg = err - ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message - : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved + this.stream = createConnection(resolved) + } + this.sentinelIterator = this.sentinelIterator.reset(true) + callback(null, this.stream) + } else { + const endpointAddress = endpoint.value.host + ':' + endpoint.value.port + const errorMsg = err + ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message + : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - debug(errorMsg) + debug(errorMsg) - eventEmitter('sentinelError', new Error(errorMsg)) + eventEmitter('sentinelError', new Error(errorMsg)) - if (err) { - lastError = err - } - connectToNext() + if (err) { + this.lastError = err } - }) - } + this.connectToNext(callback, eventEmitter); + } + }) } private updateSentinels (client, callback: NodeCallback): void { @@ -157,7 +158,7 @@ export default class SentinelConnector extends AbstractConnector { const flags = sentinel.flags ? sentinel.flags.split(',') : [] if (flags.indexOf('disconnected') === -1 && sentinel.ip && sentinel.port) { const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel)) - if (this.sentinelIterator.add(endpoint)) { + if (this.sentinelIterator.add(endpoint) != null) { debug('adding sentinel %s:%s', endpoint.host, endpoint.port) } } From 9a0ff64b1e9153e4c6932bc803e3233872191a97 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 13:12:45 -0400 Subject: [PATCH 02/20] Add connectToFloat --- .../SentinelConnector/SentinelIterator.ts | 14 +++------ lib/connectors/SentinelConnector/index.ts | 29 +++++++++++++++++-- lib/redis/index.ts | 2 +- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts index 7d7d63a7..876ede30 100644 --- a/lib/connectors/SentinelConnector/SentinelIterator.ts +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -9,8 +9,8 @@ export default class SentinelIterator implements Iterator[] - constructor (sentinels: Partial[]) { - this.sentinels = [...sentinels]; + constructor (sentinels?: Partial[]) { + this.sentinels = sentinels ? [].concat(sentinels) : []; } next () { @@ -19,15 +19,9 @@ export default class SentinelIterator implements Iterator 1 && this.cursor !== 1) { - this.cursor = 0 - return this - } - return new SentinelIterator([...this.sentinels.slice(this.cursor - 1), ...this.sentinels.slice(0, this.cursor - 1)]) + if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) { + this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1)) } this.cursor = 0 } diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 9eedb16f..09939785 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -11,6 +11,8 @@ import Redis from '../../redis' const debug = Debug('SentinelConnector') +const EMPTY_SENTINELS_MSG = 'Requires at least one sentinel to connect to.'; + interface IAddressFromResponse { port: string, ip: string, @@ -18,6 +20,7 @@ interface IAddressFromResponse { } type NodeCallback = (err: Error | null, result?: T) => void +type FloatingSentinels = NodeCallback[]> type PreferredSlaves = ((slaves: Array) => IAddressFromResponse | null) | Array<{port: string, ip: string, prio?: number}> | @@ -28,6 +31,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { name: string sentinelPassword?: string sentinels: Partial[] + floatingSentinels?: (FloatingSentinels) => void sentinelRetryStrategy?: (retryAttempts: number) => number preferredSlaves?: PreferredSlaves connectTimeout?: number @@ -45,8 +49,8 @@ export default class SentinelConnector extends AbstractConnector { constructor (protected options: ISentinelConnectionOptions) { super() - if (this.options.sentinels.length === 0) { - throw new Error('Requires at least one sentinel to connect to.') + if (!(this.options.sentinels && this.options.sentinels.length || this.options.floatingSentinels)) { + throw new Error(EMPTY_SENTINELS_MSG) } if (!this.options.name) { throw new Error('Requires the name of master.') @@ -74,6 +78,11 @@ export default class SentinelConnector extends AbstractConnector { this.retryAttempts = 0 this.lastError = null + if (typeof this.options.sentinelRetryStrategy !== 'function') { + this.connectToFloat(callback, eventEmitter) + return + } + this.connectToNext(callback, eventEmitter) } @@ -119,7 +128,7 @@ export default class SentinelConnector extends AbstractConnector { } else { this.stream = createConnection(resolved) } - this.sentinelIterator = this.sentinelIterator.reset(true) + this.sentinelIterator.reset(true) callback(null, this.stream) } else { const endpointAddress = endpoint.value.host + ':' + endpoint.value.port @@ -139,6 +148,20 @@ export default class SentinelConnector extends AbstractConnector { }) } + private connectToFloat(callback: NodeCallback, eventEmitter: ErrorEmitter) { + const sentinelCallback: FloatingSentinels = (err, result) => { + if (err) { + callback(err); + return; + } + + this.sentinelIterator = new SentinelIterator(result) + this.connectToNext(callback, eventEmitter); + } + + this.options.floatingSentinels(sentinelCallback); + } + private updateSentinels (client, callback: NodeCallback): void { if (!this.options.updateSentinels) { diff --git a/lib/redis/index.ts b/lib/redis/index.ts index b3145829..2e55f1ff 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -132,7 +132,7 @@ function Redis() { this.resetCommandQueue(); this.resetOfflineQueue(); - if (this.options.sentinels) { + if (this.options.sentinels || this.options.floatingSentinels) { this.connector = new SentinelConnector(this.options); } else { this.connector = new StandaloneConnector(this.options); From a2c04cdb9d625d40781e2e49d6411c3b899569c2 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 13:15:51 -0400 Subject: [PATCH 03/20] error on float no sentinels --- lib/connectors/SentinelConnector/index.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 09939785..ece9b48b 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -49,7 +49,7 @@ export default class SentinelConnector extends AbstractConnector { constructor (protected options: ISentinelConnectionOptions) { super() - if (!(this.options.sentinels && this.options.sentinels.length || this.options.floatingSentinels)) { + if (!(Array.isArray(this.options.sentinels) && this.options.sentinels.length || this.options.floatingSentinels)) { throw new Error(EMPTY_SENTINELS_MSG) } if (!this.options.name) { @@ -153,6 +153,9 @@ export default class SentinelConnector extends AbstractConnector { if (err) { callback(err); return; + } else if (!(Array.isArray(this.options.sentinels) && this.options.sentinels.length)) { + callback(new Error(EMPTY_SENTINELS_MSG)); + return; } this.sentinelIterator = new SentinelIterator(result) From 9372906c86a36cd22799f29d6d3f3a27f2f41640 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 13:18:01 -0400 Subject: [PATCH 04/20] logic error --- lib/connectors/SentinelConnector/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index ece9b48b..d247dfb5 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -78,7 +78,7 @@ export default class SentinelConnector extends AbstractConnector { this.retryAttempts = 0 this.lastError = null - if (typeof this.options.sentinelRetryStrategy !== 'function') { + if (typeof this.options.floatingSentinels === 'function') { this.connectToFloat(callback, eventEmitter) return } From 367c12c50602955d877e7339c5c79c8e329abb3b Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 13:22:54 -0400 Subject: [PATCH 05/20] fix TypeError --- lib/connectors/SentinelConnector/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index d247dfb5..fcfcf5e0 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -115,7 +115,7 @@ export default class SentinelConnector extends AbstractConnector { return } - this.resolve(endpoint.value, function (err, resolved) { + this.resolve(endpoint.value, (err, resolved) => { if (!this.connecting) { callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) return From d237c0b0f2d6439058eff7daa1d2136f4a5cb997 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 13:40:34 -0400 Subject: [PATCH 06/20] make iterator work inplace for tests --- .../SentinelConnector/SentinelIterator.ts | 25 ++++++++++--------- lib/connectors/SentinelConnector/index.ts | 4 +-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts index 876ede30..0ee609a3 100644 --- a/lib/connectors/SentinelConnector/SentinelIterator.ts +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -7,29 +7,30 @@ function isSentinelEql (a: Partial, b: Partial> { private cursor: number = 0 - private sentinels: Partial[] - constructor (sentinels?: Partial[]) { - this.sentinels = sentinels ? [].concat(sentinels) : []; - } + constructor (private sentinels: Partial[]) {} next () { - return this.cursor < this.sentinels.length - ? { value: this.sentinels[this.cursor++], done: false } - : { value: undefined, done: true }; + const done = this.cursor >= this.sentinels.length + return { done, value: done ? this.sentinels[this.cursor++] : null } } - reset (moveCurrentEndpointToFirst?: boolean) { + reset (moveCurrentEndpointToFirst: boolean): void { if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) { this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1)) } this.cursor = 0 } - add (sentinel: ISentinelAddress) { - return this.sentinels.some(isSentinelEql.bind(null, sentinel)) - ? null - : this.sentinels.push(sentinel); + add (sentinel: ISentinelAddress): boolean { + for (let i = 0; i < this.sentinels.length; i++) { + if (isSentinelEql(sentinel, this.sentinels[i])) { + return false + } + } + + this.sentinels.push(sentinel) + return true } toString (): string { diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index fcfcf5e0..0485000d 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -56,7 +56,7 @@ export default class SentinelConnector extends AbstractConnector { throw new Error('Requires the name of master.') } - this.sentinelIterator = new SentinelIterator(this.options.sentinels) + this.sentinelIterator = new SentinelIterator(this.options.sentinels || []) } public check (info: {role?: string}): boolean { @@ -184,7 +184,7 @@ export default class SentinelConnector extends AbstractConnector { const flags = sentinel.flags ? sentinel.flags.split(',') : [] if (flags.indexOf('disconnected') === -1 && sentinel.ip && sentinel.port) { const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel)) - if (this.sentinelIterator.add(endpoint) != null) { + if (this.sentinelIterator.add(endpoint)) { debug('adding sentinel %s:%s', endpoint.host, endpoint.port) } } From f39bda007238b837037001961bad6ce469b4466c Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Sun, 23 Jun 2019 17:06:59 -0400 Subject: [PATCH 07/20] fix tertiary flip --- lib/connectors/SentinelConnector/SentinelIterator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts index 0ee609a3..5ce55651 100644 --- a/lib/connectors/SentinelConnector/SentinelIterator.ts +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -12,7 +12,7 @@ export default class SentinelIterator implements Iterator= this.sentinels.length - return { done, value: done ? this.sentinels[this.cursor++] : null } + return { done, value: done ? undefined : this.sentinels[this.cursor++] } } reset (moveCurrentEndpointToFirst: boolean): void { From 6aaca847ba2220a1435b5ce3fe21b9495da7afb5 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 08:27:09 -0400 Subject: [PATCH 08/20] pass through connector --- lib/connectors/AsyncSentinelConnector.ts | 36 +++++ lib/connectors/SentinelConnector/index.ts | 154 ++++++++++------------ lib/connectors/types.ts | 1 + lib/index.ts | 19 ++- lib/redis/RedisOptions.ts | 2 + lib/redis/index.ts | 14 +- tsconfig.json | 1 - 7 files changed, 127 insertions(+), 100 deletions(-) create mode 100644 lib/connectors/AsyncSentinelConnector.ts create mode 100644 lib/connectors/types.ts diff --git a/lib/connectors/AsyncSentinelConnector.ts b/lib/connectors/AsyncSentinelConnector.ts new file mode 100644 index 00000000..12817811 --- /dev/null +++ b/lib/connectors/AsyncSentinelConnector.ts @@ -0,0 +1,36 @@ +import SentinelConnector, { ISentinelConnectionOptions, ISentinelAddress, SentinelIterator, EMPTY_SENTINELS_MSG } from './SentinelConnector'; +import { ErrorEmitter } from './AbstractConnector'; +import { NetStream } from '../types'; +import { NodeCallback } from './types'; + +export type FloatingSentinels = (err: Error, sentinels: Partial[]) => void + +export default class AsyncSentinelConnector extends SentinelConnector { + private fetch: (FloatingSentinels) => void + + constructor(options: ISentinelConnectionOptions, fetch: (FloatingSentinels) => void) { + options.sentinels = options.sentinels || [{host: 'localhost', port: 6379}] // Placeholder + super(options) + + this.fetch = fetch; + } + + public connect(callback: NodeCallback, eventEmitter: ErrorEmitter) { + const sentinelCallback: FloatingSentinels = (err, result) => { + if (err) { + callback(err); + return; + } else if (!result.length) { + callback(new Error(EMPTY_SENTINELS_MSG)); + return; + } + + this.options.sentinels = result; + this.sentinelIterator = new SentinelIterator(result) + super.connect(callback, eventEmitter); + } + + this.fetch(sentinelCallback); + } + +} \ No newline at end of file diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 0485000d..c8e069fc 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -7,11 +7,12 @@ import SentinelIterator from './SentinelIterator' import {ISentinelAddress} from './types'; import AbstractConnector, { ErrorEmitter } from '../AbstractConnector' import {NetStream} from '../../types' +import {NodeCallback} from '../types'; import Redis from '../../redis' const debug = Debug('SentinelConnector') -const EMPTY_SENTINELS_MSG = 'Requires at least one sentinel to connect to.'; +export const EMPTY_SENTINELS_MSG = 'Requires at least one sentinel to connect to.'; interface IAddressFromResponse { port: string, @@ -19,19 +20,18 @@ interface IAddressFromResponse { flags?: string } -type NodeCallback = (err: Error | null, result?: T) => void -type FloatingSentinels = NodeCallback[]> type PreferredSlaves = ((slaves: Array) => IAddressFromResponse | null) | Array<{port: string, ip: string, prio?: number}> | {port: string, ip: string, prio?: number} +export { ISentinelAddress, SentinelIterator }; + export interface ISentinelConnectionOptions extends ITcpConnectionOptions { role: 'master' | 'slave' name: string sentinelPassword?: string sentinels: Partial[] - floatingSentinels?: (FloatingSentinels) => void sentinelRetryStrategy?: (retryAttempts: number) => number preferredSlaves?: PreferredSlaves connectTimeout?: number @@ -42,21 +42,20 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { } export default class SentinelConnector extends AbstractConnector { - private lastError: Error private retryAttempts: number - private sentinelIterator: SentinelIterator + protected sentinelIterator: SentinelIterator constructor (protected options: ISentinelConnectionOptions) { super() - if (!(Array.isArray(this.options.sentinels) && this.options.sentinels.length || this.options.floatingSentinels)) { + if (!this.options.sentinels.length) { throw new Error(EMPTY_SENTINELS_MSG) } if (!this.options.name) { throw new Error('Requires the name of master.') } - this.sentinelIterator = new SentinelIterator(this.options.sentinels || []) + this.sentinelIterator = new SentinelIterator(this.options.sentinels) } public check (info: {role?: string}): boolean { @@ -76,93 +75,72 @@ export default class SentinelConnector extends AbstractConnector { public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { this.connecting = true this.retryAttempts = 0 - this.lastError = null - - if (typeof this.options.floatingSentinels === 'function') { - this.connectToFloat(callback, eventEmitter) - return - } - - this.connectToNext(callback, eventEmitter) - } - - private connectToNext(callback: NodeCallback, eventEmitter: ErrorEmitter) { - const endpoint = this.sentinelIterator.next(); - - if (endpoint.done) { - this.sentinelIterator.reset(false) - const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' - ? this.options.sentinelRetryStrategy(++this.retryAttempts) - : null - - let errorMsg = typeof retryDelay !== 'number' - ? 'All sentinels are unreachable and retry is disabled.' - : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - - if (this.lastError) { - errorMsg += ` Last error: ${this.lastError.message}` - } - - debug(errorMsg) - - const error = new Error(errorMsg) - if (typeof retryDelay === 'number') { - setTimeout(this.connectToNext.bind(this, callback, eventEmitter), retryDelay) - eventEmitter('error', error) - } else { - callback(error) - } - return - } - this.resolve(endpoint.value, (err, resolved) => { - if (!this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - if (resolved) { - debug('resolved: %s:%s', resolved.host, resolved.port) - if (this.options.enableTLSForSentinelMode && this.options.tls) { - Object.assign(resolved, this.options.tls) - this.stream = createTLSConnection(resolved) - } else { - this.stream = createConnection(resolved) + let lastError + + const connectToNext = () => { + const endpoint = this.sentinelIterator.next(); + + if (endpoint.done) { + this.sentinelIterator.reset(false) + const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' + ? this.options.sentinelRetryStrategy(++this.retryAttempts) + : null + + let errorMsg = typeof retryDelay !== 'number' + ? 'All sentinels are unreachable and retry is disabled.' + : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` + + if (lastError) { + errorMsg += ` Last error: ${lastError.message}` } - this.sentinelIterator.reset(true) - callback(null, this.stream) - } else { - const endpointAddress = endpoint.value.host + ':' + endpoint.value.port - const errorMsg = err - ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message - : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - + debug(errorMsg) - - eventEmitter('sentinelError', new Error(errorMsg)) - - if (err) { - this.lastError = err + + const error = new Error(errorMsg) + if (typeof retryDelay === 'number') { + setTimeout(connectToNext, retryDelay) + eventEmitter('error', error) + } else { + callback(error) } - this.connectToNext(callback, eventEmitter); - } - }) - } - - private connectToFloat(callback: NodeCallback, eventEmitter: ErrorEmitter) { - const sentinelCallback: FloatingSentinels = (err, result) => { - if (err) { - callback(err); - return; - } else if (!(Array.isArray(this.options.sentinels) && this.options.sentinels.length)) { - callback(new Error(EMPTY_SENTINELS_MSG)); - return; + return } - - this.sentinelIterator = new SentinelIterator(result) - this.connectToNext(callback, eventEmitter); + + this.resolve(endpoint.value, (err, resolved) => { + if (!this.connecting) { + callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return + } + if (resolved) { + debug('resolved: %s:%s', resolved.host, resolved.port) + if (this.options.enableTLSForSentinelMode && this.options.tls) { + Object.assign(resolved, this.options.tls) + this.stream = createTLSConnection(resolved) + } else { + this.stream = createConnection(resolved) + } + this.sentinelIterator.reset(true) + callback(null, this.stream) + } else { + const endpointAddress = endpoint.value.host + ':' + endpoint.value.port + const errorMsg = err + ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message + : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved + + debug(errorMsg) + + eventEmitter('sentinelError', new Error(errorMsg)) + + if (err) { + lastError = err + } + connectToNext(); + } + }) } - this.options.floatingSentinels(sentinelCallback); + connectToNext() } private updateSentinels (client, callback: NodeCallback): void { diff --git a/lib/connectors/types.ts b/lib/connectors/types.ts new file mode 100644 index 00000000..985fab56 --- /dev/null +++ b/lib/connectors/types.ts @@ -0,0 +1 @@ +export type NodeCallback = (err: Error | null, result?: T) => void diff --git a/lib/index.ts b/lib/index.ts index 0e58af23..a6a10fe8 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,10 +1,19 @@ exports = module.exports = require('./redis').default -export {ReplyError} from 'redis-errors' -export const Cluster = require('./cluster').default -export const Command = require('./command').default -export const ScanStream = require('./ScanStream').default -export const Pipeline = require('./pipeline').default +export {default} from './redis'; +export {default as Cluster} from './cluster' +export {default as Command} from './command' +export {default as ScanStream} from './ScanStream' +export {default as Pipeline} from './pipeline' +export {default as AbstractConnector} from './connectors/AbstractConnector' +export {default as SentinelConnector} from './connectors/SentinelConnector' +export {default as AsyncSentinelConnector} from './connectors/AsyncSentinelConnector' + +// Type Exports +export {IRedisOptions} from './redis/RedisOptions'; + +// No TS typings +export const ReplyError = require('redis-errors').ReplyError const PromiseContainer = require('./promiseContainer') Object.defineProperty(exports, 'Promise', { diff --git a/lib/redis/RedisOptions.ts b/lib/redis/RedisOptions.ts index 2e094124..8581faf6 100644 --- a/lib/redis/RedisOptions.ts +++ b/lib/redis/RedisOptions.ts @@ -1,10 +1,12 @@ import {ISentinelConnectionOptions} from '../connectors/SentinelConnector'; +import AbstractConnector from '../connectors/AbstractConnector'; import {IClusterOptions} from '../cluster/ClusterOptions'; import {ICommanderOptions} from '../commander'; export type ReconnectOnError = (err: Error) => boolean | 1 | 2; export interface IRedisOptions extends Partial, Partial, Partial { + connector?: AbstractConnector, retryStrategy?: (times: number) => number | void | null, keepAlive?: number, noDelay?: boolean, diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 2e55f1ff..0e4aebf5 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -132,10 +132,12 @@ function Redis() { this.resetCommandQueue(); this.resetOfflineQueue(); - if (this.options.sentinels || this.options.floatingSentinels) { - this.connector = new SentinelConnector(this.options); - } else { - this.connector = new StandaloneConnector(this.options); + if (!this.options.connector) { + if (this.options.sentinels) { + this.options.connector = new SentinelConnector(this.options); + } else { + this.options.connector = new StandaloneConnector(this.options); + } } this.retryAttempts = 0; @@ -249,7 +251,7 @@ Redis.prototype.connect = function (callback) { }; var _this = this; - this.connector.connect(function (err, stream) { + this.options.connector.connect(function (err, stream) { if (err) { _this.flushQueue(err); _this.silentEmit('error', err); @@ -343,7 +345,7 @@ Redis.prototype.disconnect = function (reconnect) { if (this.status === 'wait') { eventHandler.closeHandler(this)(); } else { - this.connector.disconnect(); + this.options.connector.disconnect(); } }; diff --git a/tsconfig.json b/tsconfig.json index 2b93196b..9e92aef4 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,6 @@ { "compilerOptions": { "outDir": "./built", - "allowJs": true, "target": "es6", "lib": ["es6"], "moduleResolution": "node", From 44117105c32f6cd52e6a132d944baeed4b7ebffa Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 08:32:32 -0400 Subject: [PATCH 09/20] Use promises for simpler interface --- lib/connectors/AsyncSentinelConnector.ts | 30 ++++++++++-------------- lib/index.ts | 3 ++- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/lib/connectors/AsyncSentinelConnector.ts b/lib/connectors/AsyncSentinelConnector.ts index 12817811..3adabf15 100644 --- a/lib/connectors/AsyncSentinelConnector.ts +++ b/lib/connectors/AsyncSentinelConnector.ts @@ -3,12 +3,12 @@ import { ErrorEmitter } from './AbstractConnector'; import { NetStream } from '../types'; import { NodeCallback } from './types'; -export type FloatingSentinels = (err: Error, sentinels: Partial[]) => void +export type AsyncSentinelFetch = () => Promise[]> export default class AsyncSentinelConnector extends SentinelConnector { - private fetch: (FloatingSentinels) => void + private fetch: AsyncSentinelFetch - constructor(options: ISentinelConnectionOptions, fetch: (FloatingSentinels) => void) { + constructor(options: ISentinelConnectionOptions, fetch: AsyncSentinelFetch) { options.sentinels = options.sentinels || [{host: 'localhost', port: 6379}] // Placeholder super(options) @@ -16,21 +16,15 @@ export default class AsyncSentinelConnector extends SentinelConnector { } public connect(callback: NodeCallback, eventEmitter: ErrorEmitter) { - const sentinelCallback: FloatingSentinels = (err, result) => { - if (err) { - callback(err); - return; - } else if (!result.length) { - callback(new Error(EMPTY_SENTINELS_MSG)); - return; - } - - this.options.sentinels = result; - this.sentinelIterator = new SentinelIterator(result) - super.connect(callback, eventEmitter); - } - - this.fetch(sentinelCallback); + this.fetch() + .then(sentinels => { + if (!sentinels.length) throw new Error(EMPTY_SENTINELS_MSG); + + this.options.sentinels = sentinels; + this.sentinelIterator = new SentinelIterator(sentinels) + return super.connect(callback, eventEmitter); + }) + .catch(callback); } } \ No newline at end of file diff --git a/lib/index.ts b/lib/index.ts index a6a10fe8..b497ebde 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -10,7 +10,8 @@ export {default as SentinelConnector} from './connectors/SentinelConnector' export {default as AsyncSentinelConnector} from './connectors/AsyncSentinelConnector' // Type Exports -export {IRedisOptions} from './redis/RedisOptions'; +export {ISentinelAddress} from './connectors/SentinelConnector' +export {IRedisOptions} from './redis/RedisOptions' // No TS typings export const ReplyError = require('redis-errors').ReplyError From ba2ad7cef183447849fca70c44feafa67a269c74 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 08:39:27 -0400 Subject: [PATCH 10/20] remove hanging spaces --- lib/connectors/SentinelConnector/index.ts | 20 ++++++++++---------- tsconfig.json | 1 + 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index c8e069fc..f6f277bd 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -80,23 +80,23 @@ export default class SentinelConnector extends AbstractConnector { const connectToNext = () => { const endpoint = this.sentinelIterator.next(); - + if (endpoint.done) { this.sentinelIterator.reset(false) const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' ? this.options.sentinelRetryStrategy(++this.retryAttempts) : null - + let errorMsg = typeof retryDelay !== 'number' ? 'All sentinels are unreachable and retry is disabled.' : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - + if (lastError) { errorMsg += ` Last error: ${lastError.message}` } - + debug(errorMsg) - + const error = new Error(errorMsg) if (typeof retryDelay === 'number') { setTimeout(connectToNext, retryDelay) @@ -106,7 +106,7 @@ export default class SentinelConnector extends AbstractConnector { } return } - + this.resolve(endpoint.value, (err, resolved) => { if (!this.connecting) { callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) @@ -127,15 +127,15 @@ export default class SentinelConnector extends AbstractConnector { const errorMsg = err ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - + debug(errorMsg) - + eventEmitter('sentinelError', new Error(errorMsg)) - + if (err) { lastError = err } - connectToNext(); + connectToNext() } }) } diff --git a/tsconfig.json b/tsconfig.json index 9e92aef4..2b93196b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "outDir": "./built", + "allowJs": true, "target": "es6", "lib": ["es6"], "moduleResolution": "node", From 76050114312b7a27eb17972099e59ddd72bdf567 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 08:54:32 -0400 Subject: [PATCH 11/20] change typings for async --- lib/connectors/AsyncSentinelConnector.ts | 9 +++++++-- lib/connectors/SentinelConnector/index.ts | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/connectors/AsyncSentinelConnector.ts b/lib/connectors/AsyncSentinelConnector.ts index 3adabf15..a77b847f 100644 --- a/lib/connectors/AsyncSentinelConnector.ts +++ b/lib/connectors/AsyncSentinelConnector.ts @@ -1,4 +1,9 @@ -import SentinelConnector, { ISentinelConnectionOptions, ISentinelAddress, SentinelIterator, EMPTY_SENTINELS_MSG } from './SentinelConnector'; +import SentinelConnector, { + ISentinelConnectionOptions, + ISentinelConnectorOptions, + ISentinelAddress, + SentinelIterator, + EMPTY_SENTINELS_MSG } from './SentinelConnector'; import { ErrorEmitter } from './AbstractConnector'; import { NetStream } from '../types'; import { NodeCallback } from './types'; @@ -10,7 +15,7 @@ export default class AsyncSentinelConnector extends SentinelConnector { constructor(options: ISentinelConnectionOptions, fetch: AsyncSentinelFetch) { options.sentinels = options.sentinels || [{host: 'localhost', port: 6379}] // Placeholder - super(options) + super(options as ISentinelConnectorOptions) this.fetch = fetch; } diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index f6f277bd..b07be8f1 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -31,7 +31,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { role: 'master' | 'slave' name: string sentinelPassword?: string - sentinels: Partial[] + sentinels?: Partial[] sentinelRetryStrategy?: (retryAttempts: number) => number preferredSlaves?: PreferredSlaves connectTimeout?: number @@ -41,11 +41,15 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { updateSentinels?: boolean } +export interface ISentinelConnectorOptions extends ISentinelConnectionOptions { + sentinels: Partial[] +} + export default class SentinelConnector extends AbstractConnector { private retryAttempts: number protected sentinelIterator: SentinelIterator - constructor (protected options: ISentinelConnectionOptions) { + constructor (protected options: ISentinelConnectorOptions) { super() if (!this.options.sentinels.length) { From 0fb5c88048baef9fd248d492b09468b7914aabde Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 09:06:58 -0400 Subject: [PATCH 12/20] add build to travis to test build and made ts errors quiet for tests --- .travis.yml | 1 + package.json | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 583f6ced..ed822692 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ services: - redis-server script: +- npm run build - npm run test:cov || npm run test:cov || npm run test:cov env: diff --git a/package.json b/package.json index 43e4c8ca..3a66f6b6 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ ], "scripts": { "test": "NODE_ENV=test mocha test/**/*.ts", - "test:cov": "NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", + "test:cov": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", "build": "rm -rf built && tsc", "prepublishOnly": "npm run build && npm test", "bench": "matcha benchmarks/*.js", From c040ba12bc36650d0f77968bd357192621e4e9a6 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 09:17:22 -0400 Subject: [PATCH 13/20] Fix bad connector reference --- lib/redis/event_handler.ts | 2 +- lib/redis/index.ts | 2 +- package.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 90734a67..35d1218d 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -54,7 +54,7 @@ export function connectHandler(self) { } } else { self.serverInfo = info; - if (self.connector.check(info)) { + if (self.options.connector.check(info)) { exports.readyHandler(self)(); } else { self.disconnect(true); diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 0e4aebf5..0d3de7c2 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -250,7 +250,7 @@ Redis.prototype.connect = function (callback) { subscriber: false }; - var _this = this; + var _this: Redis = this; this.options.connector.connect(function (err, stream) { if (err) { _this.flushQueue(err); diff --git a/package.json b/package.json index 3a66f6b6..b0ed630e 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ ], "scripts": { "test": "NODE_ENV=test mocha test/**/*.ts", - "test:cov": "TS_NODE_TRANSPILE_ONLY=true NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", + "test:cov": "TS_NODE_LOG_ERROR=true NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", "build": "rm -rf built && tsc", "prepublishOnly": "npm run build && npm test", "bench": "matcha benchmarks/*.js", From 27efcb7547e5ab80de82c7a9c722c5551458ed17 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 12:51:05 -0400 Subject: [PATCH 14/20] =?UTF-8?q?callback=20=E2=86=92=20promise=20connect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/connectors/AbstractConnector.ts | 2 +- lib/connectors/AsyncSentinelConnector.ts | 26 ++-- lib/connectors/SentinelConnector/index.ts | 137 +++++++++++----------- lib/connectors/StandaloneConnector.ts | 46 ++++---- lib/connectors/types.ts | 1 - lib/redis/index.ts | 43 ++++--- 6 files changed, 128 insertions(+), 127 deletions(-) delete mode 100644 lib/connectors/types.ts diff --git a/lib/connectors/AbstractConnector.ts b/lib/connectors/AbstractConnector.ts index 6a7aa9c3..f608cb45 100644 --- a/lib/connectors/AbstractConnector.ts +++ b/lib/connectors/AbstractConnector.ts @@ -17,5 +17,5 @@ export default abstract class AbstractConnector { } } - public abstract connect (callback: Function, _: ErrorEmitter) + public abstract connect (_: ErrorEmitter): Promise } diff --git a/lib/connectors/AsyncSentinelConnector.ts b/lib/connectors/AsyncSentinelConnector.ts index a77b847f..eda3995b 100644 --- a/lib/connectors/AsyncSentinelConnector.ts +++ b/lib/connectors/AsyncSentinelConnector.ts @@ -1,35 +1,35 @@ import SentinelConnector, { ISentinelConnectionOptions, - ISentinelConnectorOptions, ISentinelAddress, SentinelIterator, EMPTY_SENTINELS_MSG } from './SentinelConnector'; import { ErrorEmitter } from './AbstractConnector'; import { NetStream } from '../types'; -import { NodeCallback } from './types'; export type AsyncSentinelFetch = () => Promise[]> +export interface IAsyncSentinelConnectionOptions extends Pick> { + sentinels?: Partial[] +} + export default class AsyncSentinelConnector extends SentinelConnector { private fetch: AsyncSentinelFetch - constructor(options: ISentinelConnectionOptions, fetch: AsyncSentinelFetch) { + constructor(options: IAsyncSentinelConnectionOptions, fetch: AsyncSentinelFetch) { options.sentinels = options.sentinels || [{host: 'localhost', port: 6379}] // Placeholder - super(options as ISentinelConnectorOptions) + super(options as ISentinelConnectionOptions) this.fetch = fetch; } - public connect(callback: NodeCallback, eventEmitter: ErrorEmitter) { - this.fetch() - .then(sentinels => { - if (!sentinels.length) throw new Error(EMPTY_SENTINELS_MSG); + public connect(eventEmitter: ErrorEmitter): Promise { + return this.fetch().then(sentinels => { + if (!sentinels.length) throw new Error(EMPTY_SENTINELS_MSG); - this.options.sentinels = sentinels; - this.sentinelIterator = new SentinelIterator(sentinels) - return super.connect(callback, eventEmitter); - }) - .catch(callback); + this.options.sentinels = sentinels; + this.sentinelIterator = new SentinelIterator(sentinels) + return super.connect(eventEmitter); + }); } } \ No newline at end of file diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index b07be8f1..115fe247 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -6,8 +6,8 @@ import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConne import SentinelIterator from './SentinelIterator' import {ISentinelAddress} from './types'; import AbstractConnector, { ErrorEmitter } from '../AbstractConnector' -import {NetStream} from '../../types' -import {NodeCallback} from '../types'; +import {NetStream, CallbackFunction} from '../../types'; +import * as PromiseContainer from '../../promiseContainer'; import Redis from '../../redis' const debug = Debug('SentinelConnector') @@ -31,7 +31,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { role: 'master' | 'slave' name: string sentinelPassword?: string - sentinels?: Partial[] + sentinels: Partial[] sentinelRetryStrategy?: (retryAttempts: number) => number preferredSlaves?: PreferredSlaves connectTimeout?: number @@ -41,15 +41,11 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { updateSentinels?: boolean } -export interface ISentinelConnectorOptions extends ISentinelConnectionOptions { - sentinels: Partial[] -} - export default class SentinelConnector extends AbstractConnector { private retryAttempts: number protected sentinelIterator: SentinelIterator - constructor (protected options: ISentinelConnectorOptions) { + constructor (protected options: ISentinelConnectionOptions) { super() if (!this.options.sentinels.length) { @@ -76,78 +72,83 @@ export default class SentinelConnector extends AbstractConnector { return roleMatches } - public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { + public connect (eventEmitter: ErrorEmitter): Promise { this.connecting = true this.retryAttempts = 0 let lastError + const _Promise = PromiseContainer.get(); - const connectToNext = () => { + const connectToNext = (): Promise => { const endpoint = this.sentinelIterator.next(); - if (endpoint.done) { - this.sentinelIterator.reset(false) - const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' - ? this.options.sentinelRetryStrategy(++this.retryAttempts) - : null - - let errorMsg = typeof retryDelay !== 'number' - ? 'All sentinels are unreachable and retry is disabled.' - : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - - if (lastError) { - errorMsg += ` Last error: ${lastError.message}` - } - - debug(errorMsg) - - const error = new Error(errorMsg) - if (typeof retryDelay === 'number') { - setTimeout(connectToNext, retryDelay) - eventEmitter('error', error) - } else { - callback(error) - } - return - } - - this.resolve(endpoint.value, (err, resolved) => { - if (!this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - if (resolved) { - debug('resolved: %s:%s', resolved.host, resolved.port) - if (this.options.enableTLSForSentinelMode && this.options.tls) { - Object.assign(resolved, this.options.tls) - this.stream = createTLSConnection(resolved) - } else { - this.stream = createConnection(resolved) + return new _Promise((resolve, reject) => { + if (endpoint.done) { + this.sentinelIterator.reset(false) + const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' + ? this.options.sentinelRetryStrategy(++this.retryAttempts) + : null + + let errorMsg = typeof retryDelay !== 'number' + ? 'All sentinels are unreachable and retry is disabled.' + : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` + + if (lastError) { + errorMsg += ` Last error: ${lastError.message}` } - this.sentinelIterator.reset(true) - callback(null, this.stream) - } else { - const endpointAddress = endpoint.value.host + ':' + endpoint.value.port - const errorMsg = err - ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message - : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - + debug(errorMsg) - - eventEmitter('sentinelError', new Error(errorMsg)) - - if (err) { - lastError = err + + const error = new Error(errorMsg) + if (typeof retryDelay === 'number') { + setTimeout(() => { + resolve(connectToNext()); + }, retryDelay) + eventEmitter('error', error) + } else { + reject(error) } - connectToNext() + return } - }) + + this.resolve(endpoint.value, (err, resolved) => { + if (!this.connecting) { + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return + } + if (resolved) { + debug('resolved: %s:%s', resolved.host, resolved.port) + if (this.options.enableTLSForSentinelMode && this.options.tls) { + Object.assign(resolved, this.options.tls) + this.stream = createTLSConnection(resolved) + } else { + this.stream = createConnection(resolved) + } + this.sentinelIterator.reset(true) + resolve(this.stream) + } else { + const endpointAddress = endpoint.value.host + ':' + endpoint.value.port + const errorMsg = err + ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message + : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved + + debug(errorMsg) + + eventEmitter('sentinelError', new Error(errorMsg)) + + if (err) { + lastError = err + } + connectToNext() + } + }) + }) } - connectToNext() + return connectToNext(); } - private updateSentinels (client, callback: NodeCallback): void { + private updateSentinels (client, callback: CallbackFunction): void { if (!this.options.updateSentinels) { return callback(null) @@ -176,7 +177,7 @@ export default class SentinelConnector extends AbstractConnector { }) } - private resolveMaster (client, callback: NodeCallback): void { + private resolveMaster (client, callback: CallbackFunction): void { client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => { if (err) { client.disconnect() @@ -195,7 +196,7 @@ export default class SentinelConnector extends AbstractConnector { }) } - private resolveSlave (client, callback: NodeCallback): void { + private resolveSlave (client, callback: CallbackFunction): void { client.sentinel('slaves', this.options.name, (err, result) => { client.disconnect() if (err) { @@ -223,7 +224,7 @@ export default class SentinelConnector extends AbstractConnector { return this.options.natMap[`${item.host}:${item.port}`] || item } - private resolve (endpoint, callback: NodeCallback): void { + private resolve (endpoint, callback: CallbackFunction): void { var client = new Redis({ port: endpoint.port || 26379, host: endpoint.host, diff --git a/lib/connectors/StandaloneConnector.ts b/lib/connectors/StandaloneConnector.ts index 27673b31..10d38a32 100644 --- a/lib/connectors/StandaloneConnector.ts +++ b/lib/connectors/StandaloneConnector.ts @@ -2,6 +2,7 @@ import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts} from 'net' import {connect as createTLSConnection, SecureContextOptions} from 'tls' import {CONNECTION_CLOSED_ERROR_MSG} from '../utils' import AbstractConnector, {ErrorEmitter} from './AbstractConnector' +import * as PromiseContainer from '../promiseContainer'; import {NetStream} from '../types' export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions { @@ -21,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector { super() } - public connect (callback: Function, _: ErrorEmitter) { + public connect (_: ErrorEmitter): Promise { const {options} = this this.connecting = true @@ -46,27 +47,30 @@ export default class StandaloneConnector extends AbstractConnector { if (options.tls) { Object.assign(connectionOptions, options.tls) } - - process.nextTick(() => { - if (!this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - - let stream: NetStream - try { - if (options.tls) { - stream = createTLSConnection(connectionOptions) - } else { - stream = createConnection(connectionOptions) + + const _Promise = PromiseContainer.get(); + return new _Promise((resolve, reject) => { + process.nextTick(() => { + if (!this.connecting) { + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return } - } catch (err) { - callback(err) - return - } - - this.stream = stream - callback(null, stream) + + let stream: NetStream + try { + if (options.tls) { + stream = createTLSConnection(connectionOptions) + } else { + stream = createConnection(connectionOptions) + } + } catch (err) { + reject(err) + return + } + + this.stream = stream + resolve(stream) + }) }) } } diff --git a/lib/connectors/types.ts b/lib/connectors/types.ts deleted file mode 100644 index 985fab56..00000000 --- a/lib/connectors/types.ts +++ /dev/null @@ -1 +0,0 @@ -export type NodeCallback = (err: Error | null, result?: T) => void diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 0d3de7c2..df71730d 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -13,6 +13,7 @@ import * as commands from 'redis-commands'; import * as PromiseContainer from '../promiseContainer'; import {addTransactionSupport} from '../transaction'; import {IRedisOptions, ReconnectOnError, DEFAULT_REDIS_OPTIONS} from './RedisOptions'; +import { NetStream } from '../types'; var debug = Debug('redis') @@ -234,8 +235,8 @@ Redis.prototype.setStatus = function (status, arg) { * @public */ Redis.prototype.connect = function (callback) { - var Promise = PromiseContainer.get(); - var promise = new Promise(function (resolve, reject) { + var _Promise = PromiseContainer.get(); + var promise = new _Promise((resolve, reject) => { if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') { reject(new Error('Redis is already connecting/connected')); return; @@ -250,28 +251,21 @@ Redis.prototype.connect = function (callback) { subscriber: false }; - var _this: Redis = this; - this.options.connector.connect(function (err, stream) { - if (err) { - _this.flushQueue(err); - _this.silentEmit('error', err); - reject(err); - _this.setStatus('end'); - return; - } + const connectPromise = this.options.connector.connect(this.silentEmit.bind(this)).then((stream: NetStream) => { + var CONNECT_EVENT = options.tls ? 'secureConnect' : 'connect'; if (options.sentinels && !options.enableTLSForSentinelMode) { CONNECT_EVENT = 'connect'; } - _this.stream = stream; + this.stream = stream; if (typeof options.keepAlive === 'number') { stream.setKeepAlive(true, options.keepAlive); } - stream.once(CONNECT_EVENT, eventHandler.connectHandler(_this)); - stream.once('error', eventHandler.errorHandler(_this)); - stream.once('close', eventHandler.closeHandler(_this)); + stream.once(CONNECT_EVENT, eventHandler.connectHandler(this)); + stream.once('error', eventHandler.errorHandler(this)); + stream.once('close', eventHandler.closeHandler(this)); if (options.connectTimeout) { /* @@ -296,7 +290,7 @@ Redis.prototype.connect = function (callback) { err.code = 'ETIMEDOUT'; // @ts-ignore err.syscall = 'connect'; - eventHandler.errorHandler(_this)(err); + eventHandler.errorHandler(this)(err); }); stream.once(CONNECT_EVENT, function () { connectTimeoutCleared = true; @@ -309,19 +303,22 @@ Redis.prototype.connect = function (callback) { } var connectionReadyHandler = function () { - _this.removeListener('close', connectionCloseHandler); + this.removeListener('close', connectionCloseHandler); resolve(); }; var connectionCloseHandler = function () { - _this.removeListener('ready', connectionReadyHandler); + this.removeListener('ready', connectionReadyHandler); reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); }; - _this.once('ready', connectionReadyHandler); - _this.once('close', connectionCloseHandler); - }, function (type, err) { - _this.silentEmit(type, err); + this.once('ready', connectionReadyHandler); + this.once('close', connectionCloseHandler); + }).catch(err => { + this.flushQueue(err); + this.silentEmit('error', err); + this.setStatus('end'); + throw err; }); - }.bind(this)) + }) return asCallback(promise, callback) }; From 7f5f5926fd361e95f49a1f124fd9a3f8d1ce27c4 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 12:59:12 -0400 Subject: [PATCH 15/20] Fix more `this` --- lib/redis/index.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/redis/index.ts b/lib/redis/index.ts index df71730d..edf17548 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -276,7 +276,7 @@ Redis.prototype.connect = function (callback) { * See https://github.com/electron/electron/issues/14915 */ var connectTimeoutCleared = false; - stream.setTimeout(options.connectTimeout, function () { + stream.setTimeout(options.connectTimeout, () => { if (connectTimeoutCleared) { return; } @@ -292,7 +292,7 @@ Redis.prototype.connect = function (callback) { err.syscall = 'connect'; eventHandler.errorHandler(this)(err); }); - stream.once(CONNECT_EVENT, function () { + stream.once(CONNECT_EVENT, () => { connectTimeoutCleared = true; stream.setTimeout(0); }); @@ -302,11 +302,11 @@ Redis.prototype.connect = function (callback) { stream.setNoDelay(true); } - var connectionReadyHandler = function () { + var connectionReadyHandler = () => { this.removeListener('close', connectionCloseHandler); resolve(); }; - var connectionCloseHandler = function () { + var connectionCloseHandler = () => { this.removeListener('ready', connectionReadyHandler); reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); }; From adae66a139436e100e877e36ea4964822a7971c6 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 13:09:29 -0400 Subject: [PATCH 16/20] fix hanging promise --- lib/connectors/SentinelConnector/index.ts | 116 +++++++++++----------- lib/connectors/StandaloneConnector.ts | 4 +- 2 files changed, 59 insertions(+), 61 deletions(-) diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 115fe247..71d7fd42 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -79,71 +79,69 @@ export default class SentinelConnector extends AbstractConnector { let lastError const _Promise = PromiseContainer.get(); - const connectToNext = (): Promise => { + const connectToNext = () => new _Promise((resolve, reject) => { const endpoint = this.sentinelIterator.next(); - return new _Promise((resolve, reject) => { - if (endpoint.done) { - this.sentinelIterator.reset(false) - const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' - ? this.options.sentinelRetryStrategy(++this.retryAttempts) - : null - - let errorMsg = typeof retryDelay !== 'number' - ? 'All sentinels are unreachable and retry is disabled.' - : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - - if (lastError) { - errorMsg += ` Last error: ${lastError.message}` - } - - debug(errorMsg) - - const error = new Error(errorMsg) - if (typeof retryDelay === 'number') { - setTimeout(() => { - resolve(connectToNext()); - }, retryDelay) - eventEmitter('error', error) - } else { - reject(error) - } + if (endpoint.done) { + this.sentinelIterator.reset(false) + const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' + ? this.options.sentinelRetryStrategy(++this.retryAttempts) + : null + + let errorMsg = typeof retryDelay !== 'number' + ? 'All sentinels are unreachable and retry is disabled.' + : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` + + if (lastError) { + errorMsg += ` Last error: ${lastError.message}` + } + + debug(errorMsg) + + const error = new Error(errorMsg) + if (typeof retryDelay === 'number') { + setTimeout(() => { + resolve(connectToNext()); + }, retryDelay) + eventEmitter('error', error) + } else { + reject(error) + } + return + } + + this.resolve(endpoint.value, (err, resolved) => { + if (!this.connecting) { + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) return } - - this.resolve(endpoint.value, (err, resolved) => { - if (!this.connecting) { - reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - if (resolved) { - debug('resolved: %s:%s', resolved.host, resolved.port) - if (this.options.enableTLSForSentinelMode && this.options.tls) { - Object.assign(resolved, this.options.tls) - this.stream = createTLSConnection(resolved) - } else { - this.stream = createConnection(resolved) - } - this.sentinelIterator.reset(true) - resolve(this.stream) + if (resolved) { + debug('resolved: %s:%s', resolved.host, resolved.port) + if (this.options.enableTLSForSentinelMode && this.options.tls) { + Object.assign(resolved, this.options.tls) + this.stream = createTLSConnection(resolved) } else { - const endpointAddress = endpoint.value.host + ':' + endpoint.value.port - const errorMsg = err - ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message - : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - - debug(errorMsg) - - eventEmitter('sentinelError', new Error(errorMsg)) - - if (err) { - lastError = err - } - connectToNext() + this.stream = createConnection(resolved) } - }) - }) - } + this.sentinelIterator.reset(true) + resolve(this.stream) + } else { + const endpointAddress = endpoint.value.host + ':' + endpoint.value.port + const errorMsg = err + ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message + : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved + + debug(errorMsg) + + eventEmitter('sentinelError', new Error(errorMsg)) + + if (err) { + lastError = err + } + resolve(connectToNext()) + } + }) + }); return connectToNext(); } diff --git a/lib/connectors/StandaloneConnector.ts b/lib/connectors/StandaloneConnector.ts index 10d38a32..738a91e9 100644 --- a/lib/connectors/StandaloneConnector.ts +++ b/lib/connectors/StandaloneConnector.ts @@ -22,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector { super() } - public connect (_: ErrorEmitter): Promise { + public connect (_: ErrorEmitter) { const {options} = this this.connecting = true @@ -49,7 +49,7 @@ export default class StandaloneConnector extends AbstractConnector { } const _Promise = PromiseContainer.get(); - return new _Promise((resolve, reject) => { + return new _Promise((resolve, reject) => { process.nextTick(() => { if (!this.connecting) { reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) From 9e8bded4a4c802a45cba53c926f473801c9fd4af Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 13:16:39 -0400 Subject: [PATCH 17/20] another unhooked promise --- lib/redis/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/redis/index.ts b/lib/redis/index.ts index edf17548..b9c50bd1 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -318,6 +318,8 @@ Redis.prototype.connect = function (callback) { this.setStatus('end'); throw err; }); + + resolve(connectPromise); }) return asCallback(promise, callback) From f6c4ab87f9c586b3b5a8feaee9dbe562ec04699a Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 13:31:08 -0400 Subject: [PATCH 18/20] move to examples --- .eslintrc | 3 +- examples/basic_operations.js | 4 +- examples/custom_connector.js | 53 +++++++++++++++++++++++ lib/connectors/AsyncSentinelConnector.ts | 35 --------------- lib/connectors/SentinelConnector/index.ts | 4 +- lib/index.ts | 3 +- 6 files changed, 58 insertions(+), 44 deletions(-) create mode 100644 examples/custom_connector.js delete mode 100644 lib/connectors/AsyncSentinelConnector.ts diff --git a/.eslintrc b/.eslintrc index 1f01f7d5..85f55bfe 100644 --- a/.eslintrc +++ b/.eslintrc @@ -1,6 +1,7 @@ { "env": { - "node": true + "node": true, + "es6": true }, "rules": { "no-const-assign": 2, diff --git a/examples/basic_operations.js b/examples/basic_operations.js index 7f45edf7..36cb6f31 100644 --- a/examples/basic_operations.js +++ b/examples/basic_operations.js @@ -27,6 +27,4 @@ redis.sadd('set', [1, 3, 5, 7]); redis.set('key', 100, 'EX', 10); // Change the server configuration -redis.config('set', 'notify-keyspace-events', 'KEA') - - +redis.config('set', 'notify-keyspace-events', 'KEA'); diff --git a/examples/custom_connector.js b/examples/custom_connector.js new file mode 100644 index 00000000..fbe811fe --- /dev/null +++ b/examples/custom_connector.js @@ -0,0 +1,53 @@ +'use strict'; + +const Redis = require('ioredis'); +const MyService = require('path/to/my/service'); + +// Create a custom connector that fetches sentinels from an external call +class AsyncSentinelConnector extends Redis.SentinelConnector { + constructor(options = {}) { + // Placeholder + options.sentinels = options.sentinels || [{ host: 'localhost', port: 6379 }]; + + // SentinelConnector saves options as its property + super(options); + } + + connect(eventEmitter) { + return MyService.getSentinels().then(sentinels => { + this.options.sentinels = sentinels; + this.sentinelIterator = new Redis.SentinelIterator(sentinels); + return Redis.SentinelConnector.prototype.connect.call(this, eventEmitter); + }); + } +} + +const redis = new Redis({ + connector: new AsyncSentinelConnector() +}); + +// ioredis supports all Redis commands: +redis.set('foo', 'bar'); +redis.get('foo', function (err, result) { + if (err) { + console.error(err); + } else { + console.log(result); + } +}); +redis.del('foo'); + +// Or using a promise if the last argument isn't a function +redis.get('foo').then(function (result) { + console.log(result); +}); + +// Arguments to commands are flattened, so the following are the same: +redis.sadd('set', 1, 3, 5, 7); +redis.sadd('set', [1, 3, 5, 7]); + +// All arguments are passed directly to the redis server: +redis.set('key', 100, 'EX', 10); + +// Change the server configuration +redis.config('set', 'notify-keyspace-events', 'KEA'); diff --git a/lib/connectors/AsyncSentinelConnector.ts b/lib/connectors/AsyncSentinelConnector.ts deleted file mode 100644 index eda3995b..00000000 --- a/lib/connectors/AsyncSentinelConnector.ts +++ /dev/null @@ -1,35 +0,0 @@ -import SentinelConnector, { - ISentinelConnectionOptions, - ISentinelAddress, - SentinelIterator, - EMPTY_SENTINELS_MSG } from './SentinelConnector'; -import { ErrorEmitter } from './AbstractConnector'; -import { NetStream } from '../types'; - -export type AsyncSentinelFetch = () => Promise[]> - -export interface IAsyncSentinelConnectionOptions extends Pick> { - sentinels?: Partial[] -} - -export default class AsyncSentinelConnector extends SentinelConnector { - private fetch: AsyncSentinelFetch - - constructor(options: IAsyncSentinelConnectionOptions, fetch: AsyncSentinelFetch) { - options.sentinels = options.sentinels || [{host: 'localhost', port: 6379}] // Placeholder - super(options as ISentinelConnectionOptions) - - this.fetch = fetch; - } - - public connect(eventEmitter: ErrorEmitter): Promise { - return this.fetch().then(sentinels => { - if (!sentinels.length) throw new Error(EMPTY_SENTINELS_MSG); - - this.options.sentinels = sentinels; - this.sentinelIterator = new SentinelIterator(sentinels) - return super.connect(eventEmitter); - }); - } - -} \ No newline at end of file diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 71d7fd42..8e037973 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -12,8 +12,6 @@ import Redis from '../../redis' const debug = Debug('SentinelConnector') -export const EMPTY_SENTINELS_MSG = 'Requires at least one sentinel to connect to.'; - interface IAddressFromResponse { port: string, ip: string, @@ -49,7 +47,7 @@ export default class SentinelConnector extends AbstractConnector { super() if (!this.options.sentinels.length) { - throw new Error(EMPTY_SENTINELS_MSG) + throw new Error('Requires at least one sentinel to connect to.') } if (!this.options.name) { throw new Error('Requires the name of master.') diff --git a/lib/index.ts b/lib/index.ts index b497ebde..eb33b9e9 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -6,8 +6,7 @@ export {default as Command} from './command' export {default as ScanStream} from './ScanStream' export {default as Pipeline} from './pipeline' export {default as AbstractConnector} from './connectors/AbstractConnector' -export {default as SentinelConnector} from './connectors/SentinelConnector' -export {default as AsyncSentinelConnector} from './connectors/AsyncSentinelConnector' +export {default as SentinelConnector, SentinelIterator} from './connectors/SentinelConnector' // Type Exports export {ISentinelAddress} from './connectors/SentinelConnector' From 678ab980d177dd8338bbbdbfc34f1a5bdab56ea6 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 14:06:01 -0400 Subject: [PATCH 19/20] one more bad resolve --- lib/connectors/StandaloneConnector.ts | 12 +++++------- lib/redis/index.ts | 24 +++++++++++++----------- package.json | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/connectors/StandaloneConnector.ts b/lib/connectors/StandaloneConnector.ts index 738a91e9..c9297c58 100644 --- a/lib/connectors/StandaloneConnector.ts +++ b/lib/connectors/StandaloneConnector.ts @@ -55,21 +55,19 @@ export default class StandaloneConnector extends AbstractConnector { reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) return } - - let stream: NetStream + try { if (options.tls) { - stream = createTLSConnection(connectionOptions) + this.stream = createTLSConnection(connectionOptions) } else { - stream = createConnection(connectionOptions) + this.stream = createConnection(connectionOptions) } } catch (err) { reject(err) return } - - this.stream = stream - resolve(stream) + + resolve(this.stream) }) }) } diff --git a/lib/redis/index.ts b/lib/redis/index.ts index b9c50bd1..12a638d3 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -251,7 +251,7 @@ Redis.prototype.connect = function (callback) { subscriber: false }; - const connectPromise = this.options.connector.connect(this.silentEmit.bind(this)).then((stream: NetStream) => { + const connectPromise = options.connector.connect(this.silentEmit.bind(this)).then((stream: NetStream) => { var CONNECT_EVENT = options.tls ? 'secureConnect' : 'connect'; if (options.sentinels && !options.enableTLSForSentinelMode) { @@ -302,16 +302,18 @@ Redis.prototype.connect = function (callback) { stream.setNoDelay(true); } - var connectionReadyHandler = () => { - this.removeListener('close', connectionCloseHandler); - resolve(); - }; - var connectionCloseHandler = () => { - this.removeListener('ready', connectionReadyHandler); - reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); - }; - this.once('ready', connectionReadyHandler); - this.once('close', connectionCloseHandler); + return new _Promise((_reject, _resolve) => { + var connectionReadyHandler = () => { + this.removeListener('close', connectionCloseHandler); + _resolve(); + }; + var connectionCloseHandler = () => { + this.removeListener('ready', connectionReadyHandler); + _reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); + }; + this.once('ready', connectionReadyHandler); + this.once('close', connectionCloseHandler); + }); }).catch(err => { this.flushQueue(err); this.silentEmit('error', err); diff --git a/package.json b/package.json index b0ed630e..01f6443d 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "built/" ], "scripts": { - "test": "NODE_ENV=test mocha test/**/*.ts", + "test": "TS_NODE_LOG_ERROR=true NODE_ENV=test mocha test/**/*.ts", "test:cov": "TS_NODE_LOG_ERROR=true NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", "build": "rm -rf built && tsc", "prepublishOnly": "npm run build && npm test", From 9e43e47319234568adc32e8eb360ff1fb18b2334 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Mon, 24 Jun 2019 14:16:09 -0400 Subject: [PATCH 20/20] take a step back --- lib/redis/index.ts | 57 +++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 12a638d3..7b283da5 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -13,7 +13,6 @@ import * as commands from 'redis-commands'; import * as PromiseContainer from '../promiseContainer'; import {addTransactionSupport} from '../transaction'; import {IRedisOptions, ReconnectOnError, DEFAULT_REDIS_OPTIONS} from './RedisOptions'; -import { NetStream } from '../types'; var debug = Debug('redis') @@ -251,21 +250,30 @@ Redis.prototype.connect = function (callback) { subscriber: false }; - const connectPromise = options.connector.connect(this.silentEmit.bind(this)).then((stream: NetStream) => { - + var _this = this; + asCallback(options.connector.connect(function (type, err) { + _this.silentEmit(type, err); + }), function (err, stream) { + if (err) { + _this.flushQueue(err); + _this.silentEmit('error', err); + reject(err); + _this.setStatus('end'); + return; + } var CONNECT_EVENT = options.tls ? 'secureConnect' : 'connect'; if (options.sentinels && !options.enableTLSForSentinelMode) { CONNECT_EVENT = 'connect'; } - this.stream = stream; + _this.stream = stream; if (typeof options.keepAlive === 'number') { stream.setKeepAlive(true, options.keepAlive); } - stream.once(CONNECT_EVENT, eventHandler.connectHandler(this)); - stream.once('error', eventHandler.errorHandler(this)); - stream.once('close', eventHandler.closeHandler(this)); + stream.once(CONNECT_EVENT, eventHandler.connectHandler(_this)); + stream.once('error', eventHandler.errorHandler(_this)); + stream.once('close', eventHandler.closeHandler(_this)); if (options.connectTimeout) { /* @@ -276,7 +284,7 @@ Redis.prototype.connect = function (callback) { * See https://github.com/electron/electron/issues/14915 */ var connectTimeoutCleared = false; - stream.setTimeout(options.connectTimeout, () => { + stream.setTimeout(options.connectTimeout, function () { if (connectTimeoutCleared) { return; } @@ -290,9 +298,9 @@ Redis.prototype.connect = function (callback) { err.code = 'ETIMEDOUT'; // @ts-ignore err.syscall = 'connect'; - eventHandler.errorHandler(this)(err); + eventHandler.errorHandler(_this)(err); }); - stream.once(CONNECT_EVENT, () => { + stream.once(CONNECT_EVENT, function () { connectTimeoutCleared = true; stream.setTimeout(0); }); @@ -302,26 +310,17 @@ Redis.prototype.connect = function (callback) { stream.setNoDelay(true); } - return new _Promise((_reject, _resolve) => { - var connectionReadyHandler = () => { - this.removeListener('close', connectionCloseHandler); - _resolve(); - }; - var connectionCloseHandler = () => { - this.removeListener('ready', connectionReadyHandler); - _reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); - }; - this.once('ready', connectionReadyHandler); - this.once('close', connectionCloseHandler); - }); - }).catch(err => { - this.flushQueue(err); - this.silentEmit('error', err); - this.setStatus('end'); - throw err; + var connectionReadyHandler = function () { + _this.removeListener('close', connectionCloseHandler); + resolve(); + }; + var connectionCloseHandler = function () { + _this.removeListener('ready', connectionReadyHandler); + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)); + }; + _this.once('ready', connectionReadyHandler); + _this.once('close', connectionCloseHandler); }); - - resolve(connectPromise); }) return asCallback(promise, callback)