Skip to content

Commit

Permalink
fix(queues): Respect consumer max_retries and retry_delay properly (
Browse files Browse the repository at this point in the history
#6943)

* fix(miniflare): prevent eviction for queue DOs

* chore: fix typo for `maxRetries` in queues configuration
  • Loading branch information
sdnts authored Oct 15, 2024
1 parent 5761020 commit 7859a04
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/light-wasps-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"miniflare": patch
---

fix: local queues now respect consumer max delays and retry delays properly
6 changes: 5 additions & 1 deletion packages/miniflare/src/plugins/queues/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ export const QUEUES_PLUGIN: Plugin<typeof QueuesOptionsSchema> = {
{ name: "broker.worker.js", esModule: SCRIPT_QUEUE_BROKER_OBJECT() },
],
durableObjectNamespaces: [
{ className: QUEUE_BROKER_OBJECT_CLASS_NAME, uniqueKey },
{
className: QUEUE_BROKER_OBJECT_CLASS_NAME,
uniqueKey,
preventEviction: true,
},
],
// Miniflare's Queue broker is in-memory only at the moment
durableObjectStorage: { inMemory: kVoid },
Expand Down
2 changes: 1 addition & 1 deletion packages/miniflare/src/workers/queues/broker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
assert(consumer !== undefined);

const batchSize = consumer.maxBatchSize ?? DEFAULT_BATCH_SIZE;
const maxAttempts = (consumer.maxRetires ?? DEFAULT_RETRIES) + 1;
const maxAttempts = (consumer.maxRetries ?? DEFAULT_RETRIES) + 1;
const maxAttemptsS = maxAttempts === 1 ? "" : "s";

// Extract and dispatch a batch
Expand Down
27 changes: 18 additions & 9 deletions packages/miniflare/src/workers/queues/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@ export type QueueProducer = z.infer<typeof QueueProducerSchema>;
export const QueueProducersSchema =
/* @__PURE__ */ z.record(QueueProducerSchema);

export const QueueConsumerOptionsSchema = /* @__PURE__ */ z.object({
// https://developers.cloudflare.com/queues/platform/configuration/#consumer
// https://developers.cloudflare.com/queues/platform/limits/
maxBatchSize: z.number().min(0).max(100).optional(),
maxBatchTimeout: z.number().min(0).max(30).optional(), // seconds
maxRetires: z.number().min(0).max(100).optional(),
deadLetterQueue: z.ostring(),
retryDelay: QueueMessageDelaySchema,
});
export const QueueConsumerOptionsSchema = /* @__PURE__ */ z
.object({
// https://developers.cloudflare.com/queues/platform/configuration/#consumer
// https://developers.cloudflare.com/queues/platform/limits/
maxBatchSize: z.number().min(0).max(100).optional(),
maxBatchTimeout: z.number().min(0).max(30).optional(), // seconds
maxRetires: z.number().min(0).max(100).optional(), // deprecated
maxRetries: z.number().min(0).max(100).optional(),
deadLetterQueue: z.ostring(),
retryDelay: QueueMessageDelaySchema,
})
.transform((queue) => {
if (queue.maxRetires !== undefined) {
queue.maxRetries = queue.maxRetires;
}

return queue as Omit<typeof queue, "maxRetires">;
});
export const QueueConsumerSchema = /* @__PURE__ */ z.intersection(
QueueConsumerOptionsSchema,
z.object({ workerName: z.string() })
Expand Down
8 changes: 4 additions & 4 deletions packages/miniflare/test/plugins/queues/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ test("sends all structured cloneable types", async (t) => {

queueProducers: ["QUEUE"],
queueConsumers: {
QUEUE: { maxBatchSize: 100, maxBatchTimeout: 0, maxRetires: 0 },
QUEUE: { maxBatchSize: 100, maxBatchTimeout: 0, maxRetries: 0 },
},
serviceBindings: {
async REPORTER(request) {
Expand Down Expand Up @@ -433,7 +433,7 @@ test("retries messages", async (t) => {
log,
queueProducers: { QUEUE: { queueName: "queue" } },
queueConsumers: {
queue: { maxBatchSize: 5, maxBatchTimeout: 1, maxRetires: 2 },
queue: { maxBatchSize: 5, maxBatchTimeout: 1, maxRetries: 2 },
},
serviceBindings: {
async RETRY_FILTER(request) {
Expand Down Expand Up @@ -685,13 +685,13 @@ test("moves to dead letter queue", async (t) => {
bad: {
maxBatchSize: 5,
maxBatchTimeout: 1,
maxRetires: 0,
maxRetries: 0,
deadLetterQueue: "dlq",
},
dlq: {
maxBatchSize: 5,
maxBatchTimeout: 1,
maxRetires: 0,
maxRetries: 0,
deadLetterQueue: "bad", // (cyclic)
},
},
Expand Down
118 changes: 118 additions & 0 deletions packages/miniflare/test/plugins/queues/retry.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import test from "ava";
import { Miniflare, QUEUES_PLUGIN_NAME, Response } from "miniflare";
import { z } from "zod";
import { MiniflareDurableObjectControlStub, TestLog } from "../../test-shared";

const StringArraySchema = z.string().array();

async function getControlStub(
mf: Miniflare,
queueName: string
): Promise<MiniflareDurableObjectControlStub> {
const objectNamespace = await mf._getInternalDurableObjectNamespace(
QUEUES_PLUGIN_NAME,
"queues:queue",
"QueueBrokerObject"
);
const objectId = objectNamespace.idFromName(queueName);
const objectStub = objectNamespace.get(objectId);
const stub = new MiniflareDurableObjectControlStub(objectStub);
await stub.enableFakeTimers(1_000_000);
return stub;
}

let batches: string[][] = [];
let mf: Miniflare;
let object: MiniflareDurableObjectControlStub;

test.beforeEach(async (t) => {
batches = [];

mf = new Miniflare({
log: new TestLog(t),
verbose: true,
queueProducers: { QUEUE: { queueName: "QUEUE" } },
queueConsumers: {
QUEUE: { retryDelay: 5, maxRetries: 2, maxBatchTimeout: 0 },
},
serviceBindings: {
async REPORTER(request) {
const batch = StringArraySchema.parse(await request.json());
if (batch.length > 0) {
batches.push(batch);
}
return new Response();
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
await env.QUEUE.send(await request.text());
return new Response(null, { status: 204 });
},
async queue(batch, env, ctx) {
await env.REPORTER.fetch("http://localhost", {
method: "POST",
body: JSON.stringify(batch.messages.map(({ id }) => id)),
});
batch.retryAll()
},
};`,
});

object = await getControlStub(mf, "QUEUE");
});

test.afterEach.always(() => mf.dispose());

test.serial("respects retry delays", async (t) => {
await mf.dispatchFetch("http://localhost/send", {
method: "POST",
body: "some-message",
});

// Message should be delivered once
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 1);

// Message should not be re-delivered one second later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();
t.is(batches.length, 1);

// Message should be re-delivered once 5 seconds later
await object.advanceFakeTime(5000);
await object.waitForFakeTasks();
t.is(batches.length, 2);

// Message should be re-delivered once 5 seconds later
await object.advanceFakeTime(5000);
await object.waitForFakeTasks();
t.is(batches.length, 3);
});

test.serial("respects max retries", async (t) => {
await mf.dispatchFetch("http://localhost/send", {
method: "POST",
body: "some-message",
});

// Message should be delivered once
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();

// Message should not be re-delivered one second later
await object.advanceFakeTime(1000);
await object.waitForFakeTasks();

// Message should be re-delivered once 5 seconds later
await object.advanceFakeTime(5000);
await object.waitForFakeTasks();

// Message should not be delivered again 5 seconds later (max retries is 2)
await object.advanceFakeTime(5000);
await object.waitForFakeTasks();
t.is(batches.length, 3);
});
2 changes: 1 addition & 1 deletion packages/wrangler/src/dev/miniflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ function queueConsumerEntry(consumer: QueueConsumer) {
const options = {
maxBatchSize: consumer.max_batch_size,
maxBatchTimeout: consumer.max_batch_timeout,
maxRetires: consumer.max_retries,
maxRetries: consumer.max_retries,
deadLetterQueue: consumer.dead_letter_queue,
retryDelay: consumer.retry_delay,
};
Expand Down

0 comments on commit 7859a04

Please sign in to comment.