Skip to content

Commit

Permalink
fix(queues): Respect delivery delays during local dev (cloudflare#6719)
Browse files Browse the repository at this point in the history
* fix(queues) Respect delivery delays in local dev

* test(queues): reorganize delay tests

* improve(queues): Return early from dev route handlers when no consumer is attached
  • Loading branch information
sdnts authored Sep 16, 2024
1 parent c135de4 commit 5b5dd95
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 159 deletions.
5 changes: 5 additions & 0 deletions .changeset/pink-meals-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"miniflare": patch
---

fix: Respect delivery delays for Queue consumers in local dev mode
25 changes: 14 additions & 11 deletions packages/miniflare/src/workers/queues/broker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
}

get #maybeProducer() {
return this.#producers[this.name];
return Object.values(this.#producers).find(
(p) => p?.queueName === this.name
);
}

get #maybeConsumer() {
Expand Down Expand Up @@ -355,7 +357,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
this.#pendingFlush = { immediate: delay === 0, timeout };
}

#enqueue(messages: QueueIncomingMessage[], globalDelay: number = 0) {
#enqueue(messages: QueueIncomingMessage[], globalDelay = 0) {
for (const message of messages) {
const randomness = crypto.getRandomValues(new Uint8Array(16));
const id = message.id ?? Buffer.from(randomness).toString("hex");
Expand All @@ -375,15 +377,16 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

@POST("/message")
message: RouteHandler = async (req) => {
validateMessageSize(req.headers);
const contentType = validateContentType(req.headers);
const delay = validateMessageDelay(req.headers);
const body = Buffer.from(await req.arrayBuffer());

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

validateMessageSize(req.headers);
const contentType = validateContentType(req.headers);
const delay =
validateMessageDelay(req.headers) ?? this.#maybeProducer?.deliveryDelay;
const body = Buffer.from(await req.arrayBuffer());

this.#enqueue(
[{ contentType, delaySecs: delay, body }],
this.#maybeProducer?.deliveryDelay
Expand All @@ -393,6 +396,10 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

@POST("/batch")
batch: RouteHandler = async (req) => {
// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

// NOTE: this endpoint is also used when moving messages to the dead-letter
// queue. In this case, size headers won't be added and this validation is
// a no-op. This allows us to enqueue a maximum size batch with additional
Expand All @@ -402,10 +409,6 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
validateMessageDelay(req.headers) ?? this.#maybeProducer?.deliveryDelay;
const body = QueuesBatchRequestSchema.parse(await req.json());

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

this.#enqueue(body.messages, delay);
return new Response();
};
Expand Down
186 changes: 186 additions & 0 deletions packages/miniflare/test/plugins/queues/delay.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import test from "ava";
import { Miniflare, QUEUES_PLUGIN_NAME, Response } from "miniflare";
import { z } from "zod";
import { MiniflareDurableObjectControlStub, TestLog } from "../../test-shared";

const StringArraySchema = z.string().array();

async function getControlStub(
mf: Miniflare,
queueName: string
): Promise<MiniflareDurableObjectControlStub> {
const objectNamespace = await mf._getInternalDurableObjectNamespace(
QUEUES_PLUGIN_NAME,
"queues:queue",
"QueueBrokerObject"
);
const objectId = objectNamespace.idFromName(queueName);
const objectStub = objectNamespace.get(objectId);
const stub = new MiniflareDurableObjectControlStub(objectStub);
await stub.enableFakeTimers(1_000_000);
return stub;
}

let batches: string[][] = [];
let mf: Miniflare;
let object: MiniflareDurableObjectControlStub;

test.beforeEach(async (t) => {
batches = [];

mf = new Miniflare({
log: new TestLog(t),
verbose: true,
queueProducers: { QUEUE: { queueName: "QUEUE", deliveryDelay: 2 } },
queueConsumers: {
QUEUE: {
maxBatchSize: 100,
maxBatchTimeout: 0,
},
},
serviceBindings: {
async REPORTER(request) {
const batch = StringArraySchema.parse(await request.json());
if (batch.length > 0) {
batches.push(batch);
}
return new Response();
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
const delay = request.headers.get("X-Msg-Delay-Secs");
const url = new URL(request.url);
const body = await request.json();
if (url.pathname === "/send") {
if (delay === null) {
await env.QUEUE.send(body);
} else {
await env.QUEUE.send(body, { delaySeconds: Number(delay) });
}
} else if (url.pathname === "/batch") {
if (delay === null) {
await env.QUEUE.sendBatch(body);
} else {
await env.QUEUE.sendBatch(body, { delaySeconds: Number(delay) });
}
}
return new Response(null, { status: 204 });
},
async queue(batch, env, ctx) {
delete Date.prototype.toJSON; // JSON.stringify calls .toJSON before the replacer
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id }) => id)),
});
},
};`,
});

object = await getControlStub(mf, "QUEUE");
});

test.afterEach.always(() => mf.dispose());

test.serial(".send() respects default delay", async (t) => {
await mf.dispatchFetch("http://localhost/send", {
method: "POST",
body: JSON.stringify("default"),
});

// Nothing should happen one second later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 0);

// A batch should be delivered 2 seconds later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 1);
});

test.serial(".send() respects per-message delay", async (t) => {
// Send 10 messages.
for (let i = 1; i <= 10; i++) {
await mf.dispatchFetch("http://localhost/send", {
method: "POST",
headers: { "X-Msg-Delay-Secs": i.toString() },
body: JSON.stringify(i),
});
}

// Verify messages are received at the right times.
t.is(batches.length, 0);

for (let i = 1; i <= 10; i++) {
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, i);
}
});

test.serial(".sendBatch() respects default delay", async (t) => {
await mf.dispatchFetch("http://localhost/batch", {
method: "POST",
body: JSON.stringify([{ body: "msg1" }, { body: "msg2" }]),
});

// Nothing should happen one second later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 0);

// Batch should be delivered 2 seconds later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 2);
});

test.serial(".sendBatch() respects per-batch delay", async (t) => {
await mf.dispatchFetch("http://localhost/batch", {
method: "POST",
headers: { "X-Msg-Delay-Secs": "3" },
body: JSON.stringify([{ body: 1 }, { body: 2 }]),
});

// Verify messages are received at the right times.
t.is(batches.length, 0);

await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 0);

await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 0);

await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 2);
});

test.serial(".sendBatch() respects per-message delay", async (t) => {
await mf.dispatchFetch("http://localhost/batch", {
method: "POST",
headers: { "X-Msg-Delay-Secs": "1" },
body: JSON.stringify([
{ body: 10, delaySeconds: 2 },
{ body: 11, delaySeconds: 3 },
{ body: 12, delaySeconds: 4 },
]),
});

// Verify messages are received at the right times.
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 0);

for (let i = 1; i <= 3; i++) {
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, i);
}
});
Loading

0 comments on commit 5b5dd95

Please sign in to comment.