diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index 75892d252b3c..f5d8368379ee 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -357,7 +357,7 @@ export class QueueBrokerObject extends MiniflareDurableObject { - validateMessageSize(req.headers); - const contentType = validateContentType(req.headers); - const delay = validateMessageDelay(req.headers); - const body = Buffer.from(await req.arrayBuffer()); - // If we don't have a consumer, drop the message const consumer = this.#maybeConsumer; if (consumer === undefined) return new Response(); + validateMessageSize(req.headers); + const contentType = validateContentType(req.headers); + const delay = + validateMessageDelay(req.headers) ?? this.#maybeProducer?.deliveryDelay; + const body = Buffer.from(await req.arrayBuffer()); + this.#enqueue( [{ contentType, delaySecs: delay, body }], this.#maybeProducer?.deliveryDelay @@ -395,6 +396,10 @@ export class QueueBrokerObject extends MiniflareDurableObject { + // If we don't have a consumer, drop the message + const consumer = this.#maybeConsumer; + if (consumer === undefined) return new Response(); + // NOTE: this endpoint is also used when moving messages to the dead-letter // queue. In this case, size headers won't be added and this validation is // a no-op. This allows us to enqueue a maximum size batch with additional @@ -404,10 +409,6 @@ export class QueueBrokerObject extends MiniflareDurableObject { const mf = new Miniflare({ verbose: true, queueProducers: ["QUEUE"], + queueConsumers: { + QUEUE: { + maxBatchSize: 100, + maxBatchTimeout: 0, + }, + }, modules: true, script: `export default { async fetch(request, env, ctx) {