Skip to content

Commit

Permalink
fix: use-versions-for-lua-commands
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 19, 2024
1 parent ecec076 commit 9ea19b7
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 37 deletions.
10 changes: 6 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,13 @@ export class Job<
Date.now() + delay,
token,
);
(<any>multi).moveToDelayed(args);
this.scripts.execCommand(multi, 'moveToDelayed', args);
command = 'delayed';
} else {
// Retry immediately
(<any>multi).retryJob(
this.scripts.execCommand(
multi,
'retryJob',
this.scripts.retryJobArgs(this.id, this.opts.lifo, token),
);
command = 'retryJob';
Expand All @@ -631,7 +633,7 @@ export class Job<
token,
fetchNext,
);
(<any>multi).moveToFinished(args);
this.scripts.execCommand(multi, 'moveToFinished', args);
finishedOn = args[13];
command = 'failed';
}
Expand Down Expand Up @@ -1110,7 +1112,7 @@ export class Job<
err?.message,
);

(<any>multi).saveStacktrace(args);
this.scripts.execCommand(multi, 'saveStacktrace', args);
}
}

Expand Down
16 changes: 12 additions & 4 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
isRedisCluster,
isRedisInstance,
isRedisVersionLowerThan,
readPackageJson,
} from '../utils';
import * as scripts from '../scripts';

Expand Down Expand Up @@ -153,13 +154,18 @@ export class RedisConnection extends EventEmitter {
return this.initializing;
}

