[Miniflare 3] Add support for routing to multiple Workers #520

Merged
merged 9 commits into from
Mar 11, 2023
50 changes: 45 additions & 5 deletions packages/tre/src/index.ts
@@ -35,6 +35,7 @@ import {
Plugins,
SERVICE_ENTRY,
SOCKET_ENTRY,
getGlobalServices,
maybeGetSitesManifestModule,
normaliseDurableObject,
} from "./plugins";
@@ -100,6 +101,9 @@ function validateOptions(
const sharedOpts = opts;
const multipleWorkers = "workers" in opts;
const workerOpts = multipleWorkers ? opts.workers : [opts];
if (workerOpts.length === 0) {
throw new MiniflareCoreError("ERR_NO_WORKERS", "No workers defined");
}

// Initialise return values
const pluginSharedOpts = {} as PluginSharedOptions;
@@ -119,6 +123,19 @@ function validateOptions(
}
Contributor

The // @ts-expect-errors appear to be because PluginSharedOptions has readonly keys, which can be fixed (and the errors removed) by removing the as const in packages/tre/src/plugins/index.ts. Would it be possible to make that change, or would it break something else? If it's not possible, could the // @ts-expect-error comments be updated?

Contributor Author

Hmmm, I think removing the as const should be ok. It looks like that still keeps the specific Zod types so options inference/completions still work. Will make that change. 👍

Contributor Author

839b02c (unfortunately the change to enforce code means CoreOptionsSchema has required options that aren't satisfied by the other options types, so we still need some // @ts-expect-errors)
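
The readonly-keys problem discussed in this thread can be reproduced in isolation. The sketch below is an illustration only: the two-entry registry objects and all type names are hypothetical stand-ins, not the real contents of packages/tre/src/plugins/index.ts. It relies on the fact that a mapped type over `keyof typeof X` copies the `readonly` modifier that `as const` puts on each key.

```typescript
// Hypothetical two-plugin registries (not the real plugin objects)
const PLUGINS_CONST = { core: { id: 1 }, cache: { id: 2 } } as const;
const PLUGINS_PLAIN = { core: { id: 1 }, cache: { id: 2 } };

// Homomorphic mapped types copy the `readonly` modifier from their source type
type SharedFromConst = { [K in keyof typeof PLUGINS_CONST]: string };
type SharedFromPlain = { [K in keyof typeof PLUGINS_PLAIN]: string };

const fromConst = {} as SharedFromConst;
const fromPlain = {} as SharedFromPlain;

// @ts-expect-error keys inherited from the `as const` object are read-only
fromConst.core = "shared core options";
// No suppression is needed once `as const` is removed
fromPlain.core = "shared core options";
```

`as const` has no runtime effect, so both assignments succeed when executed; only the type checker treats them differently.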

}

// Validate names unique
const names = new Set<string>();
for (const opts of pluginWorkerOpts) {
const name = opts.core.name ?? "";
if (names.has(name)) {
throw new MiniflareCoreError(
"ERR_DUPLICATE_NAME",
`Multiple workers defined with the same name: "${name}"`
);
Contributor

This should probably have specific handling for multiple unnamed workers, since this error message won't be very helpful with an empty string
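
One way the suggested handling could look is sketched below. This is an assumption about the shape of the fix, not the change that was actually committed: `WorkerNameOptions`, `validateUniqueNames` and the wording of the unnamed-worker message are hypothetical, and a plain `Error` stands in for `MiniflareCoreError`.

```typescript
// Hypothetical stand-in for the per-worker core options in the diff
interface WorkerNameOptions {
  name?: string;
}

// Throws on duplicate names, with a dedicated message for unnamed workers
function validateUniqueNames(allOpts: WorkerNameOptions[]): void {
  const names = new Set<string>();
  for (const opts of allOpts) {
    // Unnamed workers share the empty-string key, as in the diff
    const name = opts.name ?? "";
    if (names.has(name)) {
      throw new Error(
        name === ""
          ? "Multiple workers defined without a `name`"
          : `Multiple workers defined with the same name: "${name}"`
      );
    }
    names.add(name);
  }
}
```

A single unnamed worker still passes, which keeps the single-worker case working without a `name`.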

}
names.add(name);
}

return [pluginSharedOpts, pluginWorkerOpts];
}

@@ -150,6 +167,19 @@ function getDurableObjectClassNames(
return serviceClassNames;
}

// Collects all routes from all worker services
function getWorkerRoutes(
allWorkerOpts: PluginWorkerOptions[]
): Map<string, string[]> {
const allRoutes = new Map<string, string[]>();
for (const workerOpts of allWorkerOpts) {
if (workerOpts.core.routes !== undefined) {
allRoutes.set(workerOpts.core.name ?? "", workerOpts.core.routes);
Contributor

If there are multiple unnamed workers, will this overwrite some routes?

Contributor

I wonder if worker names should be required

Contributor Author

It would, but we've validated that names are unique beforehand. Perhaps an assertion that allRoutes doesn't contain the key would be a good idea?

> I wonder if worker names should be required

In the single worker case, I don't think they should be. I do agree with you later on, though, that some of the error messages should be better in this case.
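
The assertion floated in this reply might look roughly like the sketch below. `RoutableWorkerOptions` and `collectWorkerRoutes` are hypothetical names, and an explicit `throw` is used instead of Node's `assert` only to keep the snippet self-contained.

```typescript
// Hypothetical stand-in for the `core` part of PluginWorkerOptions
interface RoutableWorkerOptions {
  name?: string;
  routes?: string[];
}

// Collects routes per worker name, refusing to silently overwrite an entry
function collectWorkerRoutes(
  allOpts: RoutableWorkerOptions[]
): Map<string, string[]> {
  const allRoutes = new Map<string, string[]>();
  for (const opts of allOpts) {
    if (opts.routes === undefined) continue;
    const name = opts.name ?? "";
    // Names are expected to be validated as unique before this point
    if (allRoutes.has(name)) {
      throw new Error(`Routes already registered for worker "${name}"`);
    }
    allRoutes.set(name, opts.routes);
  }
  return allRoutes;
}
```

The check is redundant when name validation has already run, so it acts purely as a guard against the two functions drifting apart.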

}
}
return allRoutes;
}

// ===== `Miniflare` Internal Storage & Routing =====
type OptionalGatewayFactoryType<
Gateway extends GatewayConstructor<any> | undefined
@@ -622,7 +652,16 @@ export class Miniflare {

sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);

const services: Service[] = [];
const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);
const allWorkerRoutes = getWorkerRoutes(allWorkerOpts);

const services: Service[] = getGlobalServices({
optionsVersion,
sharedOptions: sharedOpts.core,
allWorkerRoutes,
fallbackWorkerName: this.#workerOpts[0].core.name,
loopbackPort,
});
const sockets: Socket[] = [
{
name: SOCKET_ENTRY,
@@ -633,10 +672,13 @@ export class Miniflare {
},
];

const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);

// Dedupe services by name
const serviceNames = new Set<string>();
Contributor

serviceNames seems to be used only for de-duplicating services. What about making services a Map<string, Service>, getting rid of serviceNames, and then returning services.values()? It feels like serviceNames and services could easily get out of sync as this code evolves: if a source of services other than the global and plugin-specific ones is added, it would be easy to forget the additional de-duping code.

Contributor Author

That's a much better idea 😃 , will make that switch 👍
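
A minimal sketch of the Map-based approach agreed on here, under stated assumptions: `NamedService` and `dedupeServices` are hypothetical, service names are treated as always present, and plugin services keep first-wins semantics. The change that actually landed may differ.

```typescript
// Hypothetical stand-in for the runtime `Service` type, with `name` required
interface NamedService {
  name: string;
}

// Keeps a single Map as the source of truth for both storage and de-duping
function dedupeServices(
  globalServices: NamedService[],
  pluginServices: NamedService[]
): NamedService[] {
  const services = new Map<string, NamedService>();
  for (const service of globalServices) {
    // Global services should all have unique names
    if (services.has(service.name)) {
      throw new Error(`Duplicate global service: "${service.name}"`);
    }
    services.set(service.name, service);
  }
  for (const service of pluginServices) {
    // The first definition of a plugin service wins
    if (!services.has(service.name)) services.set(service.name, service);
  }
  return Array.from(services.values());
}
```

Because a `Map` preserves insertion order, the returned array lists global services first, followed by plugin services in the order they were first seen.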

for (const service of services) {
// Global services should all have unique names
assert(service.name !== undefined && !serviceNames.has(service.name));
serviceNames.add(service.name);
}

for (let i = 0; i < allWorkerOpts.length; i++) {
const workerOpts = allWorkerOpts[i];
@@ -662,13 +704,11 @@ export class Miniflare {
const pluginServices = await plugin.getServices({
log: this.#log,
options: workerOpts[key],
optionsVersion,
sharedOptions: sharedOpts[key],
workerBindings,
workerIndex: i,
durableObjectClassNames,
additionalModules,
loopbackPort,
tmpPath: this.#tmpPath,
});
if (pluginServices !== undefined) {
9 changes: 2 additions & 7 deletions packages/tre/src/plugins/cache/index.ts
@@ -1,6 +1,4 @@
import { z } from "zod";
import { Worker_Binding } from "../../runtime";
import { SERVICE_LOOPBACK } from "../core";
import {
BINDING_SERVICE_LOOPBACK,
BINDING_TEXT_PERSIST,
@@ -9,6 +7,7 @@ import {
HEADER_PERSIST,
PersistenceSchema,
Plugin,
WORKER_BINDING_SERVICE_LOOPBACK,
encodePersist,
} from "../shared";
import { HEADER_CACHE_WARN_USAGE } from "./constants";
@@ -69,10 +68,6 @@ export const CACHE_PLUGIN: Plugin<
},
getServices({ sharedOptions, options, workerIndex }) {
const persistBinding = encodePersist(sharedOptions.cachePersist);
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};
return [
{
name: getCacheServiceName(workerIndex),
@@ -87,7 +82,7 @@ export const CACHE_PLUGIN: Plugin<
name: BINDING_JSON_CACHE_WARN_USAGE,
json: JSON.stringify(options.cacheWarnUsage ?? false),
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
compatibilityDate: "2022-09-01",
Contributor

In the spirit of extracting shared options, the compatibility date could probably also be extracted

},
160 changes: 98 additions & 62 deletions packages/tre/src/plugins/core/index.ts
@@ -14,9 +14,14 @@ import { getCacheServiceName } from "../cache";
import { DURABLE_OBJECTS_STORAGE_SERVICE_NAME } from "../do";
import {
BINDING_SERVICE_LOOPBACK,
CORE_PLUGIN_NAME,
Contributor

Why the move of CORE_PLUGIN_NAME out of core?

Contributor Author

This was so we could use it in SERVICE_LOOPBACK without a cyclic dependency on core:

export const SERVICE_LOOPBACK = `${CORE_PLUGIN_NAME}:loopback`;

Though I think moving global services to shared makes a lot more sense. Then we could just rename this loopback or something.

CloudflareFetchSchema,
HEADER_CF_BLOB,
Plugin,
SERVICE_LOOPBACK,
WORKER_BINDING_SERVICE_LOOPBACK,
matchRoutes,
parseRoutes,
} from "../shared";
import { HEADER_ERROR_STACK } from "./errors";
import {
@@ -50,6 +55,8 @@ export const CoreOptionsSchema = z.object({
compatibilityDate: z.string().optional(),
compatibilityFlags: z.string().array().optional(),

routes: z.string().array().optional(),

bindings: z.record(JsonSchema).optional(),
wasmBindings: z.record(z.string()).optional(),
textBlobBindings: z.record(z.string()).optional(),
@@ -74,10 +81,6 @@ export const CoreSharedOptionsSchema = z.object({
liveReload: z.boolean().optional(),
});

export const CORE_PLUGIN_NAME = "core";

// Service looping back to Miniflare's Node.js process (for storage, etc)
export const SERVICE_LOOPBACK = `${CORE_PLUGIN_NAME}:loopback`;
// Service for HTTP socket entrypoint (for checking runtime ready, routing, etc)
export const SERVICE_ENTRY = `${CORE_PLUGIN_NAME}:entry`;
// Service prefix for all regular user workers
@@ -102,9 +105,11 @@ export const HEADER_CUSTOM_SERVICE = "MF-Custom-Service";
export const HEADER_ORIGINAL_URL = "MF-Original-URL";

const BINDING_JSON_VERSION = "MINIFLARE_VERSION";
const BINDING_SERVICE_USER = "MINIFLARE_USER";
const BINDING_SERVICE_USER_ROUTE_PREFIX = "MINIFLARE_USER_ROUTE_";
const BINDING_SERVICE_USER_FALLBACK = "MINIFLARE_USER_FALLBACK";
const BINDING_TEXT_CUSTOM_SERVICE = "MINIFLARE_CUSTOM_SERVICE";
const BINDING_JSON_CF_BLOB = "CF_BLOB";
const BINDING_JSON_ROUTES = "MINIFLARE_ROUTES";
const BINDING_DATA_LIVE_RELOAD_SCRIPT = "MINIFLARE_LIVE_RELOAD_SCRIPT";

const LIVE_RELOAD_SCRIPT_TEMPLATE = (
@@ -129,7 +134,10 @@ const LIVE_RELOAD_SCRIPT_TEMPLATE = (

// Using `>=` for version check to handle multiple `setOptions` calls before
// reload complete.
export const SCRIPT_ENTRY = `async function handleEvent(event) {
export const SCRIPT_ENTRY = `
const matchRoutes = ${matchRoutes.toString()};

async function handleEvent(event) {
const probe = event.request.headers.get("${HEADER_PROBE}");
if (probe !== null) {
const probeMin = parseInt(probe);
@@ -147,11 +155,16 @@ export const SCRIPT_ENTRY = `async function handleEvent(event) {
});
request.headers.delete("${HEADER_ORIGINAL_URL}");

if (globalThis.${BINDING_SERVICE_USER} === undefined) {
let service = globalThis.${BINDING_SERVICE_USER_FALLBACK};
const url = new URL(request.url);
const route = matchRoutes(${BINDING_JSON_ROUTES}, url);
if (route !== null) service = globalThis["${BINDING_SERVICE_USER_ROUTE_PREFIX}" + route];
if (service === undefined) {
return new Response("No entrypoint worker found", { status: 404 });
}

try {
let response = await ${BINDING_SERVICE_USER}.fetch(request);
let response = await service.fetch(request);

if (
response.status === 500 &&
@@ -319,60 +332,12 @@ export const CORE_PLUGIN: Plugin<
async getServices({
log,
options,
optionsVersion,
workerBindings,
workerIndex,
sharedOptions,
durableObjectClassNames,
additionalModules,
loopbackPort,
}) {
// Define core/shared services.
const loopbackBinding: Worker_Binding = {
name: BINDING_SERVICE_LOOPBACK,
service: { name: SERVICE_LOOPBACK },
};

// Services get de-duped by name, so only the first worker's
// SERVICE_LOOPBACK and SERVICE_ENTRY will be used
const serviceEntryBindings: Worker_Binding[] = [
loopbackBinding, // For converting stack-traces to pretty-error pages
{ name: BINDING_JSON_VERSION, json: optionsVersion.toString() },
{ name: BINDING_JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) },
];
if (sharedOptions.liveReload) {
const liveReloadScript = LIVE_RELOAD_SCRIPT_TEMPLATE(loopbackPort);
serviceEntryBindings.push({
name: BINDING_DATA_LIVE_RELOAD_SCRIPT,
data: encoder.encode(liveReloadScript),
});
}
const services: Service[] = [
{
name: SERVICE_LOOPBACK,
external: { http: { cfBlobHeader: HEADER_CF_BLOB } },
},
{
name: SERVICE_ENTRY,
worker: {
serviceWorkerScript: SCRIPT_ENTRY,
compatibilityDate: "2022-09-01",
bindings: serviceEntryBindings,
},
},
// Allow access to private/public addresses:
// https://github.com/cloudflare/miniflare/issues/412
{
name: "internet",
network: {
// Can't use `["public", "private"]` here because of
// https://github.com/cloudflare/workerd/issues/62
allow: ["0.0.0.0/0"],
deny: [],
tlsOptions: { trustBrowserCas: true },
},
},
];
const services: Service[] = [];

// Define regular user worker if script is set
const workerScript = getWorkerScript(options, workerIndex);
@@ -412,10 +377,13 @@ export const CORE_PLUGIN: Plugin<
cacheApiOutbound: { name: getCacheServiceName(workerIndex) },
},
});
serviceEntryBindings.push({
name: BINDING_SERVICE_USER,
service: { name },
});
} else if (workerIndex === 0 || options.routes?.length) {
throw new MiniflareCoreError(
"ERR_ROUTABLE_NO_SCRIPT",
`Worker [${workerIndex}] ${
options.name === undefined ? "" : `("${options.name}") `
}must have code defined as it's routable or the fallback`
);
Contributor

Why can any workers have no code? What does it mean for a worker to not have code defined?

Contributor Author

Hmmm, I suppose that doesn't really make sense, since we don't expose workerd's other service types (disk, network, etc) outside of serviceBindings. Will make code required for all Workers. 👍

}

// Define custom `fetch` services if set
@@ -433,7 +401,7 @@ export const CORE_PLUGIN: Plugin<
name: BINDING_TEXT_CUSTOM_SERVICE,
text: `${workerIndex}/${name}`,
},
loopbackBinding,
WORKER_BINDING_SERVICE_LOOPBACK,
],
},
});
@@ -451,6 +419,74 @@ export const CORE_PLUGIN: Plugin<
},
};

export interface GlobalServicesOptions {
optionsVersion: number;
sharedOptions: z.infer<typeof CoreSharedOptionsSchema>;
allWorkerRoutes: Map<string, string[]>;
fallbackWorkerName: string | undefined;
loopbackPort: number;
}
export function getGlobalServices({
Contributor

This feels like it probably shouldn't be in the core plugin?

Contributor Author

Hmmm, maybe. That might help with the moving CORE_PLUGIN_NAME issue you raised later on too. Will move it out into plugins/shared/services.ts or something. 👍

optionsVersion,
sharedOptions,
allWorkerRoutes,
fallbackWorkerName,
loopbackPort,
}: GlobalServicesOptions): Service[] {
// Collect list of workers we could route to, then parse and sort all routes
const routableWorkers = [...allWorkerRoutes.keys()];
const routes = parseRoutes(allWorkerRoutes);

// Define core/shared services.
const serviceEntryBindings: Worker_Binding[] = [
WORKER_BINDING_SERVICE_LOOPBACK, // For converting stack-traces to pretty-error pages
{ name: BINDING_JSON_VERSION, json: optionsVersion.toString() },
{ name: BINDING_JSON_ROUTES, json: JSON.stringify(routes) },
{ name: BINDING_JSON_CF_BLOB, json: JSON.stringify(sharedOptions.cf) },
{
name: BINDING_SERVICE_USER_FALLBACK,
service: { name: getUserServiceName(fallbackWorkerName) },
},
...routableWorkers.map((name) => ({
name: BINDING_SERVICE_USER_ROUTE_PREFIX + name,
service: { name: getUserServiceName(name) },
})),
];
if (sharedOptions.liveReload) {
const liveReloadScript = LIVE_RELOAD_SCRIPT_TEMPLATE(loopbackPort);
serviceEntryBindings.push({
name: BINDING_DATA_LIVE_RELOAD_SCRIPT,
data: encoder.encode(liveReloadScript),
});
}
return [
{
name: SERVICE_LOOPBACK,
external: { http: { cfBlobHeader: HEADER_CF_BLOB } },
},
{
name: SERVICE_ENTRY,
worker: {
serviceWorkerScript: SCRIPT_ENTRY,
compatibilityDate: "2022-09-01",
bindings: serviceEntryBindings,
},
},
// Allow access to private/public addresses:
// https://github.com/cloudflare/miniflare/issues/412
{
name: "internet",
network: {
// Can't use `["public", "private"]` here because of
// https://github.com/cloudflare/workerd/issues/62
allow: ["0.0.0.0/0"],
deny: [],
tlsOptions: { trustBrowserCas: true },
},
},
];
}

function getWorkerScript(
options: z.infer<typeof CoreOptionsSchema>,
workerIndex: number