diff --git a/CHANGELOG.md b/CHANGELOG.md index 33e6b49599..3e33086d73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Node: Added FUNCTION KILL command ([#2114](https://github.com/valkey-io/valkey-glide/pull/2114)) * Node: Added XAUTOCLAIM command ([#2108](https://github.com/valkey-io/valkey-glide/pull/2108)) * Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085)) * Node: Added HSCAN command ([#2098](https://github.com/valkey-io/valkey-glide/pull/2098/)) diff --git a/java/integTest/src/test/java/glide/TestUtilities.java b/java/integTest/src/test/java/glide/TestUtilities.java index 9fc3a2931b..97c5222ba7 100644 --- a/java/integTest/src/test/java/glide/TestUtilities.java +++ b/java/integTest/src/test/java/glide/TestUtilities.java @@ -323,8 +323,8 @@ public static GlideString generateLuaLibCodeBinary( } /** - * Create a lua lib with a RO function which runs an endless loop up to timeout sec.
- * Execution takes at least 5 sec regardless of the timeout configured.
+ * Create a lua lib with a function which runs an endless loop up to timeout sec.
+ * Execution takes at least 5 sec regardless of the timeout configured. */ public static String createLuaLibWithLongRunningFunction( String libName, String funcName, int timeout, boolean readOnly) { diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 86295c9154..e831492622 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2240,6 +2240,11 @@ export function createFunctionStats(): command_request.Command { return createCommand(RequestType.FunctionStats, []); } +/** @internal */ +export function createFunctionKill(): command_request.Command { + return createCommand(RequestType.FunctionKill, []); +} + /** * Represents offsets specifying a string interval to analyze in the {@link BaseClient.bitcount|bitcount} command. The offsets are * zero-based indexes, with `0` being the first index of the string, `1` being the next index and so on. diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index bf631b898d..4dfb69d567 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -34,6 +34,7 @@ import { createFlushDB, createFunctionDelete, createFunctionFlush, + createFunctionKill, createFunctionList, createFunctionLoad, createFunctionStats, @@ -634,6 +635,24 @@ export class GlideClient extends BaseClient { return this.createWritePromise(createFunctionStats()); } + /** + * Kills a function that is currently executing. + * `FUNCTION KILL` terminates read-only functions only. + * + * See https://valkey.io/commands/function-kill/ for details. + * + * since Valkey version 7.0.0. + * + * @returns `OK` if function is terminated. Otherwise, throws an error. + * @example + * ```typescript + * await client.functionKill(); + * ``` + */ + public async functionKill(): Promise<"OK"> { + return this.createWritePromise(createFunctionKill()); + } + /** * Deletes all the keys of all the existing databases. This command never fails. * diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 4cf01a7073..479e6d87da 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -36,6 +36,7 @@ import { createFlushDB, createFunctionDelete, createFunctionFlush, + createFunctionKill, createFunctionList, createFunctionLoad, createFunctionStats, @@ -960,6 +961,28 @@ export class GlideClusterClient extends BaseClient { }); } + /** + * Kills a function that is currently executing. + * `FUNCTION KILL` terminates read-only functions only. + * + * See https://valkey.io/commands/function-kill/ for details. + * + * since Valkey version 7.0.0. + * + * @param route - (Optional) The client will route the command to the nodes defined by `route`. + * If not defined, the command will be routed to all primary nodes. + * @returns `OK` if function is terminated. Otherwise, throws an error. + * @example + * ```typescript + * await client.functionKill(); + * ``` + */ + public async functionKill(route?: Routes): Promise<"OK"> { + return this.createWritePromise(createFunctionKill(), { + route: toProtobufRoute(route), + }); + } + /** * Deletes all the keys of all the existing databases. This command never fails. * diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index 656481bd53..48d1035603 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -28,6 +28,7 @@ import { checkFunctionListResponse, checkFunctionStatsResponse, convertStringArrayToBuffer, + createLuaLibWithLongRunningFunction, encodableTransactionTest, encodedTransactionTest, flushAndCloseClient, @@ -37,6 +38,7 @@ import { parseEndpoints, transactionTest, validateTransactionResponse, + waitForNotBusy, } from "./TestUtilities"; /* eslint-disable @typescript-eslint/no-var-requires */ @@ -830,6 +832,155 @@ describe("GlideClient", () => { }, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "function kill RO func %p", + async (protocol) => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) return; + + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + 10000, + ); + const client = await GlideClient.createClient(config); + const testClient = await GlideClient.createClient(config); + + try { + const libName = "function_kill_no_write"; + const funcName = "deadlock_no_write"; + const code = createLuaLibWithLongRunningFunction( + libName, + funcName, + 6, + true, + ); + expect(await client.functionFlush()).toEqual("OK"); + // nothing to kill + await expect(client.functionKill()).rejects.toThrow(/notbusy/i); + + // load the lib + expect(await client.functionLoad(code, true)).toEqual(libName); + + try { + // call the function without await + const promise = testClient + .fcall(funcName, [], []) + .catch((e) => + expect((e as Error).message).toContain( + "Script killed", + ), + ); + + let killed = false; + let timeout = 4000; + await new Promise((resolve) => setTimeout(resolve, 1000)); + + while (timeout >= 0) { + try { + expect(await client.functionKill()).toEqual("OK"); + killed = true; + break; + } catch { + // do nothing + } + + await new Promise((resolve) => + setTimeout(resolve, 500), + ); + timeout -= 500; + } + + expect(killed).toBeTruthy(); + await promise; + } finally { + await waitForNotBusy(client); + } + } finally { + expect(await client.functionFlush()).toEqual("OK"); + testClient.close(); + client.close(); + } + }, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "function kill RW func %p", + async (protocol) => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) return; + + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + 10000, + ); + const client = await GlideClient.createClient(config); + const testClient = await GlideClient.createClient(config); + + try { + const libName = "function_kill_write"; + const key = libName; + const funcName = "deadlock_write"; + const code = createLuaLibWithLongRunningFunction( + libName, + funcName, + 6, + false, + ); + expect(await client.functionFlush()).toEqual("OK"); + // nothing to kill + await expect(client.functionKill()).rejects.toThrow(/notbusy/i); + + // load the lib + expect(await client.functionLoad(code, true)).toEqual(libName); + + let promise = null; + + try { + // call the function without await + promise = testClient.fcall(funcName, [key], []); + + let foundUnkillable = false; + let timeout = 4000; + await new Promise((resolve) => setTimeout(resolve, 1000)); + + while (timeout >= 0) { + try { + // valkey kills a function with 5 sec delay + // but this will always throw an error in the test + await client.functionKill(); + } catch (err) { + // looking for an error with "unkillable" in the message + // at that point we can break the loop + if ( + (err as Error).message + .toLowerCase() + .includes("unkillable") + ) { + foundUnkillable = true; + break; + } + } + + await new Promise((resolve) => + setTimeout(resolve, 500), + ); + timeout -= 500; + } + + expect(foundUnkillable).toBeTruthy(); + } finally { + // If function wasn't killed, and it didn't time out - it blocks the server and cause rest + // test to fail. Wait for the function to complete (we cannot kill it) + expect(await promise).toContain("Timed out"); + } + } finally { + expect(await client.functionFlush()).toEqual("OK"); + testClient.close(); + client.close(); + } + }, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "sort sortstore sort_store sortro sort_ro sortreadonly test_%p", async (protocol) => { diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index fbb6a58d51..d76ff0310a 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -37,6 +37,7 @@ import { checkClusterResponse, checkFunctionListResponse, checkFunctionStatsResponse, + createLuaLibWithLongRunningFunction, flushAndCloseClient, generateLuaLibCode, getClientConfigurationOption, @@ -47,6 +48,7 @@ import { parseEndpoints, transactionTest, validateTransactionResponse, + waitForNotBusy, } from "./TestUtilities"; type Context = { client: GlideClusterClient; @@ -998,17 +1000,6 @@ describe("GlideClusterClient", () => { }, TIMEOUT, ); - }, - ); - }, - ); - - describe.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - "Protocol is RESP2 = %s", - (protocol) => { - describe.each([true, false])( - "Single node route = %s", - (singleNodeRoute) => { it( "function flush", async () => { @@ -1095,17 +1086,6 @@ describe("GlideClusterClient", () => { }, TIMEOUT, ); - }, - ); - }, - ); - - describe.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - "Protocol is RESP2 = %s", - (protocol) => { - describe.each([true, false])( - "Single node route = %s", - (singleNodeRoute) => { it( "function delete", async () => { @@ -1179,7 +1159,201 @@ describe("GlideClusterClient", () => { }, TIMEOUT, ); + it( + "function kill with route", + async () => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) + return; + + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + 10000, + ); + const client = + await GlideClusterClient.createClient(config); + const testClient = + await GlideClusterClient.createClient(config); + + try { + const libName = + "function_kill_no_write_with_route_" + + singleNodeRoute; + const funcName = + "deadlock_with_route_" + singleNodeRoute; + const code = + createLuaLibWithLongRunningFunction( + libName, + funcName, + 6, + true, + ); + const route: Routes = singleNodeRoute + ? { type: "primarySlotKey", key: "1" } + : "allPrimaries"; + expect(await client.functionFlush()).toEqual( + "OK", + ); + + // nothing to kill + await expect( + client.functionKill(route), + ).rejects.toThrow(/notbusy/i); + + // load the lib + expect( + await client.functionLoad( + code, + true, + route, + ), + ).toEqual(libName); + + try { + // call the function without await + const promise = testClient + .fcallWithRoute(funcName, [], route) + .catch((e) => + expect( + (e as Error).message, + ).toContain("Script killed"), + ); + + let killed = false; + let timeout = 4000; + await new Promise((resolve) => + setTimeout(resolve, 1000), + ); + + while (timeout >= 0) { + try { + expect( + await client.functionKill( + route, + ), + ).toEqual("OK"); + killed = true; + break; + } catch { + // do nothing + } + + await new Promise((resolve) => + setTimeout(resolve, 500), + ); + timeout -= 500; + } + + expect(killed).toBeTruthy(); + await promise; + } finally { + await waitForNotBusy(client); + } + } finally { + expect(await client.functionFlush()).toEqual( + "OK", + ); + client.close(); + testClient.close(); + } + }, + TIMEOUT, + ); + }, + ); + it( + "function kill key based write function", + async () => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) return; + + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + 10000, + ); + const client = + await GlideClusterClient.createClient(config); + const testClient = + await GlideClusterClient.createClient(config); + + try { + const libName = + "function_kill_key_based_write_function"; + const funcName = + "deadlock_write_function_with_key_based_route"; + const key = libName; + const code = createLuaLibWithLongRunningFunction( + libName, + funcName, + 6, + false, + ); + + const route: Routes = { + type: "primarySlotKey", + key: key, + }; + expect(await client.functionFlush()).toEqual("OK"); + + // nothing to kill + await expect( + client.functionKill(route), + ).rejects.toThrow(/notbusy/i); + + // load the lib + expect( + await client.functionLoad(code, true, route), + ).toEqual(libName); + + let promise = null; + + try { + // call the function without await + promise = testClient.fcall(funcName, [key], []); + + let foundUnkillable = false; + let timeout = 4000; + await new Promise((resolve) => + setTimeout(resolve, 1000), + ); + + while (timeout >= 0) { + try { + // valkey kills a function with 5 sec delay + // but this will always throw an error in the test + await client.functionKill(route); + } catch (err) { + // looking for an error with "unkillable" in the message + // at that point we can break the loop + if ( + (err as Error).message + .toLowerCase() + .includes("unkillable") + ) { + foundUnkillable = true; + break; + } + } + + await new Promise((resolve) => + setTimeout(resolve, 500), + ); + timeout -= 500; + } + + expect(foundUnkillable).toBeTruthy(); + } finally { + // If function wasn't killed, and it didn't time out - it blocks the server and cause rest + // test to fail. Wait for the function to complete (we cannot kill it) + expect(await promise).toContain("Timed out"); + } + } finally { + expect(await client.functionFlush()).toEqual("OK"); + client.close(); + testClient.close(); + } }, + TIMEOUT, ); }, ); diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index dba0f69f17..f1a8ade345 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -254,6 +254,57 @@ export function generateLuaLibCode( return code; } +/** + * Create a lua lib with a function which runs an endless loop up to timeout sec. + * Execution takes at least 5 sec regardless of the timeout configured. + */ +export function createLuaLibWithLongRunningFunction( + libName: string, + funcName: string, + timeout: number, + readOnly: boolean, +): string { + const code = + "#!lua name=$libName\n" + + "local function $libName_$funcName(keys, args)\n" + + " local started = tonumber(redis.pcall('time')[1])\n" + + // fun fact - redis does no write if 'no-writes' flag is set + " redis.pcall('set', keys[1], 42)\n" + + " while (true) do\n" + + " local now = tonumber(redis.pcall('time')[1])\n" + + " if now > started + $timeout then\n" + + " return 'Timed out $timeout sec'\n" + + " end\n" + + " end\n" + + " return 'OK'\n" + + "end\n" + + "redis.register_function{\n" + + "function_name='$funcName',\n" + + "callback=$libName_$funcName,\n" + + (readOnly ? "flags={ 'no-writes' }\n" : "") + + "}"; + return code + .replaceAll("$timeout", timeout.toString()) + .replaceAll("$funcName", funcName) + .replaceAll("$libName", libName); +} + +export async function waitForNotBusy(client: GlideClusterClient | GlideClient) { + // If function wasn't killed, and it didn't time out - it blocks the server and cause rest test to fail. + let isBusy = true; + + do { + try { + await client.functionKill(); + } catch (err) { + // should throw `notbusy` error, because the function should be killed before + if ((err as Error).message.toLowerCase().includes("notbusy")) { + isBusy = false; + } + } + } while (isBusy); +} + /** * Parses the command-line arguments passed to the Node.js process. *