protected loadCommands(providedScripts?: Record<string, RawCommand>): void {
protected loadCommands(
version?: string,
providedScripts?: Record<string, RawCommand>,
): void {
const finalScripts =
providedScripts || (scripts as Record<string, RawCommand>);
for (const property in finalScripts as Record<string, RawCommand>) {
// Only define the command if not already defined
if (!(<any>this._client)[finalScripts[property].name]) {
(<any>this._client).defineCommand(finalScripts[property].name, {
const commandName = `${finalScripts[property].name}:${version}`;

if (!(<any>this._client)[commandName]) {
(<any>this._client).defineCommand(commandName, {
numberOfKeys: finalScripts[property].keys,
lua: finalScripts[property].content,
});
Expand All @@ -178,8 +184,10 @@ export class RedisConnection extends EventEmitter {

this._client.on('ready', this.handleClientReady);

const { version } = readPackageJson();

await RedisConnection.waitUntilReady(this._client);
this.loadCommands();
this.loadCommands(version);

this.version = await this.getRedisVersion();
if (this.opts && this.opts.skipVersionCheck !== true && !this.closing) {
Expand Down
106 changes: 77 additions & 29 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,26 @@ import {
RedisJobOptions,
} from '../types';
import { ErrorCode } from '../enums';
import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils';
import {
array2obj,
getParentKey,
isRedisVersionLowerThan,
readPackageJson,
} from '../utils';
import { ChainableCommander } from 'ioredis';

export type JobData = [JobJsonRaw | number, string?];

export class Scripts {
protected version;

moveToFinishedKeys: (string | undefined)[];

constructor(protected queue: MinimalQueue) {
const queueKeys = this.queue.keys;

this.version = readPackageJson().version;

this.moveToFinishedKeys = [
queueKeys.wait,
queueKeys.active,
Expand All @@ -59,13 +68,22 @@ export class Scripts {
];
}

public execCommand(
client: RedisClient | ChainableCommander,
commandName: string,
args: any[],
) {
const commandNameWithVersion = `${commandName}:${this.version}`;
return (<any>client)[commandNameWithVersion](args);
}

async isJobInList(listKey: string, jobId: string): Promise<boolean> {
const client = await this.queue.client;
let result;
if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) {
result = await (<any>client).isJobInList([listKey, jobId]);
result = await this.execCommand(client, 'isJobInList', [listKey, jobId]);
} else {
result = await (<any>client).lpos(listKey, jobId);
result = await client.lpos(listKey, jobId);
}
return Number.isInteger(result);
}
Expand Down Expand Up @@ -127,7 +145,7 @@ export class Scripts {

keys.push(pack(args), job.data, encodedOpts);

const result = await (<any>client).addJob(keys);
const result = await this.execCommand(client, 'addJob', keys);

if (result < 0) {
throw this.finishedErrors(result, parentOpts.parentKey, 'addJob');
Expand All @@ -152,7 +170,11 @@ export class Scripts {

keys.push(this.queue.keys.events);

return (<any>client).pause(keys.concat([pause ? 'paused' : 'resumed']));
return this.execCommand(
client,
'pause',
keys.concat([pause ? 'paused' : 'resumed']),
);
}

private removeRepeatableArgs(
Expand All @@ -175,14 +197,14 @@ export class Scripts {
const client = await this.queue.client;
const args = this.removeRepeatableArgs(repeatJobId, repeatJobKey);

return (<any>client).removeRepeatable(args);
return this.execCommand(client, 'removeRepeatable', args);
}

async remove(jobId: string): Promise<number> {
const client = await this.queue.client;

const keys = [''].map(name => this.queue.toKey(name));
return (<any>client).removeJob(keys.concat([jobId]));
return this.execCommand(client, 'removeJob', keys.concat([jobId]));
}

async extendLock(
Expand All @@ -199,7 +221,7 @@ export class Scripts {
duration,
jobId,
];
return (<any>client).extendLock(args);
return this.execCommand(client, 'extendLock', args);
}

async updateData<T = any, R = any, N extends string = string>(
Expand All @@ -211,7 +233,11 @@ export class Scripts {
const keys = [this.queue.toKey(job.id)];
const dataJson = JSON.stringify(data);

const result = await (<any>client).updateData(keys.concat([dataJson]));
const result = await this.execCommand(
client,
'updateData',
keys.concat([dataJson]),
);

if (result < 0) {
throw this.finishedErrors(result, job.id, 'updateData');
Expand All @@ -227,7 +253,9 @@ export class Scripts {
const keys = [this.queue.toKey(job.id), this.queue.keys.events];
const progressJson = JSON.stringify(progress);

const result = await (<any>client).updateProgress(
const result = await this.execCommand(
client,
'updateProgress',
keys.concat([job.id, progressJson]),
);

Expand Down Expand Up @@ -309,7 +337,7 @@ export class Scripts {
) {
const client = await this.queue.client;

const result = await (<any>client).moveToFinished(args);
const result = await this.execCommand(client, 'moveToFinished', args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToFinished', 'active');
} else {
Expand Down Expand Up @@ -366,7 +394,7 @@ export class Scripts {
const client = await this.queue.client;
const args = this.drainArgs(delayed);

return (<any>client).drain(args);
return this.execCommand(client, 'drain', args);
}

private getRangesArgs(
Expand Down Expand Up @@ -396,7 +424,7 @@ export class Scripts {
const client = await this.queue.client;
const args = this.getRangesArgs(types, start, end, asc);

return (<any>client).getRanges(args);
return this.execCommand(client, 'getRanges', args);
}

private getCountsArgs(types: JobType[]): (string | number)[] {
Expand All @@ -416,7 +444,7 @@ export class Scripts {
const client = await this.queue.client;
const args = this.getCountsArgs(types);

return (<any>client).getCounts(args);
return this.execCommand(client, 'getCounts', args);
}

moveToCompletedArgs<T = any, R = any, N extends string = string>(
Expand Down Expand Up @@ -469,7 +497,9 @@ export class Scripts {
return this.queue.toKey(key);
});

return (<any>client).isFinished(
return this.execCommand(
client,
'isFinished',
keys.concat([jobId, returnValue ? '1' : '']),
);
}
Expand All @@ -490,16 +520,16 @@ export class Scripts {
});

if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) {
return (<any>client).getState(keys.concat([jobId]));
return this.execCommand(client, 'getState', keys.concat([jobId]));
}
return (<any>client).getStateV2(keys.concat([jobId]));
return this.execCommand(client, 'getStateV2', keys.concat([jobId]));
}

async changeDelay(jobId: string, delay: number): Promise<void> {
const client = await this.queue.client;

const args = this.changeDelayArgs(jobId, delay);
const result = await (<any>client).changeDelay(args);
const result = await this.execCommand(client, 'changeDelay', args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changeDelay', 'delayed');
}
Expand Down Expand Up @@ -535,7 +565,7 @@ export class Scripts {
const client = await this.queue.client;

const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
const result = await this.execCommand(client, 'changePriority', args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changePriority');
}
Expand Down Expand Up @@ -645,7 +675,7 @@ export class Scripts {
const client = await this.queue.client;

const args = this.moveToDelayedArgs(jobId, timestamp, token);
const result = await (<any>client).moveToDelayed(args);
const result = await this.execCommand(client, 'moveToDelayed', args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
}
Expand All @@ -670,7 +700,11 @@ export class Scripts {
const client = await this.queue.client;

const args = this.moveToWaitingChildrenArgs(jobId, token, opts);
const result = await (<any>client).moveToWaitingChildren(args);
const result = await this.execCommand(
client,
'moveToWaitingChildren',
args,
);

switch (result) {
case 0:
Expand Down Expand Up @@ -699,7 +733,7 @@ export class Scripts {
): Promise<string[]> {
const client = await this.queue.client;

return (<any>client).cleanJobsInSet([
return this.execCommand(client, 'cleanJobsInSet', [
this.queue.toKey(set),
this.queue.toKey('events'),
this.queue.toKey(''),
Expand Down Expand Up @@ -769,7 +803,7 @@ export class Scripts {

const args = this.retryJobsArgs(state, count, timestamp);

return (<any>client).retryJobs(args);
return this.execCommand(client, 'retryJobs', args);
}

/**
Expand Down Expand Up @@ -807,7 +841,11 @@ export class Scripts {
state,
];

const result = await (<any>client).reprocessJob(keys.concat(args));
const result = await this.execCommand(
client,
'reprocessJob',
keys.concat(args),
);

switch (result) {
case 1:
Expand Down Expand Up @@ -845,7 +883,9 @@ export class Scripts {
}),
];

const result = await (<any>client).moveToActive(
const result = await this.execCommand(
client,
'moveToActive',
(<(string | number | boolean | Buffer)[]>keys).concat(args),
);

Expand All @@ -866,7 +906,7 @@ export class Scripts {

const args = [this.queue.toKey(''), jobId];

return (<any>client).promote(keys.concat(args));
return this.execCommand(client, 'promote', keys.concat(args));
}

/**
Expand Down Expand Up @@ -898,7 +938,7 @@ export class Scripts {
Date.now(),
opts.stalledInterval,
];
return (<any>client).moveStalledJobsToWait(keys.concat(args));
return this.execCommand(client, 'moveStalledJobsToWait', keys.concat(args));
}

/**
Expand Down Expand Up @@ -928,7 +968,11 @@ export class Scripts {

const args = [jobId, token, this.queue.toKey(jobId)];

const pttl = await (<any>client).moveJobFromActiveToWait(keys.concat(args));
const pttl = await this.execCommand(
client,
'moveJobFromActiveToWait',
keys.concat(args),
);

return pttl < 0 ? 0 : pttl;
}
Expand All @@ -942,7 +986,11 @@ export class Scripts {
];
const args = [opts.count, opts.force ? 'force' : null];

const result = await (<any>client).obliterate(keys.concat(args));
const result = await this.execCommand(
client,
'obliterate',
keys.concat(args),
);
if (result < 0) {
switch (result) {
case -1:
Expand Down

0 comments on commit 9ea19b7

Please sign in to comment.