Skip to content

Commit

Permalink
fix: Ensure delayed callbacks are always invoked.
Browse files Browse the repository at this point in the history
  • Loading branch information
ShogunPanda authored and AVVS committed Oct 23, 2020
1 parent f560813 commit d6e78c3
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
14 changes: 8 additions & 6 deletions lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ export function executeWithAutoPipelining(
// On cluster mode let's wait for slots to be available
if (client.isCluster && !client.slots.length) {
return new CustomPromise(function (resolve, reject) {
client.delayUntilReady(() => {
executeWithAutoPipelining(client, commandName, args, callback).then(
resolve,
reject
);
});
client.delayUntilReady(err => {
if (err) {
reject(err);
return;
}

executeWithAutoPipelining(client, commandName, args, callback).then(resolve, reject);
})
});
}

Expand Down
26 changes: 14 additions & 12 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class Cluster extends EventEmitter {

let closeListener: () => void = undefined;
const refreshListener = () => {
this.invokeReadyDelayedCallbacks(undefined);
this.removeListener("close", closeListener);
this.manuallyClosing = false;
this.setStatus("connect");
Expand All @@ -250,8 +251,11 @@ class Cluster extends EventEmitter {
};

closeListener = function () {
const error = new Error("None of startup nodes is available");

this.removeListener("refresh", refreshListener);
reject(new Error("None of startup nodes is available"));
this.invokeReadyDelayedCallbacks(error);
reject(error);
};

this.once("refresh", refreshListener);
Expand All @@ -271,6 +275,7 @@ class Cluster extends EventEmitter {
.catch((err) => {
this.setStatus("close");
this.handleCloseEvent(err);
this.invokeReadyDelayedCallbacks(err);
reject(err);
});
});
Expand Down Expand Up @@ -440,17 +445,6 @@ class Cluster extends EventEmitter {

// This is needed in order not to install a listener for each auto pipeline
public delayUntilReady(callback: CallbackFunction) {
// First call, setup the event listener
if (!this._readyDelayedCallbacks.length) {
this.once("ready", (...args) => {
for (const c of this._readyDelayedCallbacks) {
c(...args);
}

this._readyDelayedCallbacks = [];
});
}

this._readyDelayedCallbacks.push(callback);
}

Expand Down Expand Up @@ -850,6 +844,14 @@ class Cluster extends EventEmitter {
);
}

private invokeReadyDelayedCallbacks(err) {
for (const c of this._readyDelayedCallbacks) {
process.nextTick(c, err);
}

this._readyDelayedCallbacks = [];
}

/**
* Check whether Cluster is able to process commands
*
Expand Down
9 changes: 7 additions & 2 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,14 @@ Pipeline.prototype.execBuffer = deprecate(function () {
Pipeline.prototype.exec = function (callback: CallbackFunction) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
this.redis.delayUntilReady(() => {
this.redis.delayUntilReady(err => {
if (err) {
callback(err);
return;
}

this.exec(callback);
});
})

return this.promise;
}
Expand Down
9 changes: 7 additions & 2 deletions lib/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ export function addTransactionSupport(redis) {
if (this.isCluster && !this.redis.slots.length) {
return asCallback(
new Promise((resolve, reject) => {
this.redis.delayUntilReady(() => {
this.redis.delayUntilReady(err => {
if (err) {
reject(err);
return;
}

this.exec(pipeline).then(resolve, reject);
});
})
}),
callback
);
Expand Down

0 comments on commit d6e78c3

Please sign in to comment.