Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed autopipeline performances. #1226

Merged
merged 4 commits into from
Jun 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ node_modules
built

.vscode
benchmarks/fixtures/*.txt
benchmarks/fixtures/*.txt

16 changes: 2 additions & 14 deletions lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,6 @@ export const notAllowedAutoPipelineCommands = [
"unpsubscribe",
];

function findAutoPipeline(
client,
_commandName,
...args: Array<string>
): string {
if (!client.isCluster) {
return "main";
}

// We have slot information, we can improve routing by grouping slots served by the same subset of nodes
return client.slots[calculateSlot(args[0])].join(",");
}

function executeAutoPipeline(client, slotKey: string) {
/*
If a pipeline is already executing, keep queueing up commands
Expand Down Expand Up @@ -116,7 +103,8 @@ export function executeWithAutoPipelining(
});
}

const slotKey = findAutoPipeline(client, commandName, ...args);
// If we have slot information, we can improve routing by grouping slots served by the same subset of nodes
const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main';

if (!client._autoPipelines.has(slotKey)) {
const pipeline = client.pipeline();
Expand Down
24 changes: 24 additions & 0 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class Cluster extends EventEmitter {
private isRefreshing = false;
public isCluster = true;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _groupsIds: {[key: string]: number} = {};
private _groupsBySlot: number[] = Array(16384);
private _runningAutoPipelines: Set<string> = new Set();
private _readyDelayedCallbacks: CallbackFunction[] = [];
public _addedScriptHashes: { [key: string]: any } = {};
Expand Down Expand Up @@ -188,7 +190,10 @@ class Cluster extends EventEmitter {
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
this._addedScriptHashes = {};
}, this.options.maxScriptsCachingTime);
Expand Down Expand Up @@ -627,6 +632,7 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key];
}
_this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')];
_this.connectionPool.findOrCreate(_this.natMapper(key));
tryConnection();
debug("refreshing slot caches... (triggered by MOVED error)");
Expand Down Expand Up @@ -860,6 +866,24 @@ class Cluster extends EventEmitter {
}
}

// Assign to each node keys a numeric value to make autopipeline comparison faster.
this._groupsIds = Object.create(null);
let j = 0;
for (let i = 0; i < 16384; i++) {
const target = (this.slots[i] || []).join(';');

if (!target.length) {
this._groupsBySlot[i] = undefined;
continue;
}

if (!this._groupsIds[target]) {
this._groupsIds[target] = ++j;
}

this._groupsBySlot[i] = this._groupsIds[target];
}

this.connectionPool.reset(nodes);
callback();
}, this.options.slotsRefreshTimeout)
Expand Down
9 changes: 4 additions & 5 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ import Commander from "./commander";
*/
function generateMultiWithNodes(redis, keys) {
const slot = calculateSlot(keys[0]);
const target = redis.slots[slot].join(",");

const target = redis._groupsBySlot[slot];
for (let i = 1; i < keys.length; i++) {
const currentTarget = redis.slots[calculateSlot(keys[i])].join(",");

if (currentTarget !== target) {
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
return -1;
}
}
Expand Down Expand Up @@ -158,6 +156,7 @@ Pipeline.prototype.fillResult = function (value, position) {
moved: function (slot, key) {
_this.preferKey = key;
_this.redis.slots[errv[1]] = [key];
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis.refreshSlotsCache();
_this.exec();
},
Expand Down
4 changes: 4 additions & 0 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ Redis.prototype.connect = function (callback) {
reject(new Error("Redis is already connecting/connected"));
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
this._addedScriptHashes = {};
}, this.options.maxScriptsCachingTime);
Expand Down
32 changes: 19 additions & 13 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ use(require("chai-as-promised"));
Instead foo1 and foo2 are usually served by different nodes in a 3-nodes cluster.
*/
describe("autoPipelining for cluster", function () {
function changeSlot(cluster, from, to) {
cluster.slots[from] = cluster.slots[to];
cluster._groupsBySlot[from] = cluster._groupsBySlot[to];
}

beforeEach(() => {
const slotTable = [
[0, 5000, ["127.0.0.1", 30001]],
Expand Down Expand Up @@ -402,11 +407,12 @@ describe("autoPipelining for cluster", function () {
const promise4 = cluster.set("foo6", "bar");

// Override slots to induce a failure
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');

changeSlot(cluster, key1Slot, key2Slot);
changeSlot(cluster, key2Slot, key5Slot);

await expect(promise1).to.eventually.be.rejectedWith(
"All keys in the pipeline should belong to the same slots allocation group"
Expand Down Expand Up @@ -492,11 +498,11 @@ describe("autoPipelining for cluster", function () {
expect(cluster.autoPipelineQueueSize).to.eql(4);

// Override slots to induce a failure
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
changeSlot(cluster, key1Slot, key2Slot);
changeSlot(cluster, key2Slot, key5Slot);
});
});

Expand Down Expand Up @@ -541,9 +547,9 @@ describe("autoPipelining for cluster", function () {

expect(cluster.autoPipelineQueueSize).to.eql(3);

const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
changeSlot(cluster, key1Slot, key2Slot);
});
});
});
18 changes: 18 additions & 0 deletions test/functional/cluster/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,22 @@ describe("cluster:disconnect", function () {
done();
});
});

it("should clear the added script hashes interval even when no connection succeeded", function (done) {
const cluster = new Cluster([{ host: "127.0.0.1", port: "0" }], {
enableReadyCheck: false,
});

let attempt = 0;
cluster.on("error", function () {
if(attempt < 5) {
attempt ++;
return
}
cluster.quit();

expect(cluster._addedScriptHashesCleanInterval).to.be.null;
done();
});
});
});
17 changes: 17 additions & 0 deletions test/functional/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,21 @@ describe("disconnection", function () {
}
});
});

it("should clear the added script hashes interval even when no connection succeeded", function (done) {
let attempt = 0;
const redis = new Redis(0, 'localhost');

redis.on("error", function () {
if(attempt < 5) {
attempt ++;
return
}

redis.quit();

expect(redis._addedScriptHashesCleanInterval).to.be.null;
done();
});
});
});