Skip to content

Commit

Permalink
Add pipeline binding to wrangler.toml (#6674)
Browse files Browse the repository at this point in the history
* Add [[pipelines]] binding in wrangler.

* chore: fix lint and formatting errors

* chore: fix test

* chore: remove only

* chore: update wording

* chore: fix tests

---------

Co-authored-by: Oli Yu <oli@cloudflare.com>
Co-authored-by: Andy Jessop <ajessop@cloudflare.com>
  • Loading branch information
3 people authored Sep 12, 2024
1 parent 7579bd8 commit 831f892
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 2 deletions.
12 changes: 12 additions & 0 deletions .changeset/angry-keys-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"wrangler": minor
---

Added new [[pipelines]] bindings. This creates a new binding that allows sending events to
the specified pipeline.

Example:

[[pipelines]]
binding = "MY_PIPELINE"
pipeline = "my-pipeline"
2 changes: 2 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ packages/create-cloudflare/templates/**/*.*
# but still exclude the worker-configuration.d.ts file, since it's generated
!packages/create-cloudflare/templates/hello-world/**/*.*
packages/create-cloudflare/templates/hello-world/**/worker-configuration.d.ts
# dist-functions are generated in the fixtures/vitest-pool-workers-examples/pages-functions-unit-integration-self folder
dist-functions

vscode.d.ts
vscode.*.d.ts
Expand Down
109 changes: 109 additions & 0 deletions packages/wrangler/src/__tests__/configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ describe("normalizeAndValidateConfig()", () => {
upload_source_maps: undefined,
placement: undefined,
tail_consumers: undefined,
pipelines: [],
});
expect(diagnostics.hasErrors()).toBe(false);
expect(diagnostics.hasWarnings()).toBe(false);
Expand Down Expand Up @@ -3181,6 +3182,114 @@ describe("normalizeAndValidateConfig()", () => {
});
});

describe("[pipelines]", () => {
it("should error if pipelines is an object", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: {} },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got {}."
`);
});

it("should error if pipelines is a string", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: "BAD" },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got \\"BAD\\"."
`);
});

it("should error if pipelines is a number", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: 999 },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got 999."
`);
});

it("should error if pipelines is null", () => {
const { diagnostics } = normalizeAndValidateConfig(
// @ts-expect-error purposely using an invalid value
{ pipelines: null },
undefined,
{ env: undefined }
);

expect(diagnostics.hasWarnings()).toBe(false);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- The field \\"pipelines\\" should be an array but got null."
`);
});

it("should accept valid bindings", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);

expect(diagnostics.hasErrors()).toBe(false);
});

it("should error if pipelines.bindings are not valid", () => {
const { diagnostics } = normalizeAndValidateConfig(
{
pipelines: [
{},
{
binding: "VALID",
pipeline: "343cd4f1d58c42fbb5bd082592fd7143",
},
{ binding: 2000, project: 2111 },
],
} as unknown as RawConfig,
undefined,
{ env: undefined }
);
expect(diagnostics.renderWarnings()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- Unexpected fields found in pipelines[2] field: \\"project\\""
`);
expect(diagnostics.hasWarnings()).toBe(true);
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- \\"pipelines[0]\\" bindings must have a string \\"binding\\" field but got {}.
- \\"pipelines[0]\\" bindings must have a string \\"pipeline\\" field but got {}.
- \\"pipelines[2]\\" bindings must have a string \\"binding\\" field but got {\\"binding\\":2000,\\"project\\":2111}.
- \\"pipelines[2]\\" bindings must have a string \\"pipeline\\" field but got {\\"binding\\":2000,\\"project\\":2111}."
`);
});
});

