Skip to content

Commit

Permalink
MQ-605: Extend miniflare's queues simulator to support delayed delive…
Browse files Browse the repository at this point in the history
…ry (#5570)

* MQ-605: Extend miniflare's queues simulator to support delayed delivery

This patch updates the miniflare's queue broker to support delayed
delivery of messages, both when sending the message from a producer and
when retrying the message from a consumer.
The implementation is based on miniflare's timers, and currently there
is no support for specifying the delivery delay in queue settings for a
producer.

* MQ-605: Add support for inheriting the delivery delay from the queues binding

* fix e2e tests

* chore: tighten turbo inputs to wrangler build to improve cache hits

---------

Co-authored-by: Peter Bacon Darwin <pbacondarwin@cloudflare.com>
  • Loading branch information
sesteves and petebacondarwin authored May 8, 2024
1 parent 0ce042f commit 66bdad0
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 37 deletions.
8 changes: 8 additions & 0 deletions .changeset/serious-baboons-look.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"miniflare": minor
"wrangler": minor
---

feature: support delayed delivery in the miniflare's queue simulator.

This change updates the miniflare's queue broker to support delayed delivery of messages, both when sending the message from a producer and when retrying the message from a consumer.
4 changes: 4 additions & 0 deletions packages/miniflare/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ details on the multipart implementation.
- **`POST /message`:**
- Optional `X-Msg-Fmt` request header is one of "text", "json", "bytes", or
"v8" (defaults to "v8"), and instructs how to interpret the body
- Optional `X-Msg-Delay-Sec` request header sets the number of seconds to
delay the delivery of this message (value between `0` and `42300` inclusive)
- Request body is encoded message body
- 200 response: empty body if enqueued
- 413 response: message too large
Expand All @@ -311,6 +313,8 @@ details on the multipart implementation.
- `CF-Queue-Largest-Msg` request header is size in bytes of largest message in
batch
- `CF-Queue-Batch-Bytes` request header is size in bytes of entire batch
- Optional `X-Msg-Delay-Sec` request header sets the number of seconds to
delay the delivery of this batch (value between `0` and `42300` inclusive)
- Request body is JSON-encoded `{ messages: QueueIncomingMessage[] }`
- 200 response: empty body if all messages enqueued
- 413 response: batch or individual message too large
Expand Down
22 changes: 19 additions & 3 deletions packages/miniflare/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ level will be logged.
Creates a new logger that logs nothing to the `console`, and always `throw`s
`message`s logged at the `ERROR` level.

### `interface QueueProducerOptions`

- `queueName: string`

The name of the queue where messages will be sent by the producer.

- `deliveryDelay?: number`

Default number of seconds to delay the delivery of messages to consumers.
Value between `0` (no delay) and `42300` (12 hours).

### `interface QueueConsumerOptions`

- `maxBatchSize?: number`
Expand All @@ -192,6 +203,11 @@ level will be logged.
Name of another Queue to send a message on if it fails processing after
`maxRetries`. If this isn't specified, failed messages will be discarded.

- `retryDelay?: number`

Number of seconds to delay the (re-)delivery of messages by default. Value
between `0` (no delay) and `42300` (12 hours).

### `interface WorkerOptions`

Options for an individual Worker/"nanoservice". All bindings are accessible on
Expand Down Expand Up @@ -534,12 +550,12 @@ parameter in module format Workers.

#### Queues

- `queueProducers?: Record<string, string> | string[]`
- `queueProducers?: Record<string, QueueProducerOptions> | string[]`

Record mapping binding name to queue names to inject as `WorkerQueue` bindings
Record mapping binding name to queue options to inject as `WorkerQueue` bindings
into this Worker. Different Workers may bind to the same queue name with
different binding names. If a `string[]` of binding names is specified, the
binding name and queue name are assumed to be the same.
binding name and queue name (part of the queue options) are assumed to be the same.

- `queueConsumers?: Record<string, QueueConsumerOptions> | string[]`

Expand Down
25 changes: 25 additions & 0 deletions packages/miniflare/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
ProxyClient,
QUEUES_PLUGIN_NAME,
QueueConsumers,
QueueProducers,
QueuesError,
R2_PLUGIN_NAME,
ReplaceWorkersTypes,
Expand Down Expand Up @@ -388,6 +389,28 @@ function getWrappedBindingNames(
return wrappedBindingWorkerNames;
}

function getQueueProducers(allWorkerOpts: PluginWorkerOptions[]): QueueProducers {
const queueProducers: QueueProducers = new Map();
for (const workerOpts of allWorkerOpts) {
const workerName = workerOpts.core.name ?? "";
let workerProducers = workerOpts.queues.queueProducers;

if (workerProducers !== undefined) {
// De-sugar array consumer options to record mapping to empty options
if (Array.isArray(workerProducers)) {
workerProducers = Object.fromEntries(
workerProducers.map((bindingName) => [bindingName, { queueName: bindingName }])
);
}

for (const [bindingName, opts] of Object.entries(workerProducers)) {
queueProducers.set(bindingName, { workerName, ...opts });
}
}
}
return queueProducers;
}

function getQueueConsumers(
allWorkerOpts: PluginWorkerOptions[]
): QueueConsumers {
Expand Down Expand Up @@ -1015,6 +1038,7 @@ export class Miniflare {
allWorkerOpts,
durableObjectClassNames
);
const queueProducers = getQueueProducers(allWorkerOpts);
const queueConsumers = getQueueConsumers(allWorkerOpts);
const allWorkerRoutes = getWorkerRoutes(allWorkerOpts, wrappedBindingNames);
const workerNames = [...allWorkerRoutes.keys()];
Expand Down Expand Up @@ -1137,6 +1161,7 @@ export class Miniflare {
wrappedBindingNames,
durableObjectClassNames,
unsafeEphemeralDurableObjects,
queueProducers,
queueConsumers,
};
for (const [key, plugin] of PLUGIN_ENTRIES) {
Expand Down
42 changes: 35 additions & 7 deletions packages/miniflare/src/plugins/queues/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import {
QueueBindings,
QueueConsumerOptionsSchema,
QueueProducerOptionsSchema,
SharedBindings,
} from "../../workers";
import { getUserServiceName } from "../core";
Expand All @@ -17,14 +18,12 @@ import {
SERVICE_LOOPBACK,
getMiniflareObjectBindings,
kProxyNodeBinding,
namespaceEntries,
namespaceKeys,
objectEntryWorker,
objectEntryWorker
} from "../shared";

export const QueuesOptionsSchema = z.object({
queueProducers: z
.union([z.record(z.string()), z.string().array()])
.union([z.record(QueueProducerOptionsSchema), z.string().array(), z.record(z.string())])
.optional(),
queueConsumers: z
.union([z.record(QueueConsumerOptionsSchema), z.string().array()])
Expand All @@ -42,23 +41,24 @@ const QUEUE_BROKER_OBJECT: Worker_Binding_DurableObjectNamespaceDesignator = {
export const QUEUES_PLUGIN: Plugin<typeof QueuesOptionsSchema> = {
options: QueuesOptionsSchema,
getBindings(options) {
const queues = namespaceEntries(options.queueProducers);
const queues = bindingEntries(options.queueProducers);
return queues.map<Worker_Binding>(([name, id]) => ({
name,
queue: { name: `${SERVICE_QUEUE_PREFIX}:${id}` },
}));
},
getNodeBindings(options) {
const queues = namespaceKeys(options.queueProducers);
const queues = bindingKeys(options.queueProducers);
return Object.fromEntries(queues.map((name) => [name, kProxyNodeBinding]));
},
async getServices({
options,
workerNames,
queueProducers: allQueueProducers,
queueConsumers: allQueueConsumers,
unsafeStickyBlobs,
}) {
const queues = namespaceEntries(options.queueProducers);
const queues = bindingEntries(options.queueProducers);
if (queues.length === 0) return [];

const services = queues.map<Service>(([_, id]) => ({
Expand Down Expand Up @@ -96,6 +96,10 @@ export const QUEUES_PLUGIN: Plugin<typeof QueuesOptionsSchema> = {
className: QUEUE_BROKER_OBJECT_CLASS_NAME,
},
},
{
name: QueueBindings.MAYBE_JSON_QUEUE_PRODUCERS,
json: JSON.stringify(Object.fromEntries(allQueueProducers)),
},
{
name: QueueBindings.MAYBE_JSON_QUEUE_CONSUMERS,
json: JSON.stringify(Object.fromEntries(allQueueConsumers)),
Expand All @@ -113,4 +117,28 @@ export const QUEUES_PLUGIN: Plugin<typeof QueuesOptionsSchema> = {
},
};

function bindingEntries(
namespaces?: Record<string, { queueName: string, deliveryDelay?: number }> | string[] | Record<string, string>
): [bindingName: string, id: string][] {
if (Array.isArray(namespaces)) {
return namespaces.map((bindingName) => [bindingName, bindingName]);
} else if (namespaces !== undefined) {
return Object.entries(namespaces).map(([name, opts]) => [name, typeof opts === 'string' ? opts : opts.queueName]);
} else {
return [];
}
}

function bindingKeys(
namespaces?: Record<string, { queueName: string, deliveryDelay?: number }> | string[] | Record<string, string>
): string[] {
if (Array.isArray(namespaces)) {
return namespaces;
} else if (namespaces !== undefined) {
return Object.keys(namespaces);
} else {
return [];
}
}

export * from "./errors";
6 changes: 5 additions & 1 deletion packages/miniflare/src/plugins/shared/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
OptionalZodTypeOf,
PathSchema,
} from "../../shared";
import { Awaitable, QueueConsumerSchema, sanitisePath } from "../../workers";
import { Awaitable, QueueConsumerSchema, QueueProducerSchema, sanitisePath } from "../../workers";
import { UnsafeUniqueKey } from "./constants";

export const DEFAULT_PERSIST_ROOT = ".mf";
Expand Down Expand Up @@ -45,6 +45,9 @@ export type DurableObjectClassNames = Map<
>
>;

// Maps queue names to producer worker options.
export type QueueProducers = Map<string, z.infer<typeof QueueProducerSchema>>;

// Maps queue names to the Worker that wishes to consume it. Note each queue
// can only be consumed by one Worker, but one Worker may consume multiple
// queues. Support for multiple consumers of a single queue is not planned
Expand All @@ -70,6 +73,7 @@ export interface PluginServicesOptions<
wrappedBindingNames: WrappedBindingNames;
durableObjectClassNames: DurableObjectClassNames;
unsafeEphemeralDurableObjects: boolean;
queueProducers: QueueProducers;
queueConsumers: QueueConsumers;
}

Expand Down
Loading

0 comments on commit 66bdad0

Please sign in to comment.