Skip to content

Commit

Permalink
🪟 🐛 Update refresh schema logic to handle confirmation modal and othe…
Browse files Browse the repository at this point in the history
…r edge cases (#19131)

* Update refresh schema logic to handle confirmation modal

* Update the refresh schema separately
* Only mark as refreshed if the syncCatalog has changed
* Add option to clear refreshed schema
* Update tests

* Update connection edit service to track the schema changes separately

* ConnectionEditService now marks schema as refreshed only if the catalog has changed, clears refresh after schema update
* Add mockCatalog diff and add refresh schema tests to connection edit service
* Add interfaces to useConnectionEdit and useConnectionForm to better track references
* Remove clearRefreshedSchema from ConnectionReplicationTab form submit
* Update useConfirmCatalogDiff to only show if the diff has changed, instead of the whole connection

* Cleanup discardRefreshedSchema name in ConnectionReplicationTab
  • Loading branch information
edmundito authored Nov 29, 2022
1 parent 6770b5e commit 2c9d099
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { act, renderHook } from "@testing-library/react-hooks";
import React from "react";
import { mockCatalogDiff } from "test-utils/mock-data/mockCatalogDiff";
import { mockConnection } from "test-utils/mock-data/mockConnection";
import { mockDestination } from "test-utils/mock-data/mockDestination";
import { mockWorkspace } from "test-utils/mock-data/mockWorkspace";
import { TestWrapper } from "test-utils/testutils";

import { WebBackendConnectionUpdate } from "core/request/AirbyteClient";
import { WebBackendConnectionRead, WebBackendConnectionUpdate } from "core/request/AirbyteClient";

import { useConnectionFormService } from "../ConnectionForm/ConnectionFormService";
import { ConnectionEditServiceProvider, useConnectionEditService } from "./ConnectionEditService";
Expand All @@ -18,14 +19,24 @@ jest.mock("services/workspaces/WorkspacesService", () => ({
useCurrentWorkspace: () => mockWorkspace,
}));

const utils = {
getMockConnectionWithRefreshedCatalog: (): WebBackendConnectionRead => ({
...mockConnection,
catalogDiff: mockCatalogDiff,
catalogId: `${mockConnection.catalogId}1`,
}),
};

jest.mock("../useConnectionHook", () => ({
useGetConnection: () => mockConnection,
useWebConnectionService: () => ({
getConnection: () => mockConnection,
getConnection: (_connectionId: string, withRefreshedCatalog?: boolean) =>
withRefreshedCatalog ? utils.getMockConnectionWithRefreshedCatalog() : mockConnection,
}),
useUpdateConnection: () => ({
mutateAsync: jest.fn(async (connection: WebBackendConnectionUpdate) => {
return { ...mockConnection, ...connection };
const { sourceCatalogId, ...connectionUpdate } = connection;
return { ...mockConnection, ...connectionUpdate, catalogId: sourceCatalogId ?? mockConnection.catalogId };
}),
isLoading: false,
}),
Expand Down Expand Up @@ -77,7 +88,7 @@ describe("ConnectionEditService", () => {
expect(result.current.connection).toEqual({ ...mockConnection, ...mockUpdateConnection });
});

it("should refresh connection", async () => {
it("should refresh schema", async () => {
// Need to combine the hooks so both can be used.
const useMyTestHook = () => {
return [useConnectionEditService(), useConnectionFormService()] as const;
Expand Down Expand Up @@ -108,6 +119,77 @@ describe("ConnectionEditService", () => {
});

expect(result.current[0].schemaHasBeenRefreshed).toBe(true);
expect(result.current[0].connection).toEqual(mockConnection);
expect(result.current[0].schemaRefreshing).toBe(false);
expect(result.current[0].connection).toEqual(utils.getMockConnectionWithRefreshedCatalog());
});

it("should refresh schema only if the sync catalog has diffs", async () => {
// Need to combine the hooks so both can be used.
const useMyTestHook = () =>
({ editService: useConnectionEditService(), formService: useConnectionFormService() } as const);

const { result } = renderHook(useMyTestHook, {
wrapper: Wrapper,
initialProps: {
connectionId: mockConnection.connectionId,
},
});

const connectionUpdate = {
connectionId: mockConnection.connectionId,
name: "new connection name",
prefix: "new connection prefix",
};

const updatedConnection: WebBackendConnectionRead = {
...mockConnection,
...connectionUpdate,
};

jest.spyOn(utils, "getMockConnectionWithRefreshedCatalog").mockImplementationOnce(
(): WebBackendConnectionRead => ({
...updatedConnection,
catalogDiff: { transforms: [] },
})
);

await act(async () => {
await result.current.editService.updateConnection(connectionUpdate);
await result.current.formService.refreshSchema();
});

expect(result.current.editService.schemaHasBeenRefreshed).toBe(false);
expect(result.current.editService.schemaRefreshing).toBe(false);
expect(result.current.editService.connection).toEqual(updatedConnection);
});

it("should discard the refreshed schema", async () => {
const useMyTestHook = () =>
({ editService: useConnectionEditService(), formService: useConnectionFormService() } as const);

const { result } = renderHook(useMyTestHook, {
wrapper: Wrapper,
initialProps: {
connectionId: mockConnection.connectionId,
},
});

const connectionUpdate: WebBackendConnectionUpdate = {
connectionId: mockConnection.connectionId,
name: "new connection name",
prefix: "new connection prefix",
};

const updatedConnection = { ...mockConnection, ...connectionUpdate };

await act(async () => {
await result.current.formService.refreshSchema();
await result.current.editService.updateConnection(connectionUpdate);
result.current.editService.discardRefreshedSchema();
});

expect(result.current.editService.schemaHasBeenRefreshed).toBe(false);
expect(result.current.editService.schemaRefreshing).toBe(false);
expect(result.current.editService.connection).toEqual(updatedConnection);
});
});
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { pick } from "lodash";
import { useContext, useState, createContext, useCallback } from "react";
import { useAsyncFn } from "react-use";

import { ConnectionStatus, WebBackendConnectionUpdate } from "core/request/AirbyteClient";
import {
AirbyteCatalog,
ConnectionStatus,
WebBackendConnectionRead,
WebBackendConnectionUpdate,
} from "core/request/AirbyteClient";

import { ConnectionFormServiceProvider } from "../ConnectionForm/ConnectionFormService";
import { useGetConnection, useUpdateConnection, useWebConnectionService } from "../useConnectionHook";
Expand All @@ -11,22 +17,69 @@ interface ConnectionEditProps {
connectionId: string;
}

const useConnectionEdit = ({ connectionId }: ConnectionEditProps) => {
const [connection, setConnection] = useState(useGetConnection(connectionId));
export interface ConnectionCatalog {
syncCatalog: AirbyteCatalog;
catalogId?: string;
}

interface ConnectionEditHook {
connection: WebBackendConnectionRead;
connectionUpdating: boolean;
schemaError?: Error;
schemaRefreshing: boolean;
schemaHasBeenRefreshed: boolean;
updateConnection: (connectionUpdates: WebBackendConnectionUpdate) => Promise<void>;
refreshSchema: () => Promise<void>;
discardRefreshedSchema: () => void;
}

const getConnectionCatalog = (connection: WebBackendConnectionRead): ConnectionCatalog =>
pick(connection, ["syncCatalog", "catalogId"]);

const useConnectionEdit = ({ connectionId }: ConnectionEditProps): ConnectionEditHook => {
const connectionService = useWebConnectionService();
const [connection, setConnection] = useState(useGetConnection(connectionId));
const [catalog, setCatalog] = useState<ConnectionCatalog>(() => getConnectionCatalog(connection));
const [schemaHasBeenRefreshed, setSchemaHasBeenRefreshed] = useState(false);

const [{ loading: schemaRefreshing, error: schemaError }, refreshSchema] = useAsyncFn(async () => {
const refreshedConnection = await connectionService.getConnection(connectionId, true);
setConnection(refreshedConnection);
setSchemaHasBeenRefreshed(true);
if (refreshedConnection.catalogDiff && refreshedConnection.catalogDiff.transforms?.length > 0) {
setConnection(refreshedConnection);
setSchemaHasBeenRefreshed(true);
}
}, [connectionId]);

const discardRefreshedSchema = useCallback(() => {
setConnection((connection) => ({
...connection,
...catalog,
catalogDiff: undefined,
}));
setSchemaHasBeenRefreshed(false);
}, [catalog]);

const { mutateAsync: updateConnectionAction, isLoading: connectionUpdating } = useUpdateConnection();

const updateConnection = useCallback(
async (connection: WebBackendConnectionUpdate) => {
setConnection(await updateConnectionAction(connection));
async (connectionUpdates: WebBackendConnectionUpdate) => {
const updatedConnection = await updateConnectionAction(connectionUpdates);
const updatedKeys = Object.keys(connectionUpdates).map((key) => (key === "sourceCatalogId" ? "catalogId" : key));
const connectionPatch = pick(updatedConnection, updatedKeys);
const wasSyncCatalogUpdated = !!connectionPatch.syncCatalog;

// Mutate the current connection state only with the values that were updated
setConnection((connection) => ({
...connection,
...connectionPatch,
catalogDiff: wasSyncCatalogUpdated ? undefined : connection.catalogDiff,
}));

if (wasSyncCatalogUpdated) {
// The catalog ws also saved, so update the current catalog
setCatalog(getConnectionCatalog(updatedConnection));
setSchemaHasBeenRefreshed(false);
}
},
[updateConnectionAction]
);
Expand All @@ -38,15 +91,12 @@ const useConnectionEdit = ({ connectionId }: ConnectionEditProps) => {
schemaRefreshing,
schemaHasBeenRefreshed,
updateConnection,
setSchemaHasBeenRefreshed,
refreshSchema,
discardRefreshedSchema,
};
};

const ConnectionEditContext = createContext<Omit<
ReturnType<typeof useConnectionEdit>,
"refreshSchema" | "schemaError"
> | null>(null);
const ConnectionEditContext = createContext<Omit<ConnectionEditHook, "refreshSchema" | "schemaError"> | null>(null);

export const ConnectionEditServiceProvider: React.FC<React.PropsWithChildren<ConnectionEditProps>> = ({
children,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import React, { createContext, useCallback, useContext, useState } from "react";
import { useIntl } from "react-intl";

import { ConnectionScheduleType, OperationRead, WebBackendConnectionRead } from "core/request/AirbyteClient";
import {
ConnectionScheduleType,
DestinationDefinitionSpecificationRead,
OperationRead,
WebBackendConnectionRead,
} from "core/request/AirbyteClient";
import { useGetDestinationDefinitionSpecification } from "services/connector/DestinationDefinitionSpecificationService";
import { FormError, generateMessageFromError } from "utils/errorStatusMessage";
import {
Expand Down Expand Up @@ -54,7 +59,24 @@ export const tidyConnectionFormValues = (
return formValues;
};

const useConnectionForm = ({ connection, mode, schemaError, refreshSchema }: ConnectionServiceProps) => {
interface ConnectionFormHook {
connection: ConnectionOrPartialConnection;
mode: ConnectionFormMode;
destDefinition: DestinationDefinitionSpecificationRead;
initialValues: FormikConnectionFormValues;
schemaError?: SchemaError;
formId: string;
setSubmitError: (submitError: FormError | null) => void;
getErrorMessage: (formValid: boolean, connectionDirty: boolean) => string | JSX.Element | null;
refreshSchema: () => Promise<void>;
}

const useConnectionForm = ({
connection,
mode,
schemaError,
refreshSchema,
}: ConnectionServiceProps): ConnectionFormHook => {
const destDefinition = useGetDestinationDefinitionSpecification(connection.destination.destinationDefinitionId);
const initialValues = useInitialValues(connection, destDefinition, mode !== "create");
const { formatMessage } = useIntl();
Expand Down Expand Up @@ -84,7 +106,7 @@ const useConnectionForm = ({ connection, mode, schemaError, refreshSchema }: Con
};
};

const ConnectionFormContext = createContext<ReturnType<typeof useConnectionForm> | null>(null);
const ConnectionFormContext = createContext<ConnectionFormHook | null>(null);

export const ConnectionFormServiceProvider: React.FC<React.PropsWithChildren<ConnectionServiceProps>> = ({
children,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Form, Formik, FormikHelpers } from "formik";
import React, { useCallback, useState } from "react";
import { FormattedMessage, useIntl } from "react-intl";
import { useUnmount } from "react-use";

import { SchemaError } from "components/CreateConnection/SchemaError";
import LoadingSchema from "components/LoadingSchema";
Expand Down Expand Up @@ -40,7 +41,7 @@ export const ConnectionReplicationTab: React.FC = () => {

const [saved, setSaved] = useState(false);

const { connection, schemaRefreshing, schemaHasBeenRefreshed, updateConnection, setSchemaHasBeenRefreshed } =
const { connection, schemaRefreshing, schemaHasBeenRefreshed, updateConnection, discardRefreshedSchema } =
useConnectionEditService();
const { initialValues, mode, schemaError, getErrorMessage, setSubmitError } = useConnectionFormService();
const allowSubOneHourCronExpressions = useFeature(FeatureItem.AllowSyncSubOneHourCronExpressions);
Expand Down Expand Up @@ -134,7 +135,6 @@ export const ConnectionReplicationTab: React.FC = () => {
}

setSaved(true);
setSchemaHasBeenRefreshed(false);
} catch (e) {
setSubmitError(e);
}
Expand All @@ -150,7 +150,6 @@ export const ConnectionReplicationTab: React.FC = () => {
mode,
openModal,
saveConnection,
setSchemaHasBeenRefreshed,
setSubmitError,
workspaceId,
allowSubOneHourCronExpressions,
Expand All @@ -159,6 +158,10 @@ export const ConnectionReplicationTab: React.FC = () => {

useConfirmCatalogDiff();

useUnmount(() => {
discardRefreshedSchema();
});

return (
<div className={styles.content}>
{schemaError && !schemaRefreshing ? (
Expand All @@ -172,14 +175,18 @@ export const ConnectionReplicationTab: React.FC = () => {
>
{({ values, isSubmitting, isValid, dirty, resetForm }) => (
<Form>
<ConnectionFormFields values={values} isSubmitting={isSubmitting} dirty={dirty} />
<ConnectionFormFields
values={values}
isSubmitting={isSubmitting}
dirty={dirty || schemaHasBeenRefreshed}
/>
<EditControls
isSubmitting={isSubmitting}
submitDisabled={!isValid}
dirty={dirty}
resetForm={async () => {
resetForm();
setSchemaHasBeenRefreshed(false);
discardRefreshedSchema();
}}
successMessage={saved && !dirty && <FormattedMessage id="form.changesSaved" />}
errorMessage={getErrorMessage(isValid, dirty)}
Expand Down
46 changes: 46 additions & 0 deletions airbyte-webapp/src/test-utils/mock-data/mockCatalogDiff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { CatalogDiff } from "core/request/AirbyteClient";

export const mockCatalogDiff: CatalogDiff = {
transforms: [
{
transformType: "update_stream",
streamDescriptor: { namespace: "apple", name: "harissa_paste" },
updateStream: [
{ transformType: "add_field", fieldName: ["users", "phone"], breaking: false },
{ transformType: "add_field", fieldName: ["users", "email"], breaking: false },
{ transformType: "remove_field", fieldName: ["users", "lastName"], breaking: false },

{
transformType: "update_field_schema",
breaking: false,
fieldName: ["users", "address"],
updateFieldSchema: { oldSchema: { type: "number" }, newSchema: { type: "string" } },
},
],
},
{
transformType: "add_stream",
streamDescriptor: { namespace: "apple", name: "banana" },
},
{
transformType: "add_stream",
streamDescriptor: { namespace: "apple", name: "carrot" },
},
{
transformType: "remove_stream",
streamDescriptor: { namespace: "apple", name: "dragonfruit" },
},
{
transformType: "remove_stream",
streamDescriptor: { namespace: "apple", name: "eclair" },
},
{
transformType: "remove_stream",
streamDescriptor: { namespace: "apple", name: "fishcake" },
},
{
transformType: "remove_stream",
streamDescriptor: { namespace: "apple", name: "gelatin_mold" },
},
],
};
Loading

0 comments on commit 2c9d099

Please sign in to comment.