describe("[unsafe.bindings]", () => {
it("should error if unsafe is an array", () => {
const { diagnostics } = normalizeAndValidateConfig(
Expand Down
37 changes: 37 additions & 0 deletions packages/wrangler/src/__tests__/deploy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10615,6 +10615,43 @@ export default{
});
});

describe("pipelines", () => {
it("should upload pipelines bindings", async () => {
writeWranglerToml({
pipelines: [
{
binding: "MY_PIPELINE",
pipeline: "0123456789ABCDEF0123456789ABCDEF",
},
],
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest({
expectedBindings: [
{
type: "pipelines",
name: "MY_PIPELINE",
id: "0123456789ABCDEF0123456789ABCDEF",
},
],
});

await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"Total Upload: xx KiB / gzip: xx KiB
Worker Startup Time: 100 ms
Your worker has access to the following bindings:
- Pipelines:
- MY_PIPELINE: 0123456789ABCDEF0123456789ABCDEF
Uploaded test-name (TIMINGS)
Deployed test-name triggers (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Current Version ID: Galaxy-Class"
`);
});
});

describe("--keep-vars", () => {
it("should send keepVars when keep-vars is passed in", async () => {
vi.stubEnv("CLOUDFLARE_API_TOKEN", "hunter2");
Expand Down
1 change: 1 addition & 0 deletions packages/wrangler/src/__tests__/type-generation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ const bindingsConfigMock: Omit<
},
{ type: "CompiledWasm", globs: ["**/*.wasm"], fallthrough: true },
],
pipelines: [],
};

describe("generateTypes()", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function createWorkerBundleFormData(
text_blobs: undefined,
data_blobs: undefined,
dispatch_namespaces: undefined,
pipelines: undefined,
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/api/startDevWorker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
CfLogfwdrBinding,
CfModule,
CfMTlsCertificate,
CfPipeline,
CfQueue,
CfR2Bucket,
CfScriptFormat,
Expand Down Expand Up @@ -261,6 +262,7 @@ export type Binding =
| ({ type: "analytics_engine" } & Omit<CfAnalyticsEngineDataset, "binding">)
| ({ type: "dispatch_namespace" } & Omit<CfDispatchNamespace, "binding">)
| ({ type: "mtls_certificate" } & Omit<CfMTlsCertificate, "binding">)
| ({ type: "pipeline" } & Omit<CfPipeline, "binding">)
| ({ type: "logfwdr" } & Omit<CfLogfwdrBinding, "name">)
| { type: `unsafe_${string}` }
| { type: "assets" };
Expand Down
10 changes: 10 additions & 0 deletions packages/wrangler/src/api/startDevWorker/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ export function convertCfWorkerInitBindingstoBindings(
output[info["binding"]] = { type: "assets" };
break;
}
case "pipelines": {
for (const { binding, ...x } of info) {
output[binding] = { type: "pipeline", ...x };
}
break;
}
default: {
assertNever(type);
}
Expand Down Expand Up @@ -282,6 +288,7 @@ export async function convertBindingsToCfWorkerInitBindings(
logfwdr: undefined,
unsafe: undefined,
experimental_assets: undefined,
pipelines: undefined,
};

const fetchers: Record<string, ServiceFetch> = {};
Expand Down Expand Up @@ -354,6 +361,9 @@ export async function convertBindingsToCfWorkerInitBindings(
} else if (binding.type === "mtls_certificate") {
bindings.mtls_certificates ??= [];
bindings.mtls_certificates.push({ ...binding, binding: name });
} else if (binding.type === "pipeline") {
bindings.pipelines ??= [];
bindings.pipelines.push({ ...binding, binding: name });
} else if (binding.type === "logfwdr") {
bindings.logfwdr ??= { bindings: [] };
bindings.logfwdr.bindings.push({ ...binding, name: name });
Expand Down
1 change: 1 addition & 0 deletions packages/wrangler/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,4 +402,5 @@ export const defaultWranglerConfig: Config = {
},
mtls_certificates: [],
tail_consumers: undefined,
pipelines: [],
};
17 changes: 17 additions & 0 deletions packages/wrangler/src/config/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,23 @@ export interface EnvironmentNonInheritable {
/** Details about the outbound Worker which will handle outbound requests from your namespace */
outbound?: DispatchNamespaceOutbound;
}[];

/**
* Specifies list of Pipelines bound to this Worker environment
*
* NOTE: This field is not automatically inherited from the top level environment,
* and so must be specified in every named environment.
*
* @default `[]`
* @nonInheritable
*/
pipelines: {
/** The binding name used to refer to the bound service. */
binding: string;

/** Name of the Pipeline to bind */
pipeline: string;
}[];
}

/**
Expand Down
15 changes: 13 additions & 2 deletions packages/wrangler/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import type { NormalizeAndValidateConfigArgs } from "./validation";

export type {
Config,
RawConfig,
ConfigFields,
DevConfig,
RawConfig,
RawDevConfig,
} from "./config";
export type {
ConfigModuleRuleType,
Environment,
RawEnvironment,
ConfigModuleRuleType,
} from "./environment";

type ReadConfigCommandArgs = NormalizeAndValidateConfigArgs & {
Expand Down Expand Up @@ -232,6 +232,7 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
wasm_modules,
dispatch_namespaces,
mtls_certificates,
pipelines,
} = bindings;

if (data_blobs !== undefined && Object.keys(data_blobs).length > 0) {
Expand Down Expand Up @@ -443,6 +444,16 @@ export function printBindings(bindings: CfWorkerInit["bindings"]) {
});
}

if (pipelines?.length) {
output.push({
type: "Pipelines",
entries: pipelines.map(({ binding, pipeline }) => ({
key: binding,
value: pipeline,
})),
});
}

if (version_metadata !== undefined) {
output.push({
type: "Worker Version Metadata",
Expand Down
45 changes: 45 additions & 0 deletions packages/wrangler/src/config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,16 @@ function normalizeAndValidateEnvironment(
validateAIBinding(envName),
undefined
),
pipelines: notInheritable(
diagnostics,
topLevelEnv,
rawConfig,
rawEnv,
envName,
"pipelines",
validateBindingArray(envName, validatePipelineBinding),
[]
),
version_metadata: notInheritable(
diagnostics,
topLevelEnv,
Expand Down Expand Up @@ -2213,6 +2223,7 @@ const validateUnsafeBinding: ValidatorFn = (diagnostics, field, value) => {
"service",
"logfwdr",
"mtls_certificate",
"pipeline",
];

if (safeBindings.includes(value.type)) {
Expand Down Expand Up @@ -3115,6 +3126,40 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => {
return isValid;
};

const validatePipelineBinding: ValidatorFn = (diagnostics, field, value) => {
if (typeof value !== "object" || value === null) {
diagnostics.errors.push(
`"pipeline" bindings should be objects, but got ${JSON.stringify(value)}`
);
return false;
}
let isValid = true;
// Pipeline bindings must have a binding and a pipeline.
if (!isRequiredProperty(value, "binding", "string")) {
diagnostics.errors.push(
`"${field}" bindings must have a string "binding" field but got ${JSON.stringify(
value
)}.`
);
isValid = false;
}
if (!isRequiredProperty(value, "pipeline", "string")) {
diagnostics.errors.push(
`"${field}" bindings must have a string "pipeline" field but got ${JSON.stringify(
value
)}.`
);
isValid = false;
}

validateAdditionalProperties(diagnostics, field, Object.keys(value), [
"binding",
"pipeline",
]);

return isValid;
};

function normalizeAndValidateLimits(
diagnostics: Diagnostics,
topLevelEnv: Environment | undefined,
Expand Down
Loading

0 comments on commit 831f892

Please sign in to comment.