diff --git a/packages/miniflare/README.md b/packages/miniflare/README.md index 95cb07c1c..02c890d59 100644 --- a/packages/miniflare/README.md +++ b/packages/miniflare/README.md @@ -408,8 +408,6 @@ parameter in module format Workers. #### Queues -> :warning: Queues are only supported with Node.js 18 or above. - - `queueProducers?: Record | string[]` Record mapping binding name to queue names to inject as `WorkerQueue` bindings diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index ebd25560e..e4d3c520d 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -232,20 +232,14 @@ function getQueueConsumers( } } - // Populate all `deadLetterConsumer`s, note this may create cycles for (const [queueName, consumer] of queueConsumers) { - if (consumer.deadLetterQueue !== undefined) { - // Check the dead letter queue isn't configured to be the queue itself - // (NOTE: Queues *does* permit DLQ cycles between multiple queues, - // i.e. if Q2 is DLQ for Q1, but Q1 is DLQ for Q2) - if (consumer.deadLetterQueue === queueName) { - throw new QueuesError( - "ERR_DEAD_LETTER_QUEUE_CYCLE", - `Dead letter queue for queue "${queueName}" cannot be itself` - ); - } - consumer.deadLetterConsumer = queueConsumers.get( - consumer.deadLetterQueue + // Check the dead letter queue isn't configured to be the queue itself + // (NOTE: Queues *does* permit DLQ cycles between multiple queues, + // i.e. if Q2 is DLQ for Q1, but Q1 is DLQ for Q2) + if (consumer.deadLetterQueue === queueName) { + throw new QueuesError( + "ERR_DEAD_LETTER_QUEUE_CYCLE", + `Dead letter queue for queue "${queueName}" cannot be itself` ); } } diff --git a/packages/miniflare/src/plugins/queues/errors.ts b/packages/miniflare/src/plugins/queues/errors.ts index 1bc0be120..43614a79a 100644 --- a/packages/miniflare/src/plugins/queues/errors.ts +++ b/packages/miniflare/src/plugins/queues/errors.ts @@ -1,9 +1,6 @@ -import { HttpError, MiniflareError } from "../../shared"; +import { MiniflareError } from "../../shared"; export type QueuesErrorCode = - | "ERR_V8_UNSUPPORTED" // V8 version too old | "ERR_MULTIPLE_CONSUMERS" // Attempted to set multiple consumers for a single queue; | "ERR_DEAD_LETTER_QUEUE_CYCLE"; // Attempted to set dead letter queue to self export class QueuesError extends MiniflareError {} - -export class QueuesHTTPError extends HttpError {} diff --git a/packages/miniflare/src/plugins/queues/gateway.ts b/packages/miniflare/src/plugins/queues/gateway.ts deleted file mode 100644 index e425d875c..000000000 --- a/packages/miniflare/src/plugins/queues/gateway.ts +++ /dev/null @@ -1,259 +0,0 @@ -import assert from "assert"; -import crypto from "crypto"; -import v8 from "v8"; -import { stringify } from "devalue"; -import { Colorize, bold, green, grey, red, reset, yellow } from "kleur/colors"; -import { z } from "zod"; -import { Base64DataSchema, Log, Timers, viewToBuffer } from "../../shared"; -import { Storage } from "../../storage"; -import { CoreHeaders, structuredSerializableReducers } from "../../workers"; -import { DispatchFetch, QueueConsumer } from "../shared"; - -const DEFAULT_BATCH_SIZE = 5; -const DEFAULT_BATCH_TIMEOUT = 1; // second -const DEFAULT_RETRIES = 2; - -// https://github.com/cloudflare/workerd/blob/01b87642f4eac932aa9074d7e5eec4fd3c90968a/src/workerd/io/outcome.capnp -const OutcomeSchema = z.enum([ - "unknown", - "ok", - "exception", - "exceededCpu", - "killSwitch", - "daemonDown", - "scriptNotFound", - "cancelled", - "exceededMemory", -]); - -const QueueResponseSchema = z.object({ - outcome: OutcomeSchema, - retryAll: z.boolean(), - ackAll: z.boolean(), - explicitRetries: z.string().array(), - explicitAcks: z.string().array(), - time: z.number().optional(), // (added by Miniflare) -}); -type QueueResponse = z.infer; -const exceptionQueueResponse: QueueResponse = { - outcome: "exception", - retryAll: false, - ackAll: false, - explicitRetries: [], - explicitAcks: [], -}; - -export const QueueContentTypeSchema = z - .enum(["text", "json", "bytes", "v8"]) - .default("v8"); -export const GatewayMessageSchema = z.object({ - body: Base64DataSchema, - contentType: QueueContentTypeSchema, -}); -export type GatewayMessage = z.infer; - -export class Message { - #failedAttempts = 0; - - constructor( - readonly id: string, - readonly timestamp: number, - readonly body: unknown - ) {} - - incrementFailedAttempts(): number { - return ++this.#failedAttempts; - } -} - -function formatQueueResponse( - queueName: string, - acked: number, - total: number, - time?: number -) { - let colour: Colorize; - if (acked === total) colour = green; - else if (acked > 0) colour = yellow; - else colour = red; - - let message = `${bold("QUEUE")} ${queueName} ${colour(`${acked}/${total}`)}`; - if (time !== undefined) message += grey(` (${time}ms)`); - return reset(message); -} - -interface PendingFlush { - immediate: boolean; - timeout: unknown; -} - -export type QueueEnqueueOn = ( - queueName: string, - consumer: QueueConsumer, - messages: (Message | GatewayMessage)[] -) => void; - -// `QueuesGateway` slightly misrepresents what this class does. Each queue will -// get a single `QueuesGateway` instance (per `Miniflare` instance). This class -// is responsible for receiving queued messages, batching them up, dispatching -// them, and retrying any messages that failed. -export class QueuesGateway { - readonly #queueUrl: URL; - readonly #messages: Message[] = []; - - #pendingFlush?: PendingFlush; - - constructor( - private readonly log: Log, - _storage: Storage, - private readonly timers: Timers, - private readonly queueName: string, - private readonly dispatchFetch: DispatchFetch - ) { - this.#queueUrl = new URL(`http://entry/${queueName}`); - } - - async #dispatchBatch(workerName: string, batch: Message[]) { - // The `queue()` method on a service binding expects regular, de-serialised - // JavaScript objects. Unfortunately, `workerd` doesn't expose the V8 - // serialiser in Workers yet, so we need to re-serialise all messages into a - // format we can deserialise in a Worker. - // TODO: stop re-serialising messages once `v8` module added to `workerd` as - // part of Node.js compat work, can also remove V8 version restriction too - // `stringify` doesn't support arbitrary non-POJOs, so convert to POJOs - const messages = batch.map((message) => ({ ...message })); - const response = await this.dispatchFetch(this.#queueUrl, { - method: "POST", - headers: { - "Content-Type": "application/json; charset=utf-8", - [CoreHeaders.ROUTE_OVERRIDE]: workerName, - [CoreHeaders.CUSTOM_EVENT]: "queue", - }, - body: stringify(messages, structuredSerializableReducers), - }); - if (!response.ok) assert.fail(await response.text()); - return QueueResponseSchema.parse(await response.json()); - } - - #flush = async (enqueueOn: QueueEnqueueOn, consumer: QueueConsumer) => { - const batchSize = consumer.maxBatchSize ?? DEFAULT_BATCH_SIZE; - const maxAttempts = (consumer.maxRetires ?? DEFAULT_RETRIES) + 1; - const maxAttemptsS = maxAttempts === 1 ? "" : "s"; - - // Extract and dispatch a batch - const batch = this.#messages.splice(0, batchSize); - let response: QueueResponse; - try { - response = await this.#dispatchBatch(consumer.workerName, batch); - } catch (e: any) { - this.log.error(e); - response = exceptionQueueResponse; - } - - // Get messages to retry. If dispatching the batch failed for any reason, - // retry all messages. - const retryAll = response.retryAll || response.outcome !== "ok"; - const explicitRetries = new Set(response.explicitRetries); - - let failedMessages = 0; - const toRetry: Message[] = []; - const toDeadLetterQueue: Message[] = []; - for (const message of batch) { - if (retryAll || explicitRetries.has(message.id)) { - failedMessages++; - const failedAttempts = message.incrementFailedAttempts(); - if (failedAttempts < maxAttempts) { - this.log.debug( - `Retrying message "${message.id}" on queue "${this.queueName}"...` - ); - toRetry.push(message); - } else if (consumer.deadLetterQueue !== undefined) { - this.log.warn( - `Moving message "${message.id}" on queue "${this.queueName}" to dead letter queue "${consumer.deadLetterQueue}" after ${maxAttempts} failed attempt${maxAttemptsS}...` - ); - toDeadLetterQueue.push(message); - } else { - this.log.warn( - `Dropped message "${message.id}" on queue "${this.queueName}" after ${maxAttempts} failed attempt${maxAttemptsS}!` - ); - } - } - } - const acked = batch.length - failedMessages; - this.log.info( - formatQueueResponse(this.queueName, acked, batch.length, response.time) - ); - - // Add messages for retry back to the queue, and ensure we flush again if - // we still have messages - this.#messages.push(...toRetry); - this.#pendingFlush = undefined; - if (this.#messages.length > 0) { - this.#ensurePendingFlush(enqueueOn, consumer); - } - - if (toDeadLetterQueue.length > 0) { - // If we have messages to move to a dead letter queue, do so - assert(consumer.deadLetterQueue !== undefined); - assert(consumer.deadLetterConsumer !== undefined); - enqueueOn( - consumer.deadLetterQueue, - consumer.deadLetterConsumer, - toDeadLetterQueue // Reuse same message instances with same IDs - ); - } - }; - - #ensurePendingFlush(enqueueOn: QueueEnqueueOn, consumer: QueueConsumer) { - const batchSize = consumer.maxBatchSize ?? DEFAULT_BATCH_SIZE; - const batchTimeout = consumer.maxBatchTimeout ?? DEFAULT_BATCH_TIMEOUT; - const batchHasSpace = this.#messages.length < batchSize; - - if (this.#pendingFlush !== undefined) { - // If we have a pending immediate flush, or a delayed flush we haven't - // filled the batch for yet, just wait for it - if (this.#pendingFlush.immediate || batchHasSpace) return; - // Otherwise, the batch is full, so clear the existing timeout, and - // register an immediate flush - this.timers.clearTimeout(this.#pendingFlush.timeout); - this.#pendingFlush = undefined; - } - - // Register a new flush timeout with the appropriate delay - const delay = batchHasSpace ? batchTimeout * 1000 : 0; - const timeout = this.timers.setTimeout( - this.#flush, - delay, - enqueueOn, - consumer - ); - this.#pendingFlush = { immediate: delay === 0, timeout }; - } - - enqueue( - enqueueOn: QueueEnqueueOn, - consumer: QueueConsumer, - messages: (Message | GatewayMessage)[] - ) { - for (const message of messages) { - if (message instanceof Message) { - this.#messages.push(message); - } else { - const id = crypto.randomBytes(16).toString("hex"); - const timestamp = this.timers.now(); - let body: unknown; - if (message.contentType === "text") { - body = message.body.toString(); - } else if (message.contentType === "json") { - body = JSON.parse(message.body.toString()); - } else if (message.contentType === "bytes") { - body = viewToBuffer(message.body); - } else { - body = v8.deserialize(message.body); - } - this.#messages.push(new Message(id, timestamp, body)); - } - } - this.#ensurePendingFlush(enqueueOn, consumer); - } -} diff --git a/packages/miniflare/src/plugins/queues/index.ts b/packages/miniflare/src/plugins/queues/index.ts index 12d5d134e..35a37262b 100644 --- a/packages/miniflare/src/plugins/queues/index.ts +++ b/packages/miniflare/src/plugins/queues/index.ts @@ -1,19 +1,25 @@ -import { stringify } from "devalue"; -import semiver from "semiver"; +import SCRIPT_QUEUE_BROKER_OBJECT from "worker:queues/broker"; import { z } from "zod"; -import { Service, Worker_Binding } from "../../runtime"; -import { maybeApply } from "../../shared"; import { - Plugin, + Service, + Worker_Binding, + Worker_Binding_DurableObjectNamespaceDesignator, + kVoid, +} from "../../runtime"; +import { + QueueBindings, QueueConsumerOptionsSchema, + SharedBindings, +} from "../../workers"; +import { getUserServiceName } from "../core"; +import { + Plugin, + SERVICE_LOOPBACK, kProxyNodeBinding, namespaceEntries, namespaceKeys, - pluginNamespacePersistWorker, + objectEntryWorker, } from "../shared"; -import { QueuesError } from "./errors"; -import { QueuesGateway } from "./gateway"; -import { QueuesRouter } from "./router"; export const QueuesOptionsSchema = z.object({ queueProducers: z @@ -24,79 +30,84 @@ export const QueuesOptionsSchema = z.object({ .optional(), }); -// workerd uses V8 serialisation version 15 when sending messages: -// https://github.com/cloudflare/workerd/blob/575eba6747054fb810f8a8138c2bf04b22339f77/src/workerd/api/queue.c%2B%2B#L17 -// This is only supported by V8 versions 10.0.29 and above: -// https://github.com/v8/v8/commit/fc23bc1de29f415f5e3bc080055b67fb3ea19c53. -// -// For reference, the V8 versions associated with notable Node versions are: -// - Miniflare's minimum supported version: Node 16.13.0 --> V8 9.4 -// - Last Node 17/unsupported version: Node 17.9.1 --> V8 9.6 -// - First supported version: Node 18.0.0 --> V8 10.1 -// -// See also https://github.com/nodejs/node/issues/42192. -/** @internal */ -export const _QUEUES_COMPATIBLE_V8_VERSION = - semiver(process.versions.v8, "10.0.29") >= 0; - -function assertCompatibleV8Version() { - if (!_QUEUES_COMPATIBLE_V8_VERSION) { - throw new QueuesError( - "ERR_V8_UNSUPPORTED", - "The version of V8 bundled with this version of Node.js does not support Queues. " + - "Please upgrade to the latest Node.js LTS release." - ); - } -} - export const QUEUES_PLUGIN_NAME = "queues"; -export const QUEUES_PLUGIN: Plugin< - typeof QueuesOptionsSchema, - undefined, - QueuesGateway -> = { - gateway: QueuesGateway, - router: QueuesRouter, +const SERVICE_QUEUE_PREFIX = `${QUEUES_PLUGIN_NAME}:queue`; +const QUEUE_BROKER_OBJECT_CLASS_NAME = "QueueBrokerObject"; +const QUEUE_BROKER_OBJECT: Worker_Binding_DurableObjectNamespaceDesignator = { + serviceName: SERVICE_QUEUE_PREFIX, + className: QUEUE_BROKER_OBJECT_CLASS_NAME, +}; + +export const QUEUES_PLUGIN: Plugin = { options: QueuesOptionsSchema, getBindings(options) { const queues = namespaceEntries(options.queueProducers); - - const hasProducers = queues.length > 0; - const hasConsumers = Object.keys(options.queueConsumers ?? {}).length > 0; - if (hasProducers || hasConsumers) assertCompatibleV8Version(); - return queues.map(([name, id]) => ({ name, - queue: { name: `${QUEUES_PLUGIN_NAME}:${id}` }, + queue: { name: `${SERVICE_QUEUE_PREFIX}:${id}` }, })); }, getNodeBindings(options) { const queues = namespaceKeys(options.queueProducers); return Object.fromEntries(queues.map((name) => [name, kProxyNodeBinding])); }, - async getServices({ options, queueConsumers: allQueueConsumers }) { + async getServices({ + options, + workerNames, + queueConsumers: allQueueConsumers, + }) { const queues = namespaceEntries(options.queueProducers); if (queues.length === 0) return []; - return queues.map(([_, id]) => { - // Abusing persistence to store queue consumer. We don't support - // persisting queued data yet, but we are essentially persisting messages - // to a consumer. We'll unwrap this in the router as usual. Note we're - // using `devalue` here as `consumer` may contain cycles, if a dead-letter - // queue references itself or another queue that references the same - // dead-letter queue. - const consumer = allQueueConsumers.get(id); - const persist = maybeApply(stringify, consumer); - return { - name: `${QUEUES_PLUGIN_NAME}:${id}`, - worker: pluginNamespacePersistWorker( - QUEUES_PLUGIN_NAME, - encodeURIComponent(id), - persist - ), - }; - }); + + const services = queues.map(([_, id]) => ({ + name: `${SERVICE_QUEUE_PREFIX}:${id}`, + worker: objectEntryWorker(QUEUE_BROKER_OBJECT, id), + })); + + const uniqueKey = `miniflare-${QUEUE_BROKER_OBJECT_CLASS_NAME}`; + const objectService: Service = { + name: SERVICE_QUEUE_PREFIX, + worker: { + compatibilityDate: "2023-07-24", + compatibilityFlags: [ + "nodejs_compat", + "experimental", + "service_binding_extra_handlers", + ], + modules: [ + { name: "broker.worker.js", esModule: SCRIPT_QUEUE_BROKER_OBJECT() }, + ], + durableObjectNamespaces: [ + { className: QUEUE_BROKER_OBJECT_CLASS_NAME, uniqueKey }, + ], + // Miniflare's Queue broker is in-memory only at the moment + durableObjectStorage: { inMemory: kVoid }, + bindings: [ + { + name: SharedBindings.MAYBE_SERVICE_LOOPBACK, + service: { name: SERVICE_LOOPBACK }, + }, + { + name: SharedBindings.DURABLE_OBJECT_NAMESPACE_OBJECT, + durableObjectNamespace: { + className: QUEUE_BROKER_OBJECT_CLASS_NAME, + }, + }, + { + name: QueueBindings.MAYBE_JSON_QUEUE_CONSUMERS, + json: JSON.stringify(Object.fromEntries(allQueueConsumers)), + }, + ...workerNames.map((name) => ({ + name: QueueBindings.SERVICE_WORKER_PREFIX + name, + service: { name: getUserServiceName(name) }, + })), + ], + }, + }; + services.push(objectService); + + return services; }, }; export * from "./errors"; -export * from "./gateway"; diff --git a/packages/miniflare/src/plugins/queues/router.ts b/packages/miniflare/src/plugins/queues/router.ts deleted file mode 100644 index 128f0abf3..000000000 --- a/packages/miniflare/src/plugins/queues/router.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { parse } from "devalue"; -import { z } from "zod"; -import { Headers, Response } from "../../http"; -import { HttpError } from "../../shared"; -import { - HEADER_PERSIST, - POST, - QueueConsumer, - RouteHandler, - Router, -} from "../shared"; -import { - GatewayMessageSchema, - QueueContentTypeSchema, - QueueEnqueueOn, - QueuesGateway, -} from "./gateway"; - -const MAX_MESSAGE_SIZE_BYTES = 128 * 1000; -const MAX_MESSAGE_BATCH_COUNT = 100; -const MAX_MESSAGE_BATCH_SIZE = (256 + 32) * 1000; - -class PayloadTooLargeError extends HttpError { - constructor(message: string) { - super(413, message); - } -} - -function validateMessageSize(headers: Headers) { - const size = headers.get("Content-Length"); - if (size !== null && parseInt(size) > MAX_MESSAGE_SIZE_BYTES) { - throw new PayloadTooLargeError( - `message length of ${size} bytes exceeds limit of ${MAX_MESSAGE_SIZE_BYTES}` - ); - } -} - -function validateContentType( - headers: Headers -): z.infer { - const format = headers.get("X-Msg-Fmt") ?? undefined; // zod will throw if null - const result = QueueContentTypeSchema.safeParse(format); - if (!result.success) { - throw new HttpError( - 400, - `message content type ${format} is invalid; if specified, must be one of 'text', 'json', 'bytes', or 'v8'` - ); - } else { - return result.data; - } -} - -function validateBatchSize(headers: Headers) { - const count = headers.get("CF-Queue-Batch-Count"); - if (count !== null && parseInt(count) > MAX_MESSAGE_BATCH_COUNT) { - throw new PayloadTooLargeError( - `batch message count of ${count} exceeds limit of ${MAX_MESSAGE_BATCH_COUNT}` - ); - } - const largestSize = headers.get("CF-Queue-Largest-Msg"); - if (largestSize !== null && parseInt(largestSize) > MAX_MESSAGE_SIZE_BYTES) { - throw new PayloadTooLargeError( - `message in batch has length ${largestSize} bytes which exceeds single message size limit of ${MAX_MESSAGE_SIZE_BYTES}` - ); - } - const batchSize = headers.get("CF-Queue-Batch-Bytes"); - if (batchSize !== null && parseInt(batchSize) > MAX_MESSAGE_BATCH_SIZE) { - throw new PayloadTooLargeError( - `batch size of ${batchSize} bytes exceeds limit of 256000` - ); - } -} - -async function decodeQueueConsumer( - headers: Headers -): Promise { - const header = headers.get(HEADER_PERSIST); - // We stringify the consumer using `devalue` (as it may contain dead letter - // queue cycles). This which will then be JSON-stringified again when encoding - // the header (yuck). Unfortunately, we can't use Zod to validate this as it - // doesn't support cyclic data. - return header === null ? undefined : parse(JSON.parse(header)); -} - -const QueuesBatchRequestSchema = z.object({ - messages: z.array(GatewayMessageSchema), -}); - -export interface QueuesParams { - queue: string; -} -export class QueuesRouter extends Router { - #enqueueOn: QueueEnqueueOn = (queueName, consumer, messages) => { - const gateway = this.gatewayFactory.get(queueName, undefined); - gateway.enqueue(this.#enqueueOn, consumer, messages); - }; - - @POST("/:queue/message") - message: RouteHandler = async (req, params) => { - validateMessageSize(req.headers); - const contentType = validateContentType(req.headers); - - // Get consumer from persistence header, if we don't have a consumer, - // drop the message - const consumer = await decodeQueueConsumer(req.headers); - if (consumer === undefined) return new Response(); - - const queue = decodeURIComponent(params.queue); - this.#enqueueOn(queue, consumer, [ - { - body: Buffer.from(await req.arrayBuffer()), - contentType, - }, - ]); - return new Response(); - }; - - @POST("/:queue/batch") - batch: RouteHandler = async (req, params) => { - validateBatchSize(req.headers); - - // Get consumer from persistence header, if we don't have a consumer, - // drop the batch - const consumer = await decodeQueueConsumer(req.headers); - if (consumer === undefined) return new Response(); - - const queue = decodeURIComponent(params.queue); - const body = QueuesBatchRequestSchema.parse(await req.json()); - this.#enqueueOn(queue, consumer, body.messages); - return new Response(); - }; -} diff --git a/packages/miniflare/src/workers/index.ts b/packages/miniflare/src/workers/index.ts index e70c6e0ec..3cfe67ac4 100644 --- a/packages/miniflare/src/workers/index.ts +++ b/packages/miniflare/src/workers/index.ts @@ -1,4 +1,5 @@ export * from "./cache"; export * from "./core"; export * from "./kv"; +export * from "./queues"; export * from "./shared"; diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts new file mode 100644 index 000000000..6276a5bf8 --- /dev/null +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -0,0 +1,362 @@ +import assert from "node:assert"; +import { Buffer } from "node:buffer"; +import { Colorize, bold, green, grey, red, reset, yellow } from "kleur/colors"; +import { + HttpError, + LogLevel, + MiniflareDurableObject, + MiniflareDurableObjectCf, + MiniflareDurableObjectEnv, + POST, + RouteHandler, + SharedBindings, + TimerHandle, + viewToBuffer, +} from "miniflare:shared"; +import { QueueBindings } from "./constants"; +import { + QueueConsumer, + QueueConsumersSchema, + QueueContentType, + QueueContentTypeSchema, + QueueIncomingMessage, + QueueOutgoingMessage, + QueuesBatchRequestSchema, + QueuesOutgoingBatchRequest, +} from "./schemas"; + +const MAX_MESSAGE_SIZE_BYTES = 128 * 1000; +const MAX_MESSAGE_BATCH_COUNT = 100; +const MAX_MESSAGE_BATCH_SIZE = (256 + 32) * 1000; + +const DEFAULT_BATCH_SIZE = 5; +const DEFAULT_BATCH_TIMEOUT = 1; // second +const DEFAULT_RETRIES = 2; + +const exceptionQueueResponse: FetcherQueueResult = { + outcome: "exception", + retryAll: false, + ackAll: false, + explicitRetries: [], + explicitAcks: [], +}; + +class PayloadTooLargeError extends HttpError { + constructor(message: string) { + super(413, message); + } +} + +function validateMessageSize(headers: Headers) { + const size = headers.get("Content-Length"); + if (size !== null && parseInt(size) > MAX_MESSAGE_SIZE_BYTES) { + throw new PayloadTooLargeError( + `message length of ${size} bytes exceeds limit of ${MAX_MESSAGE_SIZE_BYTES}` + ); + } +} + +function validateContentType(headers: Headers): QueueContentType { + const format = headers.get("X-Msg-Fmt") ?? undefined; // zod will throw if null + const result = QueueContentTypeSchema.safeParse(format); + if (!result.success) { + throw new HttpError( + 400, + `message content type ${format} is invalid; if specified, must be one of 'text', 'json', 'bytes', or 'v8'` + ); + } + return result.data; +} + +function validateBatchSize(headers: Headers) { + const count = headers.get("CF-Queue-Batch-Count"); + if (count !== null && parseInt(count) > MAX_MESSAGE_BATCH_COUNT) { + throw new PayloadTooLargeError( + `batch message count of ${count} exceeds limit of ${MAX_MESSAGE_BATCH_COUNT}` + ); + } + const largestSize = headers.get("CF-Queue-Largest-Msg"); + if (largestSize !== null && parseInt(largestSize) > MAX_MESSAGE_SIZE_BYTES) { + throw new PayloadTooLargeError( + `message in batch has length ${largestSize} bytes which exceeds single message size limit of ${MAX_MESSAGE_SIZE_BYTES}` + ); + } + const batchSize = headers.get("CF-Queue-Batch-Bytes"); + if (batchSize !== null && parseInt(batchSize) > MAX_MESSAGE_BATCH_SIZE) { + throw new PayloadTooLargeError( + `batch size of ${batchSize} bytes exceeds limit of 256000` + ); + } +} + +type QueueBody = + | { contentType: "text"; body: string } + | { contentType: "json"; body: unknown } + | { contentType: "bytes"; body: ArrayBuffer } + | { contentType: "v8"; body: Buffer }; + +function deserialise({ contentType, body }: QueueIncomingMessage): QueueBody { + if (contentType === "text") { + return { contentType, body: body.toString() }; + } else if (contentType === "json") { + return { contentType, body: JSON.parse(body.toString()) }; + } else if (contentType === "bytes") { + return { contentType, body: viewToBuffer(body) }; + } else { + return { contentType, body }; + } +} + +function serialise(msg: QueueMessage): QueueOutgoingMessage { + let body: Buffer; + if (msg.body.contentType === "text") { + body = Buffer.from(msg.body.body); + } else if (msg.body.contentType === "json") { + body = Buffer.from(JSON.stringify(msg.body.body)); + } else if (msg.body.contentType === "bytes") { + body = Buffer.from(msg.body.body); + } else { + body = msg.body.body; + } + return { + id: msg.id, + timestamp: msg.timestamp, + contentType: msg.body.contentType, + body: body.toString("base64"), + }; +} + +class QueueMessage { + #failedAttempts = 0; + + constructor( + readonly id: string, + readonly timestamp: number, + readonly body: QueueBody + ) {} + + incrementFailedAttempts(): number { + return ++this.#failedAttempts; + } +} + +function formatQueueResponse( + queueName: string, + acked: number, + total: number, + time?: number +) { + let colour: Colorize; + if (acked === total) colour = green; + else if (acked > 0) colour = yellow; + else colour = red; + + let message = `${bold("QUEUE")} ${queueName} ${colour(`${acked}/${total}`)}`; + if (time !== undefined) message += grey(` (${time}ms)`); + return reset(message); +} + +interface PendingFlush { + immediate: boolean; + timeout: TimerHandle; +} + +type QueueBrokerObjectEnv = MiniflareDurableObjectEnv & { + // Reference to own Durable Object namespace for sending to dead-letter queues + [SharedBindings.DURABLE_OBJECT_NAMESPACE_OBJECT]: DurableObjectNamespace; + [QueueBindings.MAYBE_JSON_QUEUE_CONSUMERS]?: unknown; +} & { + [K in `${typeof QueueBindings.SERVICE_WORKER_PREFIX}${string}`]: + | Fetcher + | undefined; // Won't have a `Fetcher` for every possible `string` +}; + +export class QueueBrokerObject extends MiniflareDurableObject { + readonly #consumers: Record; + readonly #messages: QueueMessage[] = []; + #pendingFlush?: PendingFlush; + + constructor(state: DurableObjectState, env: QueueBrokerObjectEnv) { + super(state, env); + const maybeConsumers = env[QueueBindings.MAYBE_JSON_QUEUE_CONSUMERS]; + if (maybeConsumers === undefined) this.#consumers = {}; + else this.#consumers = QueueConsumersSchema.parse(maybeConsumers); + } + + get #maybeConsumer() { + return this.#consumers[this.name]; + } + + #dispatchBatch(workerName: string, batch: QueueMessage[]) { + const bindingName = + `${QueueBindings.SERVICE_WORKER_PREFIX}${workerName}` as const; + const maybeService = this.env[bindingName]; + assert( + maybeService !== undefined, + `Expected ${bindingName} service binding` + ); + const messages = batch.map(({ id, timestamp, body }) => { + if (body.contentType === "v8") { + return { id, timestamp, serializedBody: body.body }; + } else { + return { id, timestamp, body: body.body }; + } + }); + // @ts-expect-error `Fetcher#queue()` types haven't been updated for + // `serializedBody` yet, and don't allow `number` for `timestamp`, even + // though that's permitted at runtime + return maybeService.queue(this.name, messages); + } + + #flush = async () => { + const consumer = this.#maybeConsumer; + assert(consumer !== undefined); + + const batchSize = consumer.maxBatchSize ?? DEFAULT_BATCH_SIZE; + const maxAttempts = (consumer.maxRetires ?? DEFAULT_RETRIES) + 1; + const maxAttemptsS = maxAttempts === 1 ? "" : "s"; + + // Extract and dispatch a batch + const batch = this.#messages.splice(0, batchSize); + const startTime = Date.now(); + let endTime: number; + let response: FetcherQueueResult; + try { + response = await this.#dispatchBatch(consumer.workerName, batch); + endTime = Date.now(); + } catch (e: any) { + endTime = Date.now(); + await this.logWithLevel(LogLevel.ERROR, String(e)); + response = exceptionQueueResponse; + } + + // Get messages to retry. If dispatching the batch failed for any reason, + // retry all messages. + const retryAll = response.retryAll || response.outcome !== "ok"; + const explicitRetries = new Set(response.explicitRetries); + + let failedMessages = 0; + const toRetry: QueueMessage[] = []; + const toDeadLetterQueue: QueueMessage[] = []; + for (const message of batch) { + if (retryAll || explicitRetries.has(message.id)) { + failedMessages++; + const failedAttempts = message.incrementFailedAttempts(); + if (failedAttempts < maxAttempts) { + await this.logWithLevel( + LogLevel.DEBUG, + `Retrying message "${message.id}" on queue "${this.name}"...` + ); + toRetry.push(message); + } else if (consumer.deadLetterQueue !== undefined) { + await this.logWithLevel( + LogLevel.WARN, + `Moving message "${message.id}" on queue "${this.name}" to dead letter queue "${consumer.deadLetterQueue}" after ${maxAttempts} failed attempt${maxAttemptsS}...` + ); + toDeadLetterQueue.push(message); + } else { + await this.logWithLevel( + LogLevel.WARN, + `Dropped message "${message.id}" on queue "${this.name}" after ${maxAttempts} failed attempt${maxAttemptsS}!` + ); + } + } + } + const acked = batch.length - failedMessages; + await this.logWithLevel( + LogLevel.INFO, + formatQueueResponse(this.name, acked, batch.length, endTime - startTime) + ); + + // Add messages for retry back to the queue, and ensure we flush again if + // we still have messages + this.#messages.push(...toRetry); + this.#pendingFlush = undefined; + if (this.#messages.length > 0) this.#ensurePendingFlush(); + + if (toDeadLetterQueue.length > 0) { + // If we have messages to move to a dead letter queue, do so + const name = consumer.deadLetterQueue; + assert(name !== undefined); + const ns = this.env[SharedBindings.DURABLE_OBJECT_NAMESPACE_OBJECT]; + const id = ns.idFromName(name); + const stub = ns.get(id); + const cf: MiniflareDurableObjectCf = { miniflare: { name } }; + const batchRequest: QueuesOutgoingBatchRequest = { + messages: toDeadLetterQueue.map(serialise), + }; + const res = await stub.fetch("http://placeholder/batch", { + method: "POST", + body: JSON.stringify(batchRequest), + cf: cf as Record, + }); + assert(res.ok); + } + }; + + #ensurePendingFlush() { + const consumer = this.#maybeConsumer; + assert(consumer !== undefined); + + const batchSize = consumer.maxBatchSize ?? DEFAULT_BATCH_SIZE; + const batchTimeout = consumer.maxBatchTimeout ?? DEFAULT_BATCH_TIMEOUT; + const batchHasSpace = this.#messages.length < batchSize; + + if (this.#pendingFlush !== undefined) { + // If we have a pending immediate flush, or a delayed flush we haven't + // filled the batch for yet, just wait for it + if (this.#pendingFlush.immediate || batchHasSpace) return; + // Otherwise, the batch is full, so clear the existing timeout, and + // register an immediate flush + this.timers.clearTimeout(this.#pendingFlush.timeout); + this.#pendingFlush = undefined; + } + + // Register a new flush timeout with the appropriate delay + const delay = batchHasSpace ? batchTimeout * 1000 : 0; + const timeout = this.timers.setTimeout(this.#flush, delay); + this.#pendingFlush = { immediate: delay === 0, timeout }; + } + + #enqueue(messages: QueueIncomingMessage[]) { + for (const message of messages) { + const randomness = crypto.getRandomValues(new Uint8Array(16)); + const id = message.id ?? Buffer.from(randomness).toString("hex"); + const timestamp = message.timestamp ?? this.timers.now(); + const body = deserialise(message); + this.#messages.push(new QueueMessage(id, timestamp, body)); + } + this.#ensurePendingFlush(); + } + + @POST("/message") + message: RouteHandler = async (req) => { + validateMessageSize(req.headers); + const contentType = validateContentType(req.headers); + const body = Buffer.from(await req.arrayBuffer()); + + // If we don't have a consumer, drop the message + const consumer = this.#maybeConsumer; + if (consumer === undefined) return new Response(); + + this.#enqueue([{ contentType, body }]); + return new Response(); + }; + + @POST("/batch") + batch: RouteHandler = async (req) => { + // NOTE: this endpoint is also used when moving messages to the dead-letter + // queue. In this case, size headers won't be added and this validation is + // a no-op. This allows us to enqueue a maximum size batch with additional + // ID and timestamp information. + validateBatchSize(req.headers); + const body = QueuesBatchRequestSchema.parse(await req.json()); + + // If we don't have a consumer, drop the message + const consumer = this.#maybeConsumer; + if (consumer === undefined) return new Response(); + + this.#enqueue(body.messages); + return new Response(); + }; +} diff --git a/packages/miniflare/src/workers/queues/constants.ts b/packages/miniflare/src/workers/queues/constants.ts new file mode 100644 index 000000000..b5e584870 --- /dev/null +++ b/packages/miniflare/src/workers/queues/constants.ts @@ -0,0 +1,4 @@ +export const QueueBindings = { + SERVICE_WORKER_PREFIX: "MINIFLARE_WORKER_", + MAYBE_JSON_QUEUE_CONSUMERS: "MINIFLARE_QUEUE_CONSUMERS", +} as const; diff --git a/packages/miniflare/src/workers/queues/index.ts b/packages/miniflare/src/workers/queues/index.ts new file mode 100644 index 000000000..19a982eaf --- /dev/null +++ b/packages/miniflare/src/workers/queues/index.ts @@ -0,0 +1,2 @@ +export * from "./constants"; +export * from "./schemas"; diff --git a/packages/miniflare/src/workers/queues/schemas.ts b/packages/miniflare/src/workers/queues/schemas.ts new file mode 100644 index 000000000..a53b7b211 --- /dev/null +++ b/packages/miniflare/src/workers/queues/schemas.ts @@ -0,0 +1,44 @@ +import { Base64DataSchema, z } from "miniflare:zod"; + +export const QueueConsumerOptionsSchema = /* @__PURE__ */ z.object({ + // https://developers.cloudflare.com/queues/platform/configuration/#consumer + // https://developers.cloudflare.com/queues/platform/limits/ + maxBatchSize: z.number().min(0).max(100).optional(), + maxBatchTimeout: z.number().min(0).max(30).optional(), // seconds + maxRetires: z.number().min(0).max(100).optional(), + deadLetterQueue: z.ostring(), +}); +export const QueueConsumerSchema = /* @__PURE__ */ z.intersection( + QueueConsumerOptionsSchema, + z.object({ workerName: z.string() }) +); +export type QueueConsumer = z.infer; +// Maps queue names to the Worker that wishes to consume it. Note each queue +// can only be consumed by one Worker, but one Worker may consume multiple +// queues. Support for multiple consumers of a single queue is not planned +// anytime soon. +export const QueueConsumersSchema = + /* @__PURE__ */ z.record(QueueConsumerSchema); + +export const QueueContentTypeSchema = /* @__PURE__ */ z + .enum(["text", "json", "bytes", "v8"]) + .default("v8"); +export type QueueContentType = z.infer; + +export const QueueIncomingMessageSchema = /* @__PURE__ */ z.object({ + contentType: QueueContentTypeSchema, + body: Base64DataSchema, + // When enqueuing messages on dead-letter queues, we want to reuse the same ID + // and timestamp + id: z.ostring(), + timestamp: z.onumber(), +}); +export type QueueIncomingMessage = z.infer; +export type QueueOutgoingMessage = z.input; + +export const QueuesBatchRequestSchema = /* @__PURE__ */ z.object({ + messages: z.array(QueueIncomingMessageSchema), +}); +export type QueuesOutgoingBatchRequest = z.input< + typeof QueuesBatchRequestSchema +>; diff --git a/packages/miniflare/test/index.spec.ts b/packages/miniflare/test/index.spec.ts index a86288369..c26aac269 100644 --- a/packages/miniflare/test/index.spec.ts +++ b/packages/miniflare/test/index.spec.ts @@ -24,7 +24,6 @@ import { MiniflareOptions, ReplaceWorkersTypes, Response, - _QUEUES_COMPATIBLE_V8_VERSION, _transformsForContentEncoding, createFetchMock, fetch, @@ -37,8 +36,6 @@ import { } from "ws"; import { TestLog, useServer, useTmp, utf8Encode } from "./test-shared"; -const queuesTest = _QUEUES_COMPATIBLE_V8_VERSION ? test : test.skip; - test("Miniflare: validates options", async (t) => { // Check empty workers array rejected t.throws(() => new Miniflare({ workers: [] }), { @@ -557,7 +554,7 @@ test("Miniflare: manually triggered scheduled events", async (t) => { t.is(await res.text(), "true"); }); -queuesTest("Miniflare: getBindings() returns all bindings", async (t) => { +test("Miniflare: getBindings() returns all bindings", async (t) => { const tmp = await useTmp(t); const blobPath = path.join(tmp, "blob.txt"); await fs.writeFile(blobPath, "blob"); @@ -636,89 +633,84 @@ queuesTest("Miniflare: getBindings() returns all bindings", async (t) => { }; t.throws(() => bindings.KV.get("key"), expectations); }); -queuesTest( - "Miniflare: getBindings() and friends return bindings for different workers", - async (t) => { - const mf = new Miniflare({ - workers: [ - { - name: "a", - modules: true, - script: ` +test("Miniflare: getBindings() and friends return bindings for different workers", async (t) => { + const mf = new Miniflare({ + workers: [ + { + name: "a", + modules: true, + script: ` export class DurableObject {} export default { fetch() { return new Response(null, { status: 404 }); } } `, - d1Databases: ["DB"], - durableObjects: { DO: "DurableObject" }, - }, - { - // 2nd worker unnamed, to validate that not specifying a name when - // getting bindings gives the entrypoint, not the unnamed worker - script: - 'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));', - kvNamespaces: ["KV"], - queueProducers: ["QUEUE"], - }, - { - name: "b", - script: - 'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));', - r2Buckets: ["BUCKET"], - }, - ], - }); - t.teardown(() => mf.dispose()); - - // Check `getBindings()` - let bindings = await mf.getBindings(); - t.deepEqual(Object.keys(bindings), ["DB", "DO"]); - bindings = await mf.getBindings(""); - t.deepEqual(Object.keys(bindings), ["KV", "QUEUE"]); - bindings = await mf.getBindings("b"); - t.deepEqual(Object.keys(bindings), ["BUCKET"]); - await t.throwsAsync(() => mf.getBindings("c"), { - instanceOf: TypeError, - message: '"c" worker not found', - }); + d1Databases: ["DB"], + durableObjects: { DO: "DurableObject" }, + }, + { + // 2nd worker unnamed, to validate that not specifying a name when + // getting bindings gives the entrypoint, not the unnamed worker + script: + 'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));', + kvNamespaces: ["KV"], + queueProducers: ["QUEUE"], + }, + { + name: "b", + script: + 'addEventListener("fetch", (event) => event.respondWith(new Response(null, { status: 404 })));', + r2Buckets: ["BUCKET"], + }, + ], + }); + t.teardown(() => mf.dispose()); - const unboundExpectations = ( - name: string - ): ThrowsExpectation => ({ - instanceOf: TypeError, - message: `"${name}" unbound in "c" worker`, - }); + // Check `getBindings()` + let bindings = await mf.getBindings(); + t.deepEqual(Object.keys(bindings), ["DB", "DO"]); + bindings = await mf.getBindings(""); + t.deepEqual(Object.keys(bindings), ["KV", "QUEUE"]); + bindings = await mf.getBindings("b"); + t.deepEqual(Object.keys(bindings), ["BUCKET"]); + await t.throwsAsync(() => mf.getBindings("c"), { + instanceOf: TypeError, + message: '"c" worker not found', + }); - // Check `getD1Database()` - let binding: unknown = await mf.getD1Database("DB"); - t.not(binding, undefined); - let expectations = unboundExpectations("DB"); - await t.throwsAsync(() => mf.getD1Database("DB", "c"), expectations); - - // Check `getDurableObjectNamespace()` - binding = await mf.getDurableObjectNamespace("DO"); - t.not(binding, undefined); - expectations = unboundExpectations("DO"); - await t.throwsAsync( - () => mf.getDurableObjectNamespace("DO", "c"), - expectations - ); - - // Check `getKVNamespace()` - binding = await mf.getKVNamespace("KV", ""); - t.not(binding, undefined); - expectations = unboundExpectations("KV"); - await t.throwsAsync(() => mf.getKVNamespace("KV", "c"), expectations); - - // Check `getQueueProducer()` - binding = await mf.getQueueProducer("QUEUE", ""); - t.not(binding, undefined); - expectations = unboundExpectations("QUEUE"); - await t.throwsAsync(() => mf.getQueueProducer("QUEUE", "c"), expectations); - - // Check `getR2Bucket()` - binding = await mf.getR2Bucket("BUCKET", "b"); - t.not(binding, undefined); - expectations = unboundExpectations("BUCKET"); - await t.throwsAsync(() => mf.getQueueProducer("BUCKET", "c"), expectations); - } -); + const unboundExpectations = (name: string): ThrowsExpectation => ({ + instanceOf: TypeError, + message: `"${name}" unbound in "c" worker`, + }); + + // Check `getD1Database()` + let binding: unknown = await mf.getD1Database("DB"); + t.not(binding, undefined); + let expectations = unboundExpectations("DB"); + await t.throwsAsync(() => mf.getD1Database("DB", "c"), expectations); + + // Check `getDurableObjectNamespace()` + binding = await mf.getDurableObjectNamespace("DO"); + t.not(binding, undefined); + expectations = unboundExpectations("DO"); + await t.throwsAsync( + () => mf.getDurableObjectNamespace("DO", "c"), + expectations + ); + + // Check `getKVNamespace()` + binding = await mf.getKVNamespace("KV", ""); + t.not(binding, undefined); + expectations = unboundExpectations("KV"); + await t.throwsAsync(() => mf.getKVNamespace("KV", "c"), expectations); + + // Check `getQueueProducer()` + binding = await mf.getQueueProducer("QUEUE", ""); + t.not(binding, undefined); + expectations = unboundExpectations("QUEUE"); + await t.throwsAsync(() => mf.getQueueProducer("QUEUE", "c"), expectations); + + // Check `getR2Bucket()` + binding = await mf.getR2Bucket("BUCKET", "b"); + t.not(binding, undefined); + expectations = unboundExpectations("BUCKET"); + await t.throwsAsync(() => mf.getQueueProducer("BUCKET", "c"), expectations); +}); diff --git a/packages/miniflare/test/plugins/queues/index.spec.ts b/packages/miniflare/test/plugins/queues/index.spec.ts index 970cdfe5b..27a44f2bd 100644 --- a/packages/miniflare/test/plugins/queues/index.spec.ts +++ b/packages/miniflare/test/plugins/queues/index.spec.ts @@ -1,29 +1,44 @@ -import anyTest from "ava"; +import test from "ava"; import { DeferredPromise, LogLevel, Miniflare, + QUEUES_PLUGIN_NAME, QueuesError, Response, - _QUEUES_COMPATIBLE_V8_VERSION, } from "miniflare"; import { z } from "zod"; -import { LogEntry, TestLog, TestTimers } from "../../test-shared"; - -// Only run Queues tests if we're using a supported V8 version -const test = _QUEUES_COMPATIBLE_V8_VERSION ? anyTest : anyTest.skip; +import { + LogEntry, + MiniflareDurableObjectControlStub, + TestLog, +} from "../../test-shared"; const StringArraySchema = z.string().array(); const MessageArraySchema = z .object({ queue: z.string(), id: z.string(), body: z.string() }) .array(); +async function getControlStub( + mf: Miniflare, + queueName: string +): Promise { + const objectNamespace = await mf._getInternalDurableObjectNamespace( + QUEUES_PLUGIN_NAME, + "queues:queue", + "QueueBrokerObject" + ); + const objectId = objectNamespace.idFromName(queueName); + const objectStub = objectNamespace.get(objectId); + const stub = new MiniflareDurableObjectControlStub(objectStub); + await stub.enableFakeTimers(1_000_000); + return stub; +} + test("flushes partial and full batches", async (t) => { let batches: string[][] = []; - const timers = new TestTimers(); const mf = new Miniflare({ - timers, verbose: true, workers: [ @@ -81,24 +96,26 @@ test("flushes partial and full batches", async (t) => { }); } + const object = await getControlStub(mf, "QUEUE"); + // Check with single msg await send("msg1"); - timers.timestamp += 500; - await timers.waitForTasks(); + await object.advanceFakeTime(500); + await object.waitForFakeTasks(); t.is(batches.length, 0); - timers.timestamp += 500; - await timers.waitForTasks(); + await object.advanceFakeTime(500); + await object.waitForFakeTasks(); t.is(batches[0]?.length, 1); t.regex(batches[0][0], /^[0-9a-f]{32}$/); batches = []; // Check with single batch await sendBatch("msg1", "msg2"); - timers.timestamp += 250; - await timers.waitForTasks(); + await object.advanceFakeTime(250); + await object.waitForFakeTasks(); t.is(batches.length, 0); - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches[0]?.length, 2); t.regex(batches[0][0], /^[0-9a-f]{32}$/); t.regex(batches[0][1], /^[0-9a-f]{32}$/); @@ -108,31 +125,31 @@ test("flushes partial and full batches", async (t) => { await send("msg1"); await sendBatch("msg2", "msg3"); await send("msg4"); - timers.timestamp += 100; - await timers.waitForTasks(); + await object.advanceFakeTime(100); + await object.waitForFakeTasks(); t.is(batches.length, 0); - timers.timestamp += 900; - await timers.waitForTasks(); + await object.advanceFakeTime(900); + await object.waitForFakeTasks(); t.is(batches[0]?.length, 4); batches = []; // Check with full batch await sendBatch("msg1", "msg2", "msg3", "msg4", "msg5"); - await timers.waitForTasks(); + await object.waitForFakeTasks(); t.is(batches.length, 1); t.is(batches[0]?.length, 5); batches = []; // Check with overflowing batch await sendBatch("msg1", "msg2", "msg3", "msg4", "msg5", "msg6", "msg7"); - await timers.waitForTasks(); + await object.waitForFakeTasks(); t.is(batches.length, 1); // (second batch isn't full, so need to wait for max batch timeout) - timers.timestamp += 500; - await timers.waitForTasks(); + await object.advanceFakeTime(500); + await object.waitForFakeTasks(); t.is(batches.length, 1); - timers.timestamp += 500; - await timers.waitForTasks(); + await object.advanceFakeTime(500); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.is(batches[0]?.length, 5); t.is(batches[1]?.length, 2); @@ -140,15 +157,15 @@ test("flushes partial and full batches", async (t) => { // Check with overflowing batch twice await sendBatch("msg1", "msg2", "msg3", "msg4", "msg5", "msg6"); - await timers.waitForTasks(); + await object.waitForFakeTasks(); t.is(batches.length, 1); // (second batch isn't full yet, but sending more messages will fill it) await sendBatch("msg7", "msg8", "msg9", "msg10", "msg11"); - await timers.waitForTasks(); + await object.waitForFakeTasks(); t.is(batches.length, 2); // (third batch isn't full, so need to wait for max batch timeout) - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 3); t.is(batches[0]?.length, 5); t.is(batches[1]?.length, 5); @@ -157,12 +174,9 @@ test("flushes partial and full batches", async (t) => { }); test("sends all structured cloneable types", async (t) => { - const timers = new TestTimers(); - const errorPromise = new DeferredPromise(); const mf = new Miniflare({ - timers, verbose: true, queueProducers: ["QUEUE"], @@ -258,10 +272,11 @@ test("sends all structured cloneable types", async (t) => { ], }); t.teardown(() => mf.dispose()); + const object = await getControlStub(mf, "QUEUE"); await mf.dispatchFetch("http://localhost"); - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(await errorPromise, "undefined"); }); @@ -286,10 +301,8 @@ test("retries messages", async (t) => { let retryMessages: string[] = []; const log = new TestLog(t); - const timers = new TestTimers(); const mf = new Miniflare({ log, - timers, queueProducers: { QUEUE: "queue" }, queueConsumers: { @@ -338,12 +351,14 @@ test("retries messages", async (t) => { }); } + const object = await getControlStub(mf, "queue"); + // Check with explicit single retry retryMessages = ["msg2"]; await sendBatch("msg1", "msg2", "msg3"); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 1); t.deepEqual(stripTimings(log.logs), [ [ @@ -354,15 +369,15 @@ test("retries messages", async (t) => { ]); log.logs = []; retryMessages = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(stripTimings(log.logs), [ [LogLevel.INFO, "QUEUE queue 1/1 (Xms)"], ]); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(bodies(), [["msg1", "msg2", "msg3"], ["msg2"]]); batches = []; @@ -371,8 +386,8 @@ test("retries messages", async (t) => { retryAll = true; await sendBatch("msg1", "msg2", "msg3"); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 1); t.deepEqual(stripTimings(log.logs), [ [ @@ -391,8 +406,8 @@ test("retries messages", async (t) => { ]); log.logs = []; retryAll = false; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(stripTimings(log.logs), [ [LogLevel.INFO, "QUEUE queue 3/3 (Xms)"], @@ -407,8 +422,8 @@ test("retries messages", async (t) => { errorAll = true; await sendBatch("msg1", "msg2", "msg3"); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 1); t.deepEqual(stripTimings(log.logs), [ [ @@ -427,8 +442,8 @@ test("retries messages", async (t) => { ]); log.logs = []; errorAll = false; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(stripTimings(log.logs), [ [LogLevel.INFO, "QUEUE queue 3/3 (Xms)"], @@ -443,8 +458,8 @@ test("retries messages", async (t) => { retryAll = true; await sendBatch("msg1", "msg2", "msg3"); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 1); t.deepEqual(stripTimings(log.logs), [ [ @@ -464,8 +479,8 @@ test("retries messages", async (t) => { log.logs = []; retryAll = false; retryMessages = ["msg3"]; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(stripTimings(log.logs), [ [ @@ -475,8 +490,8 @@ test("retries messages", async (t) => { [LogLevel.INFO, "QUEUE queue 2/3 (Xms)"], ]); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 3); t.deepEqual(stripTimings(log.logs), [ [ @@ -486,8 +501,8 @@ test("retries messages", async (t) => { [LogLevel.INFO, "QUEUE queue 0/1 (Xms)"], ]); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); t.is(batches.length, 3); t.deepEqual(bodies(), [ ["msg1", "msg2", "msg3"], @@ -502,10 +517,8 @@ test("moves to dead letter queue", async (t) => { let retryMessages: string[] = []; const log = new TestLog(t); - const timers = new TestTimers(); const mf = new Miniflare({ log, - timers, verbose: true, queueProducers: { BAD_QUEUE: "bad" }, @@ -560,12 +573,15 @@ test("moves to dead letter queue", async (t) => { }); } + const badObject = await getControlStub(mf, "bad"); + const dlqObject = await getControlStub(mf, "dlq"); + // Check moves messages to dead letter queue after max retries retryMessages = ["msg2", "msg3"]; await sendBatch("msg1", "msg2", "msg3"); log.logs = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await badObject.advanceFakeTime(1000); + await badObject.waitForFakeTasks(); t.is(batches.length, 1); t.deepEqual(stripTimings(log.logs), [ [ @@ -581,8 +597,8 @@ test("moves to dead letter queue", async (t) => { log.logs = []; // Check allows cyclic dead letter queue path with multiple queues retryMessages = ["msg2"]; - timers.timestamp += 1000; - await timers.waitForTasks(); + await dlqObject.advanceFakeTime(1000); + await dlqObject.waitForFakeTasks(); t.is(batches.length, 2); t.deepEqual(stripTimings(log.logs), [ [ @@ -593,8 +609,8 @@ test("moves to dead letter queue", async (t) => { ]); log.logs = []; retryMessages = []; - timers.timestamp += 1000; - await timers.waitForTasks(); + await badObject.advanceFakeTime(1000); + await badObject.waitForFakeTasks(); t.is(batches.length, 3); t.deepEqual(stripTimings(log.logs), [[LogLevel.INFO, "QUEUE bad 1/1 (Xms)"]]); log.logs = []; @@ -614,7 +630,6 @@ test("moves to dead letter queue", async (t) => { // Check rejects queue as own dead letter queue const promise = mf.setOptions({ log, - timers, queueConsumers: { bad: { deadLetterQueue: "bad" } }, script: "", }); @@ -627,10 +642,8 @@ test("moves to dead letter queue", async (t) => { test("operations permit strange queue names", async (t) => { const promise = new DeferredPromise>(); - const timers = new TestTimers(); const id = "my/ Queue"; const mf = new Miniflare({ - timers, verbose: true, queueProducers: { QUEUE: id }, queueConsumers: [id], @@ -656,10 +669,11 @@ test("operations permit strange queue names", async (t) => { }`, }); t.teardown(() => mf.dispose()); + const object = await getControlStub(mf, id); await mf.dispatchFetch("http://localhost"); - timers.timestamp += 1000; - await timers.waitForTasks(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); const batch = await promise; t.deepEqual(batch, [ { queue: id, id: batch[0].id, body: "msg1" }, @@ -674,12 +688,10 @@ test("supports message contentTypes", async (t) => { const promise = new DeferredPromise< z.infer >(); - const timers = new TestTimers(); const id = "my/ Queue"; const log = new TestLog(t); const mf = new Miniflare({ log, - timers, verbose: true, queueProducers: { QUEUE: id }, queueConsumers: [id], @@ -693,46 +705,47 @@ test("supports message contentTypes", async (t) => { }, modules: true, script: `export default { - async fetch(request, env, ctx) { - await env.QUEUE.send("msg1", { contentType: "text" }); - await env.QUEUE.send([{ message: "msg2" }], { contentType: "json" }); - const arrayBuffer = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); - await env.QUEUE.send(arrayBuffer, { contentType: "bytes" }); - await env.QUEUE.send(new Date(1600000000000), { contentType: "v8" }); - return new Response(); - }, - async queue(batch, env, ctx) { - delete Date.prototype.toJSON; // JSON.stringify calls .toJSON before the replacer - await env.REPORTER.fetch("http://localhost", { - method: "POST", - body: JSON.stringify( - batch.messages.map(({ id, body }) => ({ - queue: batch.queue, - id, - body, - })), - (_, value) => { - if (value instanceof ArrayBuffer) { - return { - $type: "ArrayBuffer", - value: Array.from(new Uint8Array(value)), - }; - } else if (value instanceof Date) { - return { $type: "Date", value: value.getTime() }; - } - return value; - }, - ), - }); - }, -};`, + async fetch(request, env, ctx) { + await env.QUEUE.send("msg1", { contentType: "text" }); + await env.QUEUE.send([{ message: "msg2" }], { contentType: "json" }); + const arrayBuffer = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]); + await env.QUEUE.send(arrayBuffer, { contentType: "bytes" }); + await env.QUEUE.send(new Date(1600000000000), { contentType: "v8" }); + return new Response(); + }, + async queue(batch, env, ctx) { + delete Date.prototype.toJSON; // JSON.stringify calls .toJSON before the replacer + await env.REPORTER.fetch("http://localhost", { + method: "POST", + body: JSON.stringify( + batch.messages.map(({ id, body }) => ({ + queue: batch.queue, + id, + body, + })), + (_, value) => { + if (value instanceof ArrayBuffer) { + return { + $type: "ArrayBuffer", + value: Array.from(new Uint8Array(value)), + }; + } else if (value instanceof Date) { + return { $type: "Date", value: value.getTime() }; + } + return value; + }, + ), + }); + }, + };`, }); t.teardown(() => mf.dispose()); + const object = await getControlStub(mf, id); const res = await mf.dispatchFetch("http://localhost"); - await res.arrayBuffer(); // (drain) - timers.timestamp += 1000; - await timers.waitForTasks(); + await res.arrayBuffer(); + await object.advanceFakeTime(1000); + await object.waitForFakeTasks(); const batch = await promise; t.deepEqual(batch, [ { queue: id, id: batch[0].id, body: "msg1" }, @@ -749,3 +762,36 @@ test("supports message contentTypes", async (t) => { }, ]); }); + +test("validates message size", async (t) => { + const mf = new Miniflare({ + verbose: true, + queueProducers: ["QUEUE"], + modules: true, + script: `export default { + async fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + try { + await env.QUEUE.send(new Uint8Array(128 * 1000 + 1), { contentType: "bytes" }); + return new Response(null, { status: 204 }); + } catch (e) { + const error = { + name: e?.name, + message: e?.message ?? String(e), + stack: e?.stack, + }; + return Response.json(error, { + status: 500, + headers: { "MF-Experimental-Error-Stack": "true" }, + }); + } + }, + }`, + }); + t.teardown(() => mf.dispose()); + + await t.throwsAsync(mf.dispatchFetch("http://localhost"), { + message: + "Queue send failed: message length of 128001 bytes exceeds limit of 128000", + }); +});