Skip to content

Commit

Permalink
improve(queues): Return early from dev route handlers when no consume…
Browse files Browse the repository at this point in the history
…r is attached
  • Loading branch information
sdnts committed Sep 14, 2024
1 parent aa9c888 commit 7aca8f7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
21 changes: 11 additions & 10 deletions packages/miniflare/src/workers/queues/broker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
this.#pendingFlush = { immediate: delay === 0, timeout };
}

#enqueue(messages: QueueIncomingMessage[], globalDelay: number = 0) {
#enqueue(messages: QueueIncomingMessage[], globalDelay = 0) {
for (const message of messages) {
const randomness = crypto.getRandomValues(new Uint8Array(16));
const id = message.id ?? Buffer.from(randomness).toString("hex");
Expand All @@ -377,15 +377,16 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

@POST("/message")
message: RouteHandler = async (req) => {
validateMessageSize(req.headers);
const contentType = validateContentType(req.headers);
const delay = validateMessageDelay(req.headers);
const body = Buffer.from(await req.arrayBuffer());

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

validateMessageSize(req.headers);
const contentType = validateContentType(req.headers);
const delay =
validateMessageDelay(req.headers) ?? this.#maybeProducer?.deliveryDelay;
const body = Buffer.from(await req.arrayBuffer());

this.#enqueue(
[{ contentType, delaySecs: delay, body }],
this.#maybeProducer?.deliveryDelay
Expand All @@ -395,6 +396,10 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

@POST("/batch")
batch: RouteHandler = async (req) => {
// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

// NOTE: this endpoint is also used when moving messages to the dead-letter
// queue. In this case, size headers won't be added and this validation is
// a no-op. This allows us to enqueue a maximum size batch with additional
Expand All @@ -404,10 +409,6 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
validateMessageDelay(req.headers) ?? this.#maybeProducer?.deliveryDelay;
const body = QueuesBatchRequestSchema.parse(await req.json());

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();

this.#enqueue(body.messages, delay);
return new Response();
};
Expand Down
6 changes: 6 additions & 0 deletions packages/miniflare/test/plugins/queues/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,12 @@ test("validates message size", async (t) => {
const mf = new Miniflare({
verbose: true,
queueProducers: ["QUEUE"],
queueConsumers: {
QUEUE: {
maxBatchSize: 100,
maxBatchTimeout: 0,
},
},
modules: true,
script: `export default {
async fetch(request, env, ctx) {
Expand Down

0 comments on commit 7aca8f7

Please sign in to comment.