Skip to content

Commit

Permalink
🪟 🎉 Select dbt jobs with dropdown (#19502)
Browse files Browse the repository at this point in the history
* Add wrapper for cloud dbt endpoint

Also includes a cheeky little test usage which probably should have had
a name starting with an underscore (I did not commit the test code which
added the new variable's contents to `(window as any).availableJobs`, on
grounds that it was very ugly, but I did want to have the import and
call of the wrapper function hit the git history here).

* Add dbt Cloud jobs via dropdown (WIP: breaks if no integration)

Selecting job from dropdown adds it to saved jobs, but the new endpoint
is unconditionally called even though it will always throw an error for
users with no dbt Cloud integration configured.

* Refactor to stop errors in non-dbt-integrated workspaces

Well, I suppose throwing a runtime error for every connection which
could support dbt Cloud jobs but is part of a workspace with no
integration set up, but that doesn't exactly seem ideal.

Instead, this pulls all logic out of the top-level card except for
pulling the dbt Cloud integration information; then it delegates the
rest to either of two fairly self-contained components,
`NoDbtIntegration` or the new `DbtJobsForm`. The immediate benefit is
that I have a nice component boundary in which I can unconditionally
run dbt-Cloud-only logic to fetch available jobs.

* Filter already-selected jobs out of dropdown

* Use dbt's jobNames and read-only {account,job}Id in job list

* Remove obsolete yup validations for dbt Cloud jobs

Since the values are now supplied by dbt Cloud via API, the user no
longer has to manually input anything; and this sudden lack of user
input rather obviates the need to validate user input.

* Add button loading state when saving dbt Cloud jobs
  • Loading branch information
ambirdsall authored Nov 17, 2022
1 parent ec7963d commit 1dbad96
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 135 deletions.
55 changes: 55 additions & 0 deletions airbyte-webapp/src/packages/cloud/lib/domain/dbtCloud/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { apiOverride } from "core/request/apiOverride";

/**
* Get the available dbt Cloud jobs associated with the given workspace config.
*/
export interface WorkspaceGetDbtJobsRequest {
workspaceId: WorkspaceId;
/** The config id associated with the dbt Cloud config, references the webhookConfigId in the core API. */
dbtConfigId: string;
}

/**
* The available dbt Cloud jobs for the requested workspace config
*/
export interface WorkspaceGetDbtJobsResponse {
availableDbtJobs: DbtCloudJobInfo[];
}

/**
* A dbt Cloud job
*/
export interface DbtCloudJobInfo {
/** The account id associated with the job */
accountId: number;
/** The the specific job id returned by the dbt Cloud API */
jobId: number;
/** The human-readable name of the job returned by the dbt Cloud API */
jobName: string;
}

/**
* @summary Calls the dbt Cloud `List Accounts` and `List jobs` APIs to get the list of available jobs for the dbt auth token associated with the requested workspace config.
*/
export const webBackendGetAvailableDbtJobsForWorkspace = (
workspaceGetDbtJobsRequest: WorkspaceGetDbtJobsRequest,
options?: SecondParameter<typeof apiOverride>
) => {
return apiOverride<WorkspaceGetDbtJobsResponse>(
{
url: `/v1/web_backend/cloud_workspaces/get_available_dbt_jobs`,
method: "post",
headers: { "Content-Type": "application/json" },
data: workspaceGetDbtJobsRequest,
},
options
);
};

/**
* Workspace Id from OSS Airbyte instance
*/
export type WorkspaceId = string;

// eslint-disable-next-line
type SecondParameter<T extends (...args: any) => any> = T extends (config: any, args: infer P) => any ? P : never;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./api";
126 changes: 87 additions & 39 deletions airbyte-webapp/src/packages/cloud/services/dbtCloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,64 @@
// - custom domains aren't yet supported

import isEmpty from "lodash/isEmpty";
import { useMutation } from "react-query";
import { useMutation, useQuery } from "react-query";

import { OperatorType, WebBackendConnectionRead, OperationRead, WebhookConfigRead } from "core/request/AirbyteClient";
import { useWebConnectionService } from "hooks/services/useConnectionHook";
import { useCurrentWorkspace } from "hooks/services/useWorkspace";
import {
DbtCloudJobInfo,
webBackendGetAvailableDbtJobsForWorkspace,
WorkspaceGetDbtJobsResponse,
} from "packages/cloud/lib/domain/dbtCloud/api";
import { useDefaultRequestMiddlewares } from "services/useDefaultRequestMiddlewares";
import { useUpdateWorkspace } from "services/workspaces/WorkspacesService";

import { useConfig } from "./config";

export interface DbtCloudJob {
account: string;
job: string;
operationId?: string;
jobName?: string;
}
export type { DbtCloudJobInfo } from "packages/cloud/lib/domain/dbtCloud/api";
const dbtCloudDomain = "https://cloud.getdbt.com";
const webhookConfigName = "dbt cloud";
const executionBody = `{"cause": "airbyte"}`;
const jobName = (t: DbtCloudJob) => `${t.account}/${t.job}`;

const isDbtWebhookConfig = (webhookConfig: WebhookConfigRead) => !!webhookConfig.name?.includes("dbt");

const toDbtCloudJob = (operation: OperationRead): DbtCloudJob => {
const { operationId } = operation;
const { executionUrl } = operation.operatorConfiguration.webhook || {};
export const toDbtCloudJob = (operationOrCloudJob: OperationRead | DbtCloudJobInfo): DbtCloudJob => {
if ("operationId" in operationOrCloudJob) {
const { operationId } = operationOrCloudJob;
const { executionUrl } = operationOrCloudJob.operatorConfiguration.webhook || {};

const matches = (executionUrl || "").match(/\/accounts\/([^/]+)\/jobs\/([^]+)\/run/);
if (!matches) {
throw new Error(`Cannot extract dbt cloud job params from executionUrl ${executionUrl}`);
} else {
const [, account, job] = matches;
const matches = (executionUrl || "").match(/\/accounts\/([^/]+)\/jobs\/([^]+)\/run/);
if (!matches) {
throw new Error(`Cannot extract dbt cloud job params from executionUrl ${executionUrl}`);
} else {
const [, account, job] = matches;

return {
account,
job,
operationId,
};
return {
account,
job,
operationId,
};
}
} else {
const { accountId, jobId, jobName } = operationOrCloudJob;
return { account: `${accountId}`, job: `${jobId}`, jobName };
}
};

const isDbtCloudJob = (operation: OperationRead): boolean =>
operation.operatorConfiguration.operatorType === OperatorType.webhook;

export const isSameJob = (remoteJob: DbtCloudJobInfo, savedJob: DbtCloudJob): boolean =>
savedJob.account === `${remoteJob.accountId}` && savedJob.job === `${remoteJob.jobId}`;

export const useSubmitDbtCloudIntegrationConfig = () => {
const { workspaceId } = useCurrentWorkspace();
const { mutateAsync: updateWorkspace } = useUpdateWorkspace();
Expand Down Expand Up @@ -78,35 +97,64 @@ export const useDbtIntegration = (connection: WebBackendConnectionRead) => {
);
const otherOperations = [...(connection.operations?.filter((operation) => !isDbtCloudJob(operation)) || [])];

const saveJobs = (jobs: DbtCloudJob[]) => {
// TODO dynamically use the workspace's configured dbt cloud domain when it gets returned by backend
const urlForJob = (job: DbtCloudJob) => `${dbtCloudDomain}/api/v2/accounts/${job.account}/jobs/${job.job}/run/`;

return connectionService.update({
connectionId: connection.connectionId,
operations: [
...otherOperations,
...jobs.map((job) => ({
workspaceId,
...(job.operationId ? { operationId: job.operationId } : {}),
name: jobName(job),
operatorConfiguration: {
operatorType: OperatorType.webhook,
webhook: {
executionUrl: urlForJob(job),
// if `hasDbtIntegration` is true, webhookConfigId is guaranteed to exist
...(webhookConfigId ? { webhookConfigId } : {}),
executionBody,
const { mutateAsync, isLoading } = useMutation({
mutationFn: (jobs: DbtCloudJob[]) => {
// TODO dynamically use the workspace's configured dbt cloud domain when it gets returned by backend
const urlForJob = (job: DbtCloudJob) => `${dbtCloudDomain}/api/v2/accounts/${job.account}/jobs/${job.job}/run/`;

return connectionService.update({
connectionId: connection.connectionId,
operations: [
...otherOperations,
...jobs.map((job) => ({
workspaceId,
...(job.operationId ? { operationId: job.operationId } : {}),
name: jobName(job),
operatorConfiguration: {
operatorType: OperatorType.webhook,
webhook: {
executionUrl: urlForJob(job),
// if `hasDbtIntegration` is true, webhookConfigId is guaranteed to exist
...(webhookConfigId ? { webhookConfigId } : {}),
executionBody,
},
},
},
})),
],
});
};
})),
],
});
},
});

return {
hasDbtIntegration,
dbtCloudJobs,
saveJobs,
saveJobs: mutateAsync,
isSaving: isLoading,
};
};

export const useAvailableDbtJobs = () => {
const { cloudApiUrl } = useConfig();
const config = { apiUrl: cloudApiUrl };
const middlewares = useDefaultRequestMiddlewares();
const requestOptions = { config, middlewares };
const workspace = useCurrentWorkspace();
const { workspaceId } = workspace;
const dbtConfigId = workspace.webhookConfigs?.find((config) => config.name?.includes("dbt"))?.id;

if (!dbtConfigId) {
throw new Error("cannot request available dbt jobs for a workspace with no dbt cloud integration configured");
}

const results = useQuery(
["dbtCloud", dbtConfigId, "list"],
() => webBackendGetAvailableDbtJobsForWorkspace({ workspaceId, dbtConfigId }, requestOptions),
{
suspense: true,
}
);

// casting type to remove `| undefined`, since `suspense: true` will ensure the value
// is, in fact, available
return (results.data as WorkspaceGetDbtJobsResponse).availableDbtJobs;
};
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@
align-items: center;
}

.jobListItemInputGroup {
.jobListItemIdFieldGroup {
display: flex;
justify-content: space-between;
align-items: center;
flex-grow: 2;
}

.jobListItemInput {
.jobListItemIdField {
height: fit-content;
margin-left: 1em;
}

.jobListItemInputLabel {
font-size: 11px;
font-weight: 500;
background-color: colors.$grey-50;
flex-grow: 2;
padding: variables.$spacing-sm;
border-radius: variables.$border-radius-sm;
}

.jobListItemDelete {
Expand Down
Loading

0 comments on commit 1dbad96

Please sign in to comment.