Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1614: Parallelism and Calls per second parameters #223

Merged
merged 12 commits into from
Feb 7, 2025
115 changes: 115 additions & 0 deletions src/client/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import { nanoid } from "nanoid";
import { Client } from "./client";
import type { PublishToUrlResponse } from "../../dist";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "./workflow/test-utils";

export const clearQueues = async (client: Client) => {
const queueDetails = await client.queue().list();
Expand Down Expand Up @@ -415,3 +416,117 @@ describe("E2E Queue", () => {
{ timeout: 35_000 }
);
});

describe("flow control", () => {
const token = nanoid();
const client = new Client({
baseUrl: MOCK_QSTASH_SERVER_URL,
token,
});

const flowControlKey = nanoid();

test("should throw if key is passed but no ratePerSec or parallelism", () => {
// eslint-disable-next-line unicorn/consistent-function-scoping
const throws = async () => {
await client.publishJSON({
url: "https://example.com/",
// @ts-expect-error missing ratePerSecond or parallelism for test purposes
flowControl: {
key: flowControlKey,
},
});
};
expect(throws).toThrow("Provide at least one of parallelism or ratePerSecond for flowControl");
});

test("should publish a message with flow control", async () => {
await mockQStashServer({
execute: async () => {
await client.publishJSON({
urlGroup: "my-group",
flowControl: {
key: flowControlKey,
parallelism: 3,
ratePerSecond: 5,
},
});
},
responseFields: {
body: { messageId: "msgId" },
status: 200,
},
receivesRequest: {
method: "POST",
token,
url: "http://localhost:8080/v2/publish/my-group",
body: undefined,
headers: {
"Upstash-Flow-Control-Key": flowControlKey,
"Upstash-Flow-Control-Value": "parallelism=3, rate=5",
},
},
});
});

test("should batch messages with flow control", async () => {
const flowControlKeyOne = nanoid();
const flowControlKeyTwo = nanoid();
await mockQStashServer({
execute: async () => {
await client.batch([
{
url: "https://example.com/one",
flowControl: {
key: flowControlKeyOne,
ratePerSecond: 10,
},
body: "some-body",
},
{
url: "https://example.com/two",
flowControl: {
key: flowControlKeyTwo,
parallelism: 5,
},
method: "GET",
},
]);
},
responseFields: {
body: { messageId: "msgId" },
status: 200,
},
receivesRequest: {
method: "POST",
token,
url: "http://localhost:8080/v2/batch",
body: [
{
body: "some-body",
destination: "https://example.com/one",
headers: {
"upstash-flow-control-key": flowControlKeyOne,
"upstash-flow-control-value": "rate=10",
"upstash-method": "POST",
},
},
{
destination: "https://example.com/two",
headers: {
"upstash-flow-control-key": flowControlKeyTwo,
"upstash-flow-control-value": "parallelism=5",
"upstash-method": "GET",
},
},
],
headers: {
// eslint-disable-next-line unicorn/no-null
"Upstash-Flow-Control-Key": null,
// eslint-disable-next-line unicorn/no-null
"Upstash-Flow-Control-Value": null,
},
},
});
});
});
16 changes: 15 additions & 1 deletion src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@ import { Chat } from "./llm/chat";
import { Messages } from "./messages";
import { Queue } from "./queue";
import { Schedules } from "./schedules";
import type { BodyInit, Event, GetEventsPayload, HeadersInit, HTTPMethods, State } from "./types";
import type {
BodyInit,
Event,
FlowControl,
GetEventsPayload,
HeadersInit,
HTTPMethods,
State,
} from "./types";
import { UrlGroups } from "./url-groups";
import { getRequestPath, prefixHeaders, processHeaders, wrapWithGlobalHeaders } from "./utils";
import { Workflow } from "./workflow";
Expand Down Expand Up @@ -156,6 +164,12 @@ export type PublishRequest<TBody = BodyInit> = {
* @default undefined
*/
timeout?: Duration | number;

/**
* Settings for controlling the number of active requests
* and number of requests per second with the same key.
*/
flowControl?: FlowControl;
} & (
| {
/**
Expand Down
35 changes: 35 additions & 0 deletions src/client/dlq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,39 @@ describe("DLQ", () => {
},
{ timeout: 20_000 }
);

test(
"should get flow control",
async () => {
const parallelism = 10;
const ratePerSecond = 5;
const { messageId } = await client.publish({
url: "https://httpstat.us/400",
body: "hello",
retries: 0,
flowControl: {
key: "flow-key",
parallelism,
ratePerSecond,
},
});

await sleep(5000);

const result = await client.dlq.listMessages({
filter: {
messageId,
},
});
expect(result.messages.length).toBe(1);
const message = result.messages[0];

expect(message.flowControlKey).toBe("flow-key");
expect(message.parallelism).toBe(parallelism);
expect(message.ratePerSecond).toBe(ratePerSecond);
},
{
timeout: 10_000,
}
);
});
1 change: 1 addition & 0 deletions src/client/dlq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export class DLQ {
return {
...message,
urlGroup: message.topicName,
ratePerSecond: "rate" in message ? (message.rate as number) : undefined,
};
}),
cursor: messagesPayload.cursor,
Expand Down
6 changes: 3 additions & 3 deletions src/client/llm/chat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ describe("Test QStash chat", () => {
{ timeout: 30_000, retry: 3 }
);

test("should publish with llm api", async () => {
test.skip("should publish with llm api", async () => {
const result = await client.publishJSON({
api: { name: "llm", provider: upstash() },
body: {
Expand Down Expand Up @@ -290,7 +290,7 @@ describe("Test QStash chat with third party LLMs", () => {
{ timeout: 30_000, retry: 3 }
);

test("should publish with llm api", async () => {
test.skip("should publish with llm api", async () => {
const result = await client.publishJSON({
api: {
name: "llm",
Expand Down Expand Up @@ -321,7 +321,7 @@ describe("Test QStash chat with third party LLMs", () => {
expect(deliveredEvent).not.toBeUndefined();
});

test("should publish with llm api", () => {
test("should not be able to without callback", () => {
//@ts-expect-error We intentionally omit the callback to ensure the function fails as expected
const resultPromise = client.publishJSON({
api: {
Expand Down
19 changes: 19 additions & 0 deletions src/client/messages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,23 @@ describe("Messages", () => {
},
{ timeout: 20_000 }
);

test("should create message with flow control", async () => {
const parallelism = 10;
const ratePerSecond = 5;
const { messageId } = await client.publish({
url: "https://httpstat.us/200?sleep=30000",
body: "hello",
flowControl: {
key: "flow-key",
parallelism,
ratePerSecond,
},
});

const message = await client.messages.get(messageId);
expect(message.flowControlKey).toBe("flow-key");
expect(message.parallelism).toBe(parallelism);
expect(message.ratePerSecond).toBe(ratePerSecond);
});
});
14 changes: 14 additions & 0 deletions src/client/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ export type Message = {
* IP address of the publisher of this message
*/
callerIp?: string;

/**
* flow control key
*/
flowControlKey: string;
/**
* number of requests which can be active with the same flow control key
*/
parallelism?: number;
/**
* number of requests to activate per second with the same flow control key
*/
ratePerSecond?: number;
};

export type MessagePayload = Omit<Message, "urlGroup"> & { topicName: string };
Expand All @@ -115,6 +128,7 @@ export class Messages {
const message: Message = {
...messagePayload,
urlGroup: messagePayload.topicName,
ratePerSecond: "rate" in messagePayload ? (messagePayload.rate as number) : undefined,
};
return message;
}
Expand Down
22 changes: 22 additions & 0 deletions src/client/schedules.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,26 @@ describe("Schedules", () => {
},
});
});

test("should create schedule with flow control", async () => {
const parallelism = 10;
const ratePerSecond = 5;
const scheduleId = nanoid();
await client.schedules.create({
destination: "https://www.initial.com",
cron: "*/5 * * * *",
scheduleId,
body: "my-payload",
flowControl: {
key: "flow-key",
parallelism,
ratePerSecond,
},
});

const schedule = await client.schedules.get(scheduleId);
expect(schedule.flowControlKey).toBe("flow-key");
expect(schedule.parallelism).toBe(parallelism);
expect(schedule.ratePerSecond).toBe(ratePerSecond);
});
});
Loading
Loading