Skip to content

Commit

Permalink
fix: fixed autopipeline performances. (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShogunPanda authored Jun 13, 2021
1 parent 71f2994 commit 42f1ee1
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ node_modules
built

.vscode
benchmarks/fixtures/*.txt
benchmarks/fixtures/*.txt

16 changes: 2 additions & 14 deletions lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@ export const notAllowedAutoPipelineCommands = [
"unpsubscribe",
];

function findAutoPipeline(
client,
_commandName,
...args: Array<string>
): string {
if (!client.isCluster) {
return "main";
}

// We have slot information, we can improve routing by grouping slots served by the same subset of nodes
return client.slots[calculateSlot(args[0])].join(",");
}

function executeAutoPipeline(client, slotKey: string) {
/*
If a pipeline is already executing, keep queueing up commands
Expand Down Expand Up @@ -116,7 +103,8 @@ export function executeWithAutoPipelining(
});
}

const slotKey = findAutoPipeline(client, commandName, ...args);
// If we have slot information, we can improve routing by grouping slots served by the same subset of nodes
const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main';

if (!client._autoPipelines.has(slotKey)) {
const pipeline = client.pipeline();
Expand Down
24 changes: 24 additions & 0 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class Cluster extends EventEmitter {
private isRefreshing = false;
public isCluster = true;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _groupsIds: {[key: string]: number} = {};
private _groupsBySlot: number[] = Array(16384);
private _runningAutoPipelines: Set<string> = new Set();
private _readyDelayedCallbacks: CallbackFunction[] = [];
public _addedScriptHashes: { [key: string]: any } = {};
Expand Down Expand Up @@ -188,7 +190,10 @@ class Cluster extends EventEmitter {
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
this._addedScriptHashes = {};
}, this.options.maxScriptsCachingTime);
Expand Down Expand Up @@ -627,6 +632,7 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key];
}
_this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')];
_this.connectionPool.findOrCreate(_this.natMapper(key));
tryConnection();
debug("refreshing slot caches... (triggered by MOVED error)");
Expand Down Expand Up @@ -860,6 +866,24 @@ class Cluster extends EventEmitter {
}
}

// Assign to each node keys a numeric value to make autopipeline comparison faster.
this._groupsIds = Object.create(null);
let j = 0;
for (let i = 0; i < 16384; i++) {
const target = (this.slots[i] || []).join(';');

if (!target.length) {
this._groupsBySlot[i] = undefined;
continue;
}

if (!this._groupsIds[target]) {
this._groupsIds[target] = ++j;
}

this._groupsBySlot[i] = this._groupsIds[target];
}

this.connectionPool.reset(nodes);
callback();
}, this.options.slotsRefreshTimeout)
Expand Down
9 changes: 4 additions & 5 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ import Commander from "./commander";
*/
function generateMultiWithNodes(redis, keys) {
const slot = calculateSlot(keys[0]);
const target = redis.slots[slot].join(",");

const target = redis._groupsBySlot[slot];
for (let i = 1; i < keys.length; i++) {
const currentTarget = redis.slots[calculateSlot(keys[i])].join(",");

if (currentTarget !== target) {
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
return -1;
}
}
Expand Down Expand Up @@ -158,6 +156,7 @@ Pipeline.prototype.fillResult = function (value, position) {
moved: function (slot, key) {
_this.preferKey = key;
_this.redis.slots[errv[1]] = [key];
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis.refreshSlotsCache();
_this.exec();
},
Expand Down
4 changes: 4 additions & 0 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ Redis.prototype.connect = function (callback) {
reject(new Error("Redis is already connecting/connected"));
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
this._addedScriptHashes = {};
}, this.options.maxScriptsCachingTime);
Expand Down
32 changes: 19 additions & 13 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use(require("chai-as-promised"));
Instead foo1 and foo2 are usually served by different nodes in a 3-nodes cluster.
*/
describe("autoPipelining for cluster", function () {
function changeSlot(cluster, from, to) {
cluster.slots[from] = cluster.slots[to];
cluster._groupsBySlot[from] = cluster._groupsBySlot[to];
}

beforeEach(() => {
const slotTable = [
[0, 5000, ["127.0.0.1", 30001]],
Expand Down Expand Up @@ -402,11 +407,12 @@ describe("autoPipelining for cluster", function () {
const promise4 = cluster.set("foo6", "bar");

// Override slots to induce a failure
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');

changeSlot(cluster, key1Slot, key2Slot);
changeSlot(cluster, key2Slot, key5Slot);

await expect(promise1).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
Expand Down Expand Up @@ -492,11 +498,11 @@ describe("autoPipelining for cluster", function () {
expect(cluster.autoPipelineQueueSize).to.eql(4);

// Override slots to induce a failure
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
changeSlot(cluster, key1Slot, key2Slot);
changeSlot(cluster, key2Slot, key5Slot);
});
});

Expand Down Expand Up @@ -541,9 +547,9 @@ describe("autoPipelining for cluster", function () {

expect(cluster.autoPipelineQueueSize).to.eql(3);

const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
changeSlot(cluster, key1Slot, key2Slot);
});
});
});
18 changes: 18 additions & 0 deletions test/functional/cluster/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,22 @@ describe("cluster:disconnect", function () {
done();
});
});

it("should clear the added script hashes interval even when no connection succeeded", function (done) {
const cluster = new Cluster([{ host: "127.0.0.1", port: "0" }], {
enableReadyCheck: false,
});

let attempt = 0;
cluster.on("error", function () {
if(attempt < 5) {
attempt ++;
return
}
cluster.quit();

expect(cluster._addedScriptHashesCleanInterval).to.be.null;
done();
});
});
});
17 changes: 17 additions & 0 deletions test/functional/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,21 @@ describe("disconnection", function () {
}
});
});

it("should clear the added script hashes interval even when no connection succeeded", function (done) {
let attempt = 0;
const redis = new Redis(0, 'localhost');

redis.on("error", function () {
if(attempt < 5) {
attempt ++;
return
}

redis.quit();

expect(redis._addedScriptHashesCleanInterval).to.be.null;
done();
});
});
});

0 comments on commit 42f1ee1

Please sign in to comment.