From 7d5ee530e87a2f1601bd955d8f79bdfd9675f2e2 Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Sat, 20 Jul 2024 15:09:53 +0200 Subject: [PATCH 1/7] feat(ingest): add ingestion source for SAP Analytics Cloud --- .../source/builder/RecipeForm/constants.ts | 39 +- .../ingest/source/builder/RecipeForm/sac.ts | 161 ++++ .../source/builder/SelectTemplateStep.tsx | 12 + .../app/ingest/source/builder/constants.ts | 4 + .../app/ingest/source/builder/sources.json | 8 + .../src/app/ingest/source/conf/sac/sac.ts | 26 + .../src/app/ingest/source/conf/sources.tsx | 2 + .../src/app/shared/getLogoFromPlatform.tsx | 23 - datahub-web-react/src/images/saclogo.svg | 19 + .../docs/sources/sac/sac_pre.md | 45 + .../docs/sources/sac/sac_recipe.yml | 40 + metadata-ingestion/setup.py | 9 + .../ingestion/source/common/subtypes.py | 7 + .../datahub/ingestion/source/sac/__init__.py | 0 .../src/datahub/ingestion/source/sac/sac.py | 808 ++++++++++++++++++ .../ingestion/source/sac/sac_common.py | 45 + .../src/datahub/utilities/logging_manager.py | 3 + .../tests/integration/sac/__init__.py | 0 .../tests/integration/sac/metadata.xml | 404 +++++++++ .../integration/sac/sac_mces_golden.json | 663 ++++++++++++++ .../tests/integration/sac/test_sac.py | 308 +++++++ .../main/resources/boot/data_platforms.json | 10 + 22 files changed, 2611 insertions(+), 25 deletions(-) create mode 100644 datahub-web-react/src/app/ingest/source/builder/RecipeForm/sac.ts create mode 100644 datahub-web-react/src/app/ingest/source/conf/sac/sac.ts delete mode 100644 datahub-web-react/src/app/shared/getLogoFromPlatform.tsx create mode 100644 datahub-web-react/src/images/saclogo.svg create mode 100644 metadata-ingestion/docs/sources/sac/sac_pre.md create mode 100644 metadata-ingestion/docs/sources/sac/sac_recipe.yml create mode 100644 metadata-ingestion/src/datahub/ingestion/source/sac/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/sac/sac.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py create mode 100644 metadata-ingestion/tests/integration/sac/__init__.py create mode 100644 metadata-ingestion/tests/integration/sac/metadata.xml create mode 100644 metadata-ingestion/tests/integration/sac/sac_mces_golden.json create mode 100644 metadata-ingestion/tests/integration/sac/test_sac.py diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/constants.ts b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/constants.ts index 6a5e6c9de2b96b..b6d076cf6da026 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/constants.ts @@ -83,7 +83,7 @@ import { PROJECT_NAME, } from './lookml'; import { PRESTO, PRESTO_HOST_PORT, PRESTO_DATABASE, PRESTO_USERNAME, PRESTO_PASSWORD } from './presto'; -import { AZURE, BIGQUERY_BETA, CSV, DBT_CLOUD, MYSQL, OKTA, POWER_BI, UNITY_CATALOG, VERTICA } from '../constants'; +import { AZURE, BIGQUERY_BETA, CSV, DBT_CLOUD, MYSQL, OKTA, POWER_BI, SAC, UNITY_CATALOG, VERTICA } from '../constants'; import { BIGQUERY_BETA_PROJECT_ID, DATASET_ALLOW, DATASET_DENY, PROJECT_ALLOW, PROJECT_DENY } from './bigqueryBeta'; import { MYSQL_HOST_PORT, MYSQL_PASSWORD, MYSQL_USERNAME } from './mysql'; import { MSSQL, MSSQL_DATABASE, MSSQL_HOST_PORT, MSSQL_PASSWORD, MSSQL_USERNAME } from './mssql'; @@ -171,6 +171,20 @@ import { USER_ALLOW, USER_DENY, } from './azure'; +import { + SAC_TENANT_URL, + SAC_TOKEN_URL, + 
SAC_CLIENT_ID, + SAC_CLIENT_SECRET, + INGEST_STORIES, + INGEST_APPLICATIONS, + RESOURCE_ID_ALLOW, + RESOURCE_ID_DENY, + RESOURCE_NAME_ALLOW, + RESOURCE_NAME_DENY, + FOLDER_ALLOW, + FOLDER_DENY, +} from './sac'; export enum RecipeSections { Connection = 0, @@ -519,8 +533,29 @@ export const RECIPE_FIELDS: RecipeFields = { filterFields: [GROUP_ALLOW, GROUP_DENY, USER_ALLOW, USER_DENY], advancedFields: [AZURE_INGEST_USERS, AZURE_INGEST_GROUPS, STATEFUL_INGESTION_ENABLED, SKIP_USERS_WITHOUT_GROUP], }, + [SAC]: { + fields: [SAC_TENANT_URL, SAC_TOKEN_URL, SAC_CLIENT_ID, SAC_CLIENT_SECRET], + filterFields: [ + INGEST_STORIES, + INGEST_APPLICATIONS, + RESOURCE_ID_ALLOW, + RESOURCE_ID_DENY, + RESOURCE_NAME_ALLOW, + RESOURCE_NAME_DENY, + FOLDER_ALLOW, + FOLDER_DENY, + ], + advancedFields: [STATEFUL_INGESTION_ENABLED], + }, }; export const CONNECTORS_WITH_FORM = new Set(Object.keys(RECIPE_FIELDS)); -export const CONNECTORS_WITH_TEST_CONNECTION = new Set([SNOWFLAKE, LOOKER, BIGQUERY_BETA, BIGQUERY, UNITY_CATALOG]); +export const CONNECTORS_WITH_TEST_CONNECTION = new Set([ + SNOWFLAKE, + LOOKER, + BIGQUERY_BETA, + BIGQUERY, + UNITY_CATALOG, + SAC, +]); diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/sac.ts b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/sac.ts new file mode 100644 index 00000000000000..3f5c6d0b09a33e --- /dev/null +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/sac.ts @@ -0,0 +1,161 @@ +import { RecipeField, FieldType, setListValuesOnRecipe } from './common'; + +export const SAC_TENANT_URL: RecipeField = { + name: 'tenant_url', + label: 'Tenant URL', + tooltip: 'The URL of the SAP Analytics Cloud tenant.', + type: FieldType.TEXT, + fieldPath: 'source.config.tenant_url', + placeholder: 'https://company.eu10.sapanalytics.cloud', + required: true, + rules: null, +}; + +export const SAC_TOKEN_URL: RecipeField = { + name: 'token_url', + label: 'Token URL', + tooltip: 'The OAuth 2.0 Token Service URL.', + type: FieldType.TEXT, + fieldPath: 'source.config.token_url', + placeholder: 'https://company.eu10.hana.ondemand.com/oauth/token', + required: true, + rules: null, +}; + +export const SAC_CLIENT_ID: RecipeField = { + name: 'client_id', + label: 'Client ID', + tooltip: 'Client ID.', + type: FieldType.SECRET, + fieldPath: 'source.config.client_id', + placeholder: 'client_id', + required: true, + rules: null, +}; + +export const SAC_CLIENT_SECRET: RecipeField = { + name: 'client_secret', + label: 'Client Secret', + tooltip: 'Client Secret.', + type: FieldType.SECRET, + fieldPath: 'source.config.client_secret', + placeholder: 'client_secret', + required: true, + rules: null, +}; + +export const INGEST_STORIES: RecipeField = { + name: 'ingest_stories', + label: 'Ingest Stories', + tooltip: 'Whether stories should be ingested into DataHub.', + type: FieldType.BOOLEAN, + fieldPath: 'source.config.ingest_stories', + rules: null, + section: 'Stories and Applications', +}; + +export const INGEST_APPLICATIONS: RecipeField = { + name: 'ingest_applications', + label: 'Ingest Applications', + tooltip: 'Whether applications should be ingested into DataHub.', + type: FieldType.BOOLEAN, + fieldPath: 'source.config.ingest_applications', + rules: null, + section: 'Stories and Applications', +}; + +const resourceIdAllowFieldPath = 'source.config.resource_id_pattern.allow'; +export const RESOURCE_ID_ALLOW: RecipeField = { + name: 'resource_id_pattern.allow', + label: 'Resource Id Allow Patterns', + tooltip: + 'Only include specific Stories and Applications by 
providing the id of the resource, or a Regular Expression (REGEX). If not provided, all Stories and Applications will be included.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: resourceIdAllowFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'LXTH4JCE36EOYLU41PIINLYPU9XRYM26',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, resourceIdAllowFieldPath),
+};
+
+const resourceIdDenyFieldPath = 'source.config.resource_id_pattern.deny';
+export const RESOURCE_ID_DENY: RecipeField = {
+    name: 'resource_id_pattern.deny',
+    label: 'Resource Id Deny Patterns',
+    tooltip:
+        'Exclude specific Stories and Applications by providing the id of the resource, or a Regular Expression (REGEX). If not provided, all Stories and Applications will be included. Deny patterns always take precedence over Allow patterns.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: resourceIdDenyFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'LXTH4JCE36EOYLU41PIINLYPU9XRYM26',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, resourceIdDenyFieldPath),
+};
+
+const resourceNameAllowFieldPath = 'source.config.resource_name_pattern.allow';
+export const RESOURCE_NAME_ALLOW: RecipeField = {
+    name: 'resource_name_pattern.allow',
+    label: 'Resource Name Allow Patterns',
+    tooltip:
+        'Only include specific Stories and Applications by providing the name of the resource, or a Regular Expression (REGEX). If not provided, all Stories and Applications will be included.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: resourceNameAllowFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'Name of the story',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, resourceNameAllowFieldPath),
+};
+
+const resourceNameDenyFieldPath = 'source.config.resource_name_pattern.deny';
+export const RESOURCE_NAME_DENY: RecipeField = {
+    name: 'resource_name_pattern.deny',
+    label: 'Resource Name Deny Patterns',
+    tooltip:
+        'Exclude specific Stories and Applications by providing the name of the resource, or a Regular Expression (REGEX). If not provided, all Stories and Applications will be included. Deny patterns always take precedence over Allow patterns.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: resourceNameDenyFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'Name of the story',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, resourceNameDenyFieldPath),
+};
+
+const folderAllowFieldPath = 'source.config.folder_pattern.allow';
+export const FOLDER_ALLOW: RecipeField = {
+    name: 'folder_pattern.allow',
+    label: 'Folder Allow Patterns',
+    tooltip:
+        'Only include specific Stories and Applications by providing the folder containing the resources, or a Regular Expression (REGEX).
If not provided, all Stories and Applications will be included.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: folderAllowFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'Folder of the story',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, folderAllowFieldPath),
+};
+
+const folderDenyFieldPath = 'source.config.folder_pattern.deny';
+export const FOLDER_DENY: RecipeField = {
+    name: 'folder_pattern.deny',
+    label: 'Folder Deny Patterns',
+    tooltip:
+        'Exclude specific Stories and Applications by providing the folder containing the resources, or a Regular Expression (REGEX). If not provided, all Stories and Applications will be included. Deny patterns always take precedence over Allow patterns.',
+    type: FieldType.LIST,
+    buttonLabel: 'Add pattern',
+    fieldPath: folderDenyFieldPath,
+    rules: null,
+    section: 'Stories and Applications',
+    placeholder: 'Folder of the story',
+    setValueOnRecipeOverride: (recipe: any, values: string[]) =>
+        setListValuesOnRecipe(recipe, values, folderDenyFieldPath),
+};
diff --git a/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx b/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx
index 3998915e07a2ce..e014cdcc8e2240 100644
--- a/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx
+++ b/datahub-web-react/src/app/ingest/source/builder/SelectTemplateStep.tsx
@@ -104,6 +104,18 @@ export const SelectTemplateStep = ({ state, updateState, goTo, cancel, ingestion
             source.name.toLocaleLowerCase().includes(searchFilter.toLocaleLowerCase()),
     );
 
+    filteredSources.sort((a, b) => {
+        if (a.name === 'custom') {
+            return 1;
+        }
+
+        if (b.name === 'custom') {
+            return -1;
+        }
+
+        return a.displayName.localeCompare(b.displayName);
+    });
+
     return (
diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index d90faa91b85a26..b67ca388c10546 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -34,6 +34,7 @@ import fivetranLogo from '../../../../images/fivetranlogo.png'; import csvLogo from '../../../../images/csv-logo.png'; import qlikLogo from '../../../../images/qliklogo.png'; import sigmaLogo from '../../../../images/sigmalogo.png'; +import sacLogo from '../../../../images/saclogo.svg'; export const ATHENA = 'athena'; export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`; @@ -122,6 +123,8 @@ export const QLIK_SENSE = 'qlik-sense'; export const QLIK_SENSE_URN = `urn:li:dataPlatform:${QLIK_SENSE}`; export const SIGMA = 'sigma'; export const SIGMA_URN = `urn:li:dataPlatform:${SIGMA}`; +export const SAC = 'sac'; +export const SAC_URN = `urn:li:dataPlatform:${SAC}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -161,6 +164,7 @@ export const PLATFORM_URN_TO_LOGO = { [CSV_URN]: csvLogo, [QLIK_SENSE_URN]: qlikLogo, [SIGMA_URN]: sigmaLogo, + [SAC_URN]: sacLogo, }; export const SOURCE_TO_PLATFORM_URN = { diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index c35a7a033a8ab3..bb1c1a10ea6e5f 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -287,6 +287,14 @@ "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/csv'", "recipe": "source: \n type: csv-enricher \n config: \n # URL of your csv file to ingest \n filename: \n array_delimiter: '|' \n delimiter: ',' \n write_semantics: PATCH" }, + { + "urn": "urn:li:dataPlatform:sac", + "name": "sac", + "displayName": "SAP Analytics Cloud", + "description": "Import Stories, Applications and Models from SAP Analytics Cloud.", + "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/sac/", + "recipe": "source:\n type: sac\n config:\n tenant_url: # Your SAP Analytics Cloud tenant URL, e.g. https://company.eu10.sapanalytics.cloud or https://company.eu10.hcs.cloud.sap\n token_url: # The Token URL of your SAP Analytics Cloud tenant, e.g. https://company.eu10.hana.ondemand.com/oauth/token.\n\n # Add secret in Secrets Tab with relevant names for each variable\n client_id: \"${SAC_CLIENT_ID}\" # Your SAP Analytics Cloud client id\n client_secret: \"${SAC_CLIENT_SECRET}\" # Your SAP Analytics Cloud client secret" + }, { "urn": "urn:li:dataPlatform:custom", "name": "custom", diff --git a/datahub-web-react/src/app/ingest/source/conf/sac/sac.ts b/datahub-web-react/src/app/ingest/source/conf/sac/sac.ts new file mode 100644 index 00000000000000..e8a3b1f67866af --- /dev/null +++ b/datahub-web-react/src/app/ingest/source/conf/sac/sac.ts @@ -0,0 +1,26 @@ +import { SourceConfig } from '../types'; +import sacLogo from '../../../../../images/saclogo.svg'; + +const placeholderRecipe = `\ +source: + type: sac + config: + tenant_url: # Your SAP Analytics Cloud tenant URL, e.g. https://company.eu10.sapanalytics.cloud or https://company.eu10.hcs.cloud.sap + token_url: # The Token URL of your SAP Analytics Cloud tenant, e.g. https://company.eu10.hana.ondemand.com/oauth/token. 
+
+    # Add secret in Secrets Tab with relevant names for each variable
+    client_id: "\${SAC_CLIENT_ID}" # Your SAP Analytics Cloud client id
+    client_secret: "\${SAC_CLIENT_SECRET}" # Your SAP Analytics Cloud client secret
+`;
+
+export const SAC = 'sac';
+
+const sacConfig: SourceConfig = {
+    type: SAC,
+    placeholderRecipe,
+    displayName: 'SAP Analytics Cloud',
+    docsUrl: 'https://datahubproject.io/docs/generated/ingestion/sources/sac/',
+    logoUrl: sacLogo,
+};
+
+export default sacConfig;
diff --git a/datahub-web-react/src/app/ingest/source/conf/sources.tsx b/datahub-web-react/src/app/ingest/source/conf/sources.tsx
index 4dbeeb5c975e9d..66644cd14ddd5e 100644
--- a/datahub-web-react/src/app/ingest/source/conf/sources.tsx
+++ b/datahub-web-react/src/app/ingest/source/conf/sources.tsx
@@ -17,6 +17,7 @@ import hiveConfig from './hive/hive';
 import oracleConfig from './oracle/oracle';
 import tableauConfig from './tableau/tableau';
 import csvConfig from './csv/csv';
+import sacConfig from './sac/sac';
 
 const baseUrl = window.location.origin;
 
@@ -48,6 +49,7 @@ export const SOURCE_TEMPLATE_CONFIGS: Array = [
     oracleConfig,
     hiveConfig,
     csvConfig,
+    sacConfig,
     {
         type: 'custom',
         placeholderRecipe: DEFAULT_PLACEHOLDER_RECIPE,
diff --git a/datahub-web-react/src/app/shared/getLogoFromPlatform.tsx b/datahub-web-react/src/app/shared/getLogoFromPlatform.tsx
deleted file mode 100644
index f32ab8b7dbca91..00000000000000
--- a/datahub-web-react/src/app/shared/getLogoFromPlatform.tsx
+++ /dev/null
@@ -1,23 +0,0 @@
-import lookerLogo from '../../images/lookerlogo.png';
-import supersetLogo from '../../images/supersetlogo.png';
-import airflowLogo from '../../images/airflowlogo.png';
-import redashLogo from '../../images/redashlogo.png';
-
-/**
- * TODO: This is a temporary solution, until the backend can push logos for all data platform types.
- */
-export function getLogoFromPlatform(platform: string) {
-    if (platform.toLowerCase() === 'looker') {
-        return lookerLogo;
-    }
-    if (platform.toLowerCase() === 'superset') {
-        return supersetLogo;
-    }
-    if (platform.toLowerCase() === 'airflow') {
-        return airflowLogo;
-    }
-    if (platform.toLowerCase() === 'redash') {
-        return redashLogo;
-    }
-    return undefined;
-}
diff --git a/datahub-web-react/src/images/saclogo.svg b/datahub-web-react/src/images/saclogo.svg
new file mode 100644
index 00000000000000..91bacb51f92a49
--- /dev/null
+++ b/datahub-web-react/src/images/saclogo.svg
@@ -0,0 +1,19 @@
+[saclogo.svg: 19 lines of SVG markup for the SAP Analytics Cloud logo; the XML tags were lost in extraction]
diff --git a/metadata-ingestion/docs/sources/sac/sac_pre.md b/metadata-ingestion/docs/sources/sac/sac_pre.md
new file mode 100644
index 00000000000000..2dc66f4c1f2f65
--- /dev/null
+++ b/metadata-ingestion/docs/sources/sac/sac_pre.md
@@ -0,0 +1,45 @@
+## Configuration Notes
+
+1. Refer to [Manage OAuth Clients](https://help.sap.com/docs/SAP_ANALYTICS_CLOUD/00f68c2e08b941f081002fd3691d86a7/4f43b54398fc4acaa5efa32badfe3df6.html) to create an OAuth client in SAP Analytics Cloud. The OAuth client is required to have the following properties:
+
+   - Purpose: API Access
+   - Access:
+     - Data Import Service
+   - Authorization Grant: Client Credentials
+
+2.
Maintain connection mappings (optional):
+
+To map individual connections in SAP Analytics Cloud to platforms, platform instances, or environments, the `connection_mapping` configuration can be used within the recipe:
+
+```yaml
+connection_mapping:
+  MY_BW_CONNECTION:
+    platform: bw
+    platform_instance: PROD_BW
+    env: PROD
+  MY_HANA_CONNECTION:
+    platform: hana
+    platform_instance: PROD_HANA
+    env: PROD
+```
+
+The key in the connection mapping is the technical name of the connection, i.e. its id.
+
+## Concept mapping
+
+| SAP Analytics Cloud | DataHub     |
+| ------------------- | ----------- |
+| `Story`             | `Dashboard` |
+| `Application`       | `Dashboard` |
+| `Live Data Model`   | `Dataset`   |
+| `Import Data Model` | `Dataset`   |
+| `Model`             | `Dataset`   |
+
+## Limitations
+
+- Only models which are used in a Story or an Application will be ingested, because there is no dedicated API to retrieve models (only for Stories and Applications).
+- Browse Paths for models cannot be created because the folder where the models are saved is not returned by the API.
+- Schema metadata is only ingested for Import Data Models, because there is no way to retrieve the schema metadata of the other model types.
+- Lineages for Import Data Models cannot be ingested because the API does not provide any information about them.
+- Currently, only SAP BW and SAP HANA are supported for ingesting the upstream lineages of Live Data Models; a warning is logged for all other connection types. Please feel free to open an [issue on GitHub](https://github.com/datahub-project/datahub/issues/new/choose) with the warning message to have this fixed.
+- For some models (e.g., builtin models) it cannot be detected whether they are Live Data or Import Data Models. Therefore, these models will be ingested only with the `Model` subtype.
diff --git a/metadata-ingestion/docs/sources/sac/sac_recipe.yml b/metadata-ingestion/docs/sources/sac/sac_recipe.yml
new file mode 100644
index 00000000000000..e2067d815e41dd
--- /dev/null
+++ b/metadata-ingestion/docs/sources/sac/sac_recipe.yml
@@ -0,0 +1,40 @@
+source:
+  type: sac
+  config:
+    stateful_ingestion:
+      enabled: true
+
+    tenant_url: # Your SAP Analytics Cloud tenant URL, e.g. https://company.eu10.sapanalytics.cloud or https://company.eu10.hcs.cloud.sap
+    token_url: # The Token URL of your SAP Analytics Cloud tenant, e.g. https://company.eu10.hana.ondemand.com/oauth/token.
+
+    # Add secret in Secrets Tab with relevant names for each variable
+    client_id: "${SAC_CLIENT_ID}" # Your SAP Analytics Cloud client id
+    client_secret: "${SAC_CLIENT_SECRET}" # Your SAP Analytics Cloud client secret
+
+    # ingest stories
+    ingest_stories: true
+
+    # ingest applications
+    ingest_applications: true
+
+    resource_id_pattern:
+      allow:
+        - .*
+
+    resource_name_pattern:
+      allow:
+        - .*
+
+    folder_pattern:
+      allow:
+        - .*
+
+    connection_mapping:
+      MY_BW_CONNECTION:
+        platform: bw
+        platform_instance: PROD_BW
+        env: PROD
+      MY_HANA_CONNECTION:
+        platform: hana
+        platform_instance: PROD_HANA
+        env: PROD
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index e973ff629ee841..edeea508a7749c 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -308,6 +308,12 @@
 
 mysql = sql_common | {"pymysql>=1.0.2"}
 
+sac = {
+    "requests",
+    "pyodata>=1.11.1",
+    "Authlib",
+}
+
 # Note: for all of these, framework_common will be added.
 plugins: Dict[str, Set[str]] = {
     # Sink plugins.
@@ -471,6 +477,7 @@ "fivetran": snowflake_common | bigquery_common | sqlglot_lib, "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, + "sac": sac, } # This is mainly used to exclude plugins from the Docker image. @@ -609,6 +616,7 @@ "kafka-connect", "qlik-sense", "sigma", + "sac", ] if plugin for dependency in plugins[plugin] @@ -723,6 +731,7 @@ "fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource", "qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource", "sigma = datahub.ingestion.source.sigma.sigma:SigmaSource", + "sac = datahub.ingestion.source.sac.sac:SACSource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 0d9fc8225532c9..4d335779fe49b0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -18,6 +18,9 @@ class DatasetSubTypes(str, Enum): QLIK_DATASET = "Qlik Dataset" BIGQUERY_TABLE_SNAPSHOT = "Bigquery Table Snapshot" SIGMA_DATASET = "Sigma Dataset" + SAC_MODEL = "Model" + SAC_IMPORT_DATA_MODEL = "Import Data Model" + SAC_LIVE_DATA_MODEL = "Live Data Model" # TODO: Create separate entity... NOTEBOOK = "Notebook" @@ -71,3 +74,7 @@ class BIAssetSubTypes(str, Enum): MODE_REPORT = "Report" MODE_QUERY = "Query" MODE_CHART = "Chart" + + # SAP Analytics Cloud + SAC_STORY = "Story" + SAC_APPLICATION = "Application" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/sac/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py new file mode 100644 index 00000000000000..20bfcac5ea8439 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -0,0 +1,808 @@ +import json +import logging +from dataclasses import dataclass +from functools import partial +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple + +import pyodata +import pyodata.v2.model +import pyodata.v2.service +from authlib.integrations.requests_client import OAuth2Session +from pydantic import Field, SecretStr, validator + +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import ( + DEFAULT_ENV, + DatasetSourceConfigMixin, + EnvConfigMixin, +) +from datahub.emitter.mce_builder import ( + dataset_urn_to_key, + make_dashboard_urn, + make_data_platform_urn, + make_dataplatform_instance_urn, + make_dataset_urn_with_platform_instance, + make_user_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.incremental_lineage_helper import ( + IncrementalLineageConfigMixin, + auto_incremental_lineage, +) +from datahub.ingestion.api.source import ( + CapabilityReport, + MetadataWorkUnitProcessor, + SourceCapability, + TestableSource, + TestConnectionReport, +) +from datahub.ingestion.api.workunit import MetadataWorkUnit +from 
datahub.ingestion.source.common.subtypes import BIAssetSubTypes, DatasetSubTypes
+from datahub.ingestion.source.sac.sac_common import (
+    ImportDataModelColumn,
+    Resource,
+    ResourceModel,
+)
+from datahub.ingestion.source.state.stale_entity_removal_handler import (
+    StaleEntityRemovalHandler,
+    StaleEntityRemovalSourceReport,
+    StatefulStaleMetadataRemovalConfig,
+)
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    StatefulIngestionConfigBase,
+    StatefulIngestionSourceBase,
+)
+from datahub.metadata.schema_classes import (
+    AuditStampClass,
+    BrowsePathEntryClass,
+    BrowsePathsClass,
+    BrowsePathsV2Class,
+    ChangeAuditStampsClass,
+    ChangeTypeClass,
+    DashboardInfoClass,
+    DataPlatformInstanceClass,
+    DatasetLineageTypeClass,
+    DatasetPropertiesClass,
+    DateTypeClass,
+    NullTypeClass,
+    NumberTypeClass,
+    SchemaFieldClass,
+    SchemaFieldDataTypeClass,
+    SchemalessClass,
+    SchemaMetadataClass,
+    StatusClass,
+    StringTypeClass,
+    SubTypesClass,
+    UpstreamClass,
+    UpstreamLineageClass,
+)
+from datahub.utilities import config_clean
+
+logger = logging.getLogger(__name__)
+
+
+class ConnectionMappingConfig(EnvConfigMixin):
+    platform: Optional[str] = Field(
+        default=None, description="The platform that this connection mapping belongs to"
+    )
+
+    platform_instance: Optional[str] = Field(
+        default=None,
+        description="The instance of the platform that this connection mapping belongs to",
+    )
+
+    env: str = Field(
+        default=DEFAULT_ENV,
+        description="The environment that this connection mapping belongs to",
+    )
+
+
+class SACSourceConfig(
+    StatefulIngestionConfigBase, DatasetSourceConfigMixin, IncrementalLineageConfigMixin
+):
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
+        default=None,
+        description="Stateful ingestion related configs",
+    )
+
+    tenant_url: str = Field(description="URL of the SAP Analytics Cloud tenant")
+    token_url: str = Field(
+        description="URL of the OAuth token endpoint of the SAP Analytics Cloud tenant"
+    )
+    client_id: str = Field(description="Client ID for the OAuth authentication")
+    client_secret: SecretStr = Field(
+        description="Client secret for the OAuth authentication"
+    )
+
+    ingest_stories: bool = Field(
+        default=True,
+        description="Controls whether Stories should be ingested",
+    )
+
+    ingest_applications: bool = Field(
+        default=True,
+        description="Controls whether Analytic Applications should be ingested",
+    )
+
+    resource_id_pattern: AllowDenyPattern = Field(
+        AllowDenyPattern.allow_all(),
+        description="Patterns for selecting resource ids that are to be included",
+    )
+
+    resource_name_pattern: AllowDenyPattern = Field(
+        AllowDenyPattern.allow_all(),
+        description="Patterns for selecting resource names that are to be included",
+    )
+
+    folder_pattern: AllowDenyPattern = Field(
+        AllowDenyPattern.allow_all(),
+        description="Patterns for selecting folders that are to be included",
+    )
+
+    connection_mapping: Dict[str, ConnectionMappingConfig] = Field(
+        default={}, description="Custom mappings for connections"
+    )
+
+    query_name_template: Optional[str] = Field(
+        default="QUERY/{name}",
+        description="Template for generating dataset urns of consumed queries; the placeholder {name} can be used within the template for inserting the name of the query",
+    )
+
+    @validator("tenant_url", "token_url")
+    def remove_trailing_slash(cls, v):
+        return config_clean.remove_trailing_slashes(v)
+
+
+@dataclass
+class SACSourceReport(StaleEntityRemovalSourceReport):
+    pass
+
+
+@platform_name("SAP Analytics
Cloud", id="sac") +@config_class(SACSourceConfig) +@support_status(SupportStatus.TESTING) +@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") +@capability(SourceCapability.DESCRIPTIONS, "Enabled by default") +@capability( + SourceCapability.LINEAGE_COARSE, + "Enabled by default (only for Live Data Models)", +) +@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") +@capability( + SourceCapability.SCHEMA_METADATA, + "Enabled by default (only for Import Data Models)", +) +class SACSource(StatefulIngestionSourceBase, TestableSource): + config: SACSourceConfig + report: SACSourceReport + platform = "sac" + + session: OAuth2Session + client: pyodata.Client + + ingested_dataset_entities: Set[str] = set() + ingested_upstream_dataset_keys: Set[str] = set() + + def __init__(self, config: SACSourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) + self.config = config + self.report = SACSourceReport() + + self.session = OAuth2Session( + client_id=self.config.client_id, + client_secret=self.config.client_secret.get_secret_value(), + token_endpoint=config.token_url, + token_endpoint_auth_method="client_secret_post", + grant_type="client_credentials", + ) + + self.session.register_compliance_hook( + "protected_request", _add_sap_sac_custom_auth_header + ) + self.session.fetch_token() + + self.client = pyodata.Client( + url=f"{self.config.tenant_url}/api/v1", + connection=self.session, + config=pyodata.v2.model.Config(retain_null=True), + ) + + def close(self) -> None: + self.session.close() + super().close() + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "SACSource": + config = SACSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + @staticmethod + def test_connection(config_dict: dict) -> TestConnectionReport: + test_report = TestConnectionReport() + + try: + config = SACSourceConfig.parse_obj(config_dict) + + session = OAuth2Session( + client_id=config.client_id, + client_secret=config.client_secret.get_secret_value(), + token_endpoint=config.token_url, + token_endpoint_auth_method="client_secret_post", + grant_type="client_credentials", + ) + + session.register_compliance_hook( + "protected_request", _add_sap_sac_custom_auth_header + ) + session.fetch_token() + + response = session.get(url=f"{config.tenant_url}/api/v1/$metadata") + response.raise_for_status() + + response = session.get(url=f"{config.tenant_url}/api/v1/dataimport/models") + response.raise_for_status() + + test_report.basic_connectivity = CapabilityReport(capable=True) + + except Exception as e: + logger.error(f"Failed to test connection due to {e}", exc_info=e) + if test_report.basic_connectivity is None: + test_report.basic_connectivity = CapabilityReport( + capable=False, failure_reason=f"{e}" + ) + else: + test_report.internal_failure = True + test_report.internal_failure_reason = f"{e}" + + return test_report + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + partial( + auto_incremental_lineage, + self.config.incremental_lineage, + ), + StaleEntityRemovalHandler.create( + self, self.config, self.ctx + ).workunit_processor, + ] + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + if self.config.ingest_stories or self.config.ingest_applications: + resources = self.get_resources() + + for resource in resources: + dashboard_urn = make_dashboard_urn( + platform=self.platform, + name=resource.resource_id, + 
platform_instance=self.config.platform_instance, + ) + + if resource.ancestor_path: + mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dashboard_urn, + aspectName="browsePaths", + aspect=BrowsePathsClass( + paths=[ + f"/{self.platform}/{resource.ancestor_path}", + ], + ), + ) + + yield MetadataWorkUnit( + id=f"dashboard-browse-paths-{dashboard_urn}", mcp=mcp + ) + + mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dashboard_urn, + aspectName="browsePathsV2", + aspect=BrowsePathsV2Class( + path=[ + BrowsePathEntryClass(id=folder_name) + for folder_name in resource.ancestor_path.split("/") + ], + ), + ) + + yield MetadataWorkUnit( + id=f"dashboard-browse-paths-v2-{dashboard_urn}", mcp=mcp + ) + + if self.config.platform_instance is not None: + mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dashboard_urn, + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=make_dataplatform_instance_urn( + self.platform, self.config.platform_instance + ), + ), + ) + + yield MetadataWorkUnit( + id=f"dashboard-data-platform-instance-{dashboard_urn}", mcp=mcp + ) + + datasets = [] + + for resource_model in resource.resource_models: + dataset_urn = make_dataset_urn_with_platform_instance( + platform=self.platform, + name=f"{resource_model.namespace}:{resource_model.model_id}", + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + + if dataset_urn not in datasets: + datasets.append(dataset_urn) + + if dataset_urn in self.ingested_dataset_entities: + continue + + self.ingested_dataset_entities.add(dataset_urn) + + yield from self.get_model_workunits(dataset_urn, resource_model) + + mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dashboard_urn, + aspectName="dashboardInfo", + aspect=DashboardInfoClass( + title=resource.name, + description=resource.description, + lastModified=ChangeAuditStampsClass( + created=AuditStampClass( + time=round(resource.created_time.timestamp() * 1000), + actor=make_user_urn(resource.created_by) + if resource.created_by + else "urn:li:corpuser:unknown", + ), + lastModified=AuditStampClass( + time=round(resource.modified_time.timestamp() * 1000), + actor=make_user_urn(resource.modified_by) + if resource.modified_by + else "urn:li:corpuser:unknown", + ), + ), + customProperties={ + "resourceType": resource.resource_type, + "resourceSubtype": resource.resource_subtype, + "storyId": resource.story_id, + "isMobile": str(resource.is_mobile), + }, + datasets=sorted(datasets) if datasets else None, + externalUrl=f"{self.config.tenant_url}{resource.open_url}", + ), + ) + + yield MetadataWorkUnit(id=f"dashboard-info-{dashboard_urn}", mcp=mcp) + + type_name: Optional[str] = None + if resource.resource_subtype == "": + type_name = BIAssetSubTypes.SAC_STORY + elif resource.resource_subtype == "APPLICATION": + type_name = BIAssetSubTypes.SAC_APPLICATION + + if type_name: + mcp = MetadataChangeProposalWrapper( + entityType="dashboard", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dashboard_urn, + aspectName="subTypes", + aspect=SubTypesClass( + typeNames=[type_name], + ), + ) + + yield MetadataWorkUnit( + id=f"dashboard-subtype-{dashboard_urn}", mcp=mcp + ) + + def get_report(self) -> SACSourceReport: + return self.report + + def 
get_model_workunits(
+        self, dataset_urn: str, model: ResourceModel
+    ) -> Iterable[MetadataWorkUnit]:
+        mcp = MetadataChangeProposalWrapper(
+            entityType="dataset",
+            changeType=ChangeTypeClass.UPSERT,
+            entityUrn=dataset_urn,
+            aspectName="datasetProperties",
+            aspect=DatasetPropertiesClass(
+                name=model.name,
+                description=model.description,
+                customProperties={
+                    "namespace": model.namespace,
+                    "modelId": model.model_id,
+                    "isImport": "true" if model.is_import else "false",
+                },
+                externalUrl=f"{self.config.tenant_url}/sap/fpa/ui/tenants/3c44c#view_id=model;model_id={model.namespace}:{model.model_id}",
+            ),
+        )
+
+        yield MetadataWorkUnit(id=f"dataset-properties-{dataset_urn}", mcp=mcp)
+
+        if model.is_import:
+            primary_fields: List[str] = []
+            schema_fields: List[SchemaFieldClass] = []
+
+            columns = self.get_import_data_model_columns(model_id=model.model_id)
+            for column in columns:
+                native_data_type = column.data_type
+                if column.data_type == "decimal":
+                    native_data_type = (
+                        f"{column.data_type}({column.precision}, {column.scale})"
+                    )
+                elif column.data_type == "int32":
+                    native_data_type = f"{column.data_type}({column.precision})"
+                elif column.max_length is not None:
+                    native_data_type = f"{column.data_type}({column.max_length})"
+
+                schema_field = SchemaFieldClass(
+                    fieldPath=column.name,
+                    type=self.get_schema_field_data_type(
+                        column.property_type, column.data_type
+                    ),
+                    nativeDataType=native_data_type,
+                    description=column.description,
+                    isPartOfKey=column.is_key,
+                )
+
+                schema_fields.append(schema_field)
+
+                if column.is_key:
+                    primary_fields.append(column.name)
+
+            mcp = MetadataChangeProposalWrapper(
+                entityType="dataset",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspectName="schemaMetadata",
+                aspect=SchemaMetadataClass(
+                    schemaName=model.model_id,
+                    platform=make_data_platform_urn(self.platform),
+                    version=0,
+                    hash="",
+                    platformSchema=SchemalessClass(),
+                    fields=schema_fields,
+                    primaryKeys=primary_fields,
+                ),
+            )
+
+            yield MetadataWorkUnit(
+                id=f"dataset-schema-metadata-{dataset_urn}", mcp=mcp
+            )
+
+        if model.system_type in ("BW", "HANA") and model.external_id is not None:
+            upstream_dataset_name: Optional[str] = None
+
+            if model.system_type == "BW" and model.external_id.startswith(
+                "query:"
+            ):  # query:[][][query]
+                query = model.external_id[11:-1]
+                upstream_dataset_name = self.get_query_name(query)
+            elif model.system_type == "HANA" and model.external_id.startswith(
+                "view:"
+            ):  # view:[schema][schema.namespace][view]
+                schema, namespace_with_schema, view = model.external_id.split("][", 2)
+                schema = schema[6:]
+                namespace: Optional[str] = None
+                if len(schema) < len(namespace_with_schema):
+                    namespace = namespace_with_schema[len(f"{schema}.") :]
+                view = view[:-1]
+                upstream_dataset_name = self.get_view_name(schema, namespace, view)
+
+            if upstream_dataset_name is not None:
+                if model.connection_id in self.config.connection_mapping:
+                    connection = self.config.connection_mapping[model.connection_id]
+                    platform = (
+                        connection.platform
+                        if connection.platform
+                        else model.system_type.lower()
+                    )
+                    platform_instance = connection.platform_instance
+                    env = connection.env
+                else:
+                    platform = model.system_type.lower()
+                    platform_instance = model.connection_id
+                    env = DEFAULT_ENV
+
+                upstream_dataset_urn = make_dataset_urn_with_platform_instance(
+                    platform=platform,
+                    name=upstream_dataset_name,
+                    platform_instance=platform_instance,
+                    env=env,
+                )
+
+                if upstream_dataset_urn not in self.ingested_upstream_dataset_keys:
+                    mcp =
MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=upstream_dataset_urn, + aspectName="datasetKey", + aspect=dataset_urn_to_key(upstream_dataset_urn), + ) + + yield MetadataWorkUnit( + id=f"dataset-key-{upstream_dataset_urn}", + mcp=mcp, + is_primary_source=False, + ) + + self.ingested_upstream_dataset_keys.add(upstream_dataset_urn) + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dataset_urn, + aspectName="upstreamLineage", + aspect=UpstreamLineageClass( + upstreams=[ + UpstreamClass( + dataset=upstream_dataset_urn, + type=DatasetLineageTypeClass.COPY, + ), + ], + ), + ) + + yield MetadataWorkUnit( + id=f"dataset-upstream-lineage-{dataset_urn}", mcp=mcp + ) + else: + logger.warning( + f"Unknown upstream dataset for model with id {model.namespace}:{model.model_id} and external id {model.external_id}" + ) + self.report.report_warning( + "unknown-upstream-dataset", + f"Unknown upstream dataset for model with id {model.namespace}:{model.model_id} and external id {model.external_id}", + ) + elif model.system_type is not None: + logger.warning( + f"Unknown system type {model.system_type} for model with id {model.namespace}:{model.model_id} and external id {model.external_id}" + ) + self.report.report_warning( + "unknown-system-type", + f"Unknown system type {model.system_type} for model with id {model.namespace}:{model.model_id} and external id {model.external_id}", + ) + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dataset_urn, + aspectName="status", + aspect=StatusClass( + removed=False, + ), + ) + + yield MetadataWorkUnit(id=f"dataset-status-{dataset_urn}", mcp=mcp) + + if model.external_id and model.connection_id and model.system_type: + type_name = DatasetSubTypes.SAC_LIVE_DATA_MODEL + elif model.is_import: + type_name = DatasetSubTypes.SAC_IMPORT_DATA_MODEL + else: + type_name = DatasetSubTypes.SAC_MODEL + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dataset_urn, + aspectName="subTypes", + aspect=SubTypesClass( + typeNames=[type_name], + ), + ) + + yield MetadataWorkUnit(id=f"dataset-subtype-{dataset_urn}", mcp=mcp) + + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dataset_urn, + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=self.config.platform_instance, + ), + ) + + yield MetadataWorkUnit( + id=f"dataset-data-platform-instance-{dataset_urn}", mcp=mcp + ) + + def get_resources(self) -> Iterable[Resource]: + import_data_model_ids = self.get_import_data_model_ids() + + filter = "isTemplate eq 0 and isSample eq 0 and isPublic eq 1" + if self.config.ingest_stories and self.config.ingest_applications: + filter += " and ((resourceType eq 'STORY' and resourceSubtype eq '') or (resourceType eq 'STORY' and resourceSubtype eq 'APPLICATION'))" + elif self.config.ingest_stories and not self.config.ingest_applications: + filter += " and resourceType eq 'STORY' and resourceSubtype eq ''" + elif not self.config.ingest_stories and self.config.ingest_applications: + filter += ( + " and resourceType eq 'STORY' and resourceSubtype eq 'APPLICATION'" + ) + + select = "resourceId,resourceType,resourceSubtype,storyId,name,description,createdTime,createdBy,modifiedBy,modifiedTime,openURL,ancestorPath,isMobile" + + 
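+        # The filter and select strings are passed as OData query options on the
+        # Resources entity set (roughly GET /api/v1/Resources?$format=json&$filter=...&$select=...).
+        # With both ingest_stories and ingest_applications enabled, the filter keeps
+        # public, non-template, non-sample STORY resources whose resourceSubtype is
+        # either '' (a classic Story) or 'APPLICATION' (an Analytic Application).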
+        entities: pyodata.v2.service.ListWithTotalCount = (
+            self.client.entity_sets.Resources.get_entities()
+            .custom("$format", "json")
+            .filter(filter)
+            .select(select)
+            .execute()
+        )
+
+        entity: pyodata.v2.service.EntityProxy
+        for entity in entities:
+            resource_id: str = entity.resourceId
+            name: str = entity.name.strip()
+
+            if not self.config.resource_id_pattern.allowed(
+                resource_id
+            ) or not self.config.resource_name_pattern.allowed(name):
+                continue
+
+            ancestor_path: Optional[str] = None
+
+            try:
+                ancestors = json.loads(entity.ancestorPath)
+                ancestor_path = "/".join(
+                    ancestor.replace("/", "%2F") for ancestor in ancestors
+                )
+            except json.JSONDecodeError:
+                pass
+
+            if ancestor_path and not self.config.folder_pattern.allowed(ancestor_path):
+                continue
+
+            resource_models: Set[ResourceModel] = set()
+
+            select = "modelId,name,description,externalId,connectionId,systemType"
+
+            nav_entities: pyodata.v2.service.EntitySetProxy = (
+                entity.nav("resourceModels")
+                .get_entities()
+                .custom("$format", "json")
+                .select(select)
+                .execute()
+            )
+
+            nav_entity: pyodata.v2.service.EntityProxy
+            for nav_entity in nav_entities:
+                # the model id can have a different structure, commonly all model ids have a namespace (the part before the colon) and the model id itself
+                # t.4.sap.fpa.services.userFriendlyPerfLog:ACTIVITY_LOG is a builtin model without a possibility to get more metadata about the model
+                # t.4.YV67EM4QBRU035A7TVKERZ786N:YV67EM4QBRU035A7TVKERZ786N is a model id where the model id itself also appears as part of the namespace
+                # t.4:C76tt2j402o1e69wnvrwfcl79c is a model id without the model id itself as part of the namespace
+                model_id: str = nav_entity.modelId
+                namespace, _, model_id = model_id.partition(":")
+
+                resource_models.add(
+                    ResourceModel(
+                        namespace=namespace,
+                        model_id=model_id,
+                        name=nav_entity.name.strip(),
+                        description=nav_entity.description.strip(),
+                        system_type=nav_entity.systemType,  # BW or HANA
+                        connection_id=nav_entity.connectionId,
+                        external_id=nav_entity.externalId,  # query:[][][query] or view:[schema][schema.namespace][view]
+                        is_import=model_id in import_data_model_ids,
+                    )
+                )
+
+            created_by: Optional[str] = entity.createdBy
+            if created_by in ("SYSTEM", "$DELETED_USER$"):
+                created_by = None
+
+            modified_by: Optional[str] = entity.modifiedBy
+            if modified_by in ("SYSTEM", "$DELETED_USER$"):
+                modified_by = None
+
+            yield Resource(
+                resource_id=resource_id,
+                resource_type=entity.resourceType,
+                resource_subtype=entity.resourceSubtype,
+                story_id=entity.storyId,
+                name=name,
+                description=entity.description.strip(),
+                created_time=entity.createdTime,
+                created_by=created_by,
+                modified_time=entity.modifiedTime,
+                modified_by=modified_by,
+                open_url=entity.openURL,
+                ancestor_path=ancestor_path,
+                is_mobile=entity.isMobile,
+                resource_models=resource_models,
+            )
+
+    def get_import_data_model_ids(self) -> Set[str]:
+        response = self.session.get(
+            url=f"{self.config.tenant_url}/api/v1/dataimport/models"
+        )
+        response.raise_for_status()
+
+        import_data_model_ids = set(
+            model["modelID"] for model in response.json()["models"]
+        )
+        return import_data_model_ids
+
+    def get_import_data_model_columns(
+        self, model_id: str
+    ) -> List[ImportDataModelColumn]:
+        response = self.session.get(
+            url=f"{self.config.tenant_url}/api/v1/dataimport/models/{model_id}/metadata"
+        )
+        response.raise_for_status()
+
+        model_metadata = response.json()
+
+        columns: List[ImportDataModelColumn] = []
+        for column in model_metadata["factData"]["columns"]:
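+            # Each entry describes one column of the model's fact data; the mapping
+            # from propertyType (e.g. DATE) and columnDataType (string, decimal,
+            # int32) to DataHub types happens in get_schema_field_data_type below.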
columns.append( + ImportDataModelColumn( + name=column["columnName"].strip(), + description=column["descriptionName"].strip(), + property_type=column["propertyType"], + data_type=column["columnDataType"], + max_length=column.get("maxLength"), + precision=column.get("precision"), + scale=column.get("scale"), + is_key=column["isKey"], + ) + ) + + return columns + + def get_query_name(self, query: str) -> str: + if not self.config.query_name_template: + return query + + query_name = self.config.query_name_template + query_name = query_name.replace("{name}", query) + + return query_name + + def get_view_name(self, schema: str, namespace: Optional[str], view: str) -> str: + if namespace: + return f"{schema}.{namespace}::{view}" + + return f"{schema}.{view}" + + def get_schema_field_data_type( + self, property_type: str, data_type: str + ) -> SchemaFieldDataTypeClass: + if property_type == "DATE": + return SchemaFieldDataTypeClass(type=DateTypeClass()) + else: + if data_type == "string": + return SchemaFieldDataTypeClass(type=StringTypeClass()) + elif data_type in ("decimal", "int32"): + return SchemaFieldDataTypeClass(type=NumberTypeClass()) + else: + logger.warning(f"Unknown data type {data_type} found") + return SchemaFieldDataTypeClass(type=NullTypeClass()) + + +def _add_sap_sac_custom_auth_header( + url: str, headers: Dict[str, str], body: Any +) -> Tuple[str, Dict[str, str], Any]: + headers["x-sap-sac-custom-auth"] = "true" + return url, headers, body diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py new file mode 100644 index 00000000000000..fa284a9b06715f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Set + + +@dataclass(frozen=True) +class ResourceModel: + namespace: str + model_id: str + name: str + description: str + system_type: Optional[str] + connection_id: Optional[str] + external_id: Optional[str] + is_import: bool + + +@dataclass(frozen=True) +class Resource: + resource_id: str + resource_type: str + resource_subtype: str + story_id: str + name: str + description: str + created_time: datetime + created_by: Optional[str] + modified_time: datetime + modified_by: Optional[str] + open_url: str + ancestor_path: Optional[str] + is_mobile: bool + resource_models: Set[ResourceModel] + + +@dataclass(frozen=True) +class ImportDataModelColumn: + name: str + description: str + property_type: str + data_type: str + max_length: Optional[int] + precision: Optional[int] + scale: Optional[int] + is_key: bool diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py index 64383745eb2d1b..4e860d12a52dc3 100644 --- a/metadata-ingestion/src/datahub/utilities/logging_manager.py +++ b/metadata-ingestion/src/datahub/utilities/logging_manager.py @@ -278,3 +278,6 @@ def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[N logging.getLogger("snowflake").setLevel(level=logging.WARNING) # logging.getLogger("botocore").setLevel(logging.INFO) # logging.getLogger("google").setLevel(logging.INFO) +logging.getLogger("pyodata.client").setLevel(logging.WARNING) +logging.getLogger("pyodata.model").setLevel(logging.WARNING) +logging.getLogger("pyodata.service").setLevel(logging.WARNING) diff --git a/metadata-ingestion/tests/integration/sac/__init__.py 
b/metadata-ingestion/tests/integration/sac/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/tests/integration/sac/metadata.xml b/metadata-ingestion/tests/integration/sac/metadata.xml
new file mode 100644
index 00000000000000..203f85dd286c9b
--- /dev/null
+++ b/metadata-ingestion/tests/integration/sac/metadata.xml
@@ -0,0 +1,404 @@
+[metadata.xml: 404 lines of the OData $metadata (EDMX) document served by the mocked SAC API in the integration test; the XML markup was lost in extraction]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/sac/sac_mces_golden.json b/metadata-ingestion/tests/integration/sac/sac_mces_golden.json
new file mode 100644
index 00000000000000..c88ded368711fb
--- /dev/null
+++ b/metadata-ingestion/tests/integration/sac/sac_mces_golden.json
@@ -0,0 +1,663 @@
+[
+{
+    "entityType": "dashboard",
+    "entityUrn": "urn:li:dashboard:(sac,LXTH4JCE36EOYLU41PIINLYPU9XRYM26)",
+    "changeType": "UPSERT",
+    "aspectName": "browsePaths",
+    "aspect": {
+        "json": {
+            "paths": [
+                "/sac/Public/Folder 1/Folder 2"
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "sac-integration-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "dashboard",
+    "entityUrn": "urn:li:dashboard:(sac,LXTH4JCE36EOYLU41PIINLYPU9XRYM26)",
+    "changeType": "UPSERT",
+    "aspectName": "browsePathsV2",
+    "aspect": {
+        "json": {
+            "path": [
+                {
+                    "id": "Public"
+                },
+                {
+                    "id": "Folder 1"
+                },
+                {
+                    "id": "Folder 2"
+                }
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "sac-integration-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "datasetProperties",
+    "aspect": {
+        "json": {
+            "customProperties": {
+                "namespace": "t.4.ANL8Q577BA2F73KU3VELDXGWZK",
+                "modelId": "ANL8Q577BA2F73KU3VELDXGWZK",
+                "isImport": "false"
+            },
+            "externalUrl": "http://tenant/sap/fpa/ui/tenants/3c44c#view_id=model;model_id=t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK",
+            "name": "Name of the first model (BW)",
+            "description": "Description of the first model which has a connection to a BW query",
+            "tags": []
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "sac-integration-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hana,HANA.SCHEMA.CE.SCHEMA::VIEW,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "datasetKey",
+    "aspect": {
+        "json": {
+            "platform": "urn:li:dataPlatform:hana",
+            "name": "HANA.SCHEMA.CE.SCHEMA::VIEW",
+            "origin": "PROD"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1615443388097,
+        "runId": "sac-integration-test",
+
"lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:bw,BW.QUERY/QUERY_TECHNICAL_NAME,PROD)", + "type": "COPY" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Live Data Model" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:sac" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "namespace": "t.4.K73U3VELDXGWZKANL8Q577BA2F", + "modelId": "K73U3VELDXGWZKANL8Q577BA2F", + "isImport": "false" + }, + "externalUrl": "http://tenant/sap/fpa/ui/tenants/3c44c#view_id=model;model_id=t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F", + "name": "Name of the second model (HANA)", + "description": "Description of the second model which has a connection to a HANA view", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "DXGWZKANLK73U3VEL8Q577BA2F", + "platform": "urn:li:dataPlatform:sac", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.Schemaless": {} + }, + "fields": [ + { + "fieldPath": "Account", + "nullable": false, + "description": "Account", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string(256)", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "FIELD1", + "nullable": false, + 
"description": "FIELD1", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string(256)", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "FIELD2", + "nullable": false, + "description": "FIELD2", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string(256)", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "FIELD3", + "nullable": false, + "description": "FIELD3", + "type": { + "type": { + "com.linkedin.schema.DateType": {} + } + }, + "nativeDataType": "string(256)", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "Version", + "nullable": false, + "description": "Version", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string(300)", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "SignedData", + "nullable": false, + "description": "SignedData", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "decimal(31, 7)", + "recursive": false, + "isPartOfKey": false + } + ], + "primaryKeys": [ + "Account", + "FIELD1", + "FIELD2", + "FIELD3", + "Version" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hana,HANA.SCHEMA.CE.SCHEMA::VIEW,PROD)", + "type": "COPY" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Live Data Model" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:sac" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "namespace": "t.4.DXGWZKANLK73U3VEL8Q577BA2F", + "modelId": "DXGWZKANLK73U3VEL8Q577BA2F", + "isImport": 
"true" + }, + "externalUrl": "http://tenant/sap/fpa/ui/tenants/3c44c#view_id=model;model_id=t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F", + "name": "Name of the third model (Import)", + "description": "Description of the third model which was imported", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bw,BW.QUERY/QUERY_TECHNICAL_NAME,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bw", + "name": "BW.QUERY/QUERY_TECHNICAL_NAME", + "origin": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Import Data Model" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:sac" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,LXTH4JCE36EOYLU41PIINLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "dashboardInfo", + "aspect": { + "json": { + "customProperties": { + "resourceType": "STORY", + "resourceSubtype": "", + "storyId": "STORY:t.4:LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "isMobile": "0" + }, + "externalUrl": "http://tenant/sap/fpa/ui/tenants/3c44c/bo/story/LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "title": "Name of the story", + "description": "Description of the story", + "charts": [], + "datasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)" + ], + "lastModified": { + "created": { + "time": 1667544309783, + "actor": "urn:li:corpuser:JOHN_DOE" + }, + "lastModified": { + "time": 1673067981272, + "actor": "urn:li:corpuser:JOHN_DOE" + } + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,LXTH4JCE36EOYLU41PIINLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + 
"systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,LXTH4JCE36EOYLU41PIINLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Story" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,EOYLU41PIILXTH4JCE36NLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "browsePaths", + "aspect": { + "json": { + "paths": [ + "/sac/Public/Folder 1/Folder 2" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,EOYLU41PIILXTH4JCE36NLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "dashboardInfo", + "aspect": { + "json": { + "customProperties": { + "resourceType": "STORY", + "resourceSubtype": "APPLICATION", + "storyId": "STORY:t.4:EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "isMobile": "0" + }, + "externalUrl": "http://tenant/sap/fpa/ui/tenants/3c44c/bo/story/EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "title": "Name of the application", + "description": "Description of the application", + "charts": [], + "datasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:sac,t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F,PROD)" + ], + "lastModified": { + "created": { + "time": 1673279404272, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1673279414272, + "actor": "urn:li:corpuser:unknown" + } + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,EOYLU41PIILXTH4JCE36NLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,EOYLU41PIILXTH4JCE36NLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Application" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(sac,EOYLU41PIILXTH4JCE36NLYPU9XRYM26)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Public" + }, + { + "id": "Folder 1" + }, + { + "id": "Folder 2" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "sac-integration-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/sac/test_sac.py b/metadata-ingestion/tests/integration/sac/test_sac.py new file mode 100644 index 00000000000000..067bd5d7fc4215 --- /dev/null +++ b/metadata-ingestion/tests/integration/sac/test_sac.py @@ 
-0,0 +1,308 @@ +from functools import partial +from typing import Dict +from urllib.parse import parse_qs + +import pytest + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers + +MOCK_TENANT_URL = "http://tenant" +MOCK_TOKEN_URL = "http://tenant.authentication/oauth/token" +MOCK_CLIENT_ID = "foo" +MOCK_CLIENT_SECRET = "bar" +MOCK_ACCESS_TOKEN = "foobaraccesstoken" + + +@pytest.mark.integration +def test_sac( + pytestconfig, + tmp_path, + requests_mock, + mock_time, +): + def match_token_url(request, context): + form = parse_qs(request.text, strict_parsing=True) + + assert "grant_type" in form + assert len(form["grant_type"]) == 1 + assert form["grant_type"][0] == "client_credentials" + + assert "client_id" in form + assert len(form["client_id"]) == 1 + assert form["client_id"][0] == MOCK_CLIENT_ID + + assert "client_secret" in form + assert len(form["client_secret"]) == 1 + assert form["client_secret"][0] == MOCK_CLIENT_SECRET + + json = { + "access_token": MOCK_ACCESS_TOKEN, + "expires_in": 3599, + } + + return json + + requests_mock.post( + MOCK_TOKEN_URL, + json=match_token_url, + ) + + test_resources_dir = pytestconfig.rootpath / "tests/integration/sac" + + with open(f"{test_resources_dir}/metadata.xml", mode="rb") as f: + content = f.read() + + def match_metadata(request, context): + _check_authorization(request.headers) + + context.headers["content-type"] = "application/xml" + + return content + + requests_mock.get(f"{MOCK_TENANT_URL}/api/v1/$metadata", content=match_metadata) + + def match_resources(request, context): + _check_authorization(request.headers) + + json = { + "d": { + "results": [ + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ResourcesType", + "uri": "/api/v1/Resources('LXTH4JCE36EOYLU41PIINLYPU9XRYM26')", + }, + "name": "Name of the story", + "description": "Description of the story", + "resourceId": "LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "resourceType": "STORY", + "resourceSubtype": "", + "storyId": "STORY:t.4:LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "createdTime": "/Date(1667544309783)/", + "createdBy": "JOHN_DOE", + "modifiedBy": "JOHN_DOE", + "modifiedTime": "/Date(1673067981272)/", + "isMobile": 0, + "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "ancestorPath": '["Public","Folder 1","Folder 2"]', + }, + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ResourcesType", + "uri": "/api/v1/Resources('EOYLU41PIILXTH4JCE36NLYPU9XRYM26')", + }, + "name": "Name of the application", + "description": "Description of the application", + "resourceId": "EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "resourceType": "STORY", + "resourceSubtype": "APPLICATION", + "storyId": "STORY:t.4:EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "createdTime": "/Date(1673279404272)/", + "createdBy": "SYSTEM", + "modifiedBy": "$DELETED_USER$", + "modifiedTime": "/Date(1673279414272)/", + "isMobile": 0, + "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "ancestorPath": '["Public","Folder 1","Folder 2"]', + }, + ], + }, + } + + return json + + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/Resources?$format=json&$filter=isTemplate eq 0 and isSample eq 0 and isPublic eq 1 and ((resourceType eq 'STORY' and resourceSubtype eq '') or (resourceType eq 'STORY' and resourceSubtype eq 'APPLICATION'))&$select=resourceId,resourceType,resourceSubtype,storyId,name,description,createdTime,createdBy,modifiedBy,modifiedTime,openURL,ancestorPath,isMobile", + json=match_resources, + ) + 
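The per-resource mocks registered further down bind a `resource_id` into one shared callback with `functools.partial`, so a single handler serves several endpoints. As a standalone sketch of that requests-mock pattern (the URL and handler names here are illustrative, not part of the patch):

```python
from functools import partial

import requests
import requests_mock


def handler(request, context, resource_id):
    # requests-mock invokes json callbacks as callback(request, context);
    # partial() binds the extra resource_id argument per registered endpoint.
    return {"resourceId": resource_id}


with requests_mock.Mocker() as m:
    m.get("http://tenant/api/v1/Resources('A')", json=partial(handler, resource_id="A"))
    m.get("http://tenant/api/v1/Resources('B')", json=partial(handler, resource_id="B"))

    assert requests.get("http://tenant/api/v1/Resources('A')").json() == {"resourceId": "A"}
    assert requests.get("http://tenant/api/v1/Resources('B')").json() == {"resourceId": "B"}
```
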
+ def match_resource(request, context, resource_id): + _check_authorization(request.headers) + + json = { + "d": { + "results": [ + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.ANL8Q577BA2F73KU3VELDXGWZK%3AANL8Q577BA2F73KU3VELDXGWZK')", + }, + "modelId": "t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK", + "name": "Name of the first model (BW)", + "description": "Description of the first model which has a connection to a BW query", + "externalId": "query:[][][QUERY_TECHNICAL_NAME]", + "connectionId": "BW", + "systemType": "BW", + }, + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.K73U3VELDXGWZKANL8Q577BA2F%3AK73U3VELDXGWZKANL8Q577BA2F')", + }, + "modelId": "t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F", + "name": "Name of the second model (HANA)", + "description": "Description of the second model which has a connection to a HANA view", + "externalId": "view:[SCHEMA][NAMESPACE.SCHEMA][VIEW]", + "connectionId": "HANA", + "systemType": "HANA", + }, + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.DXGWZKANLK73U3VEL8Q577BA2F%3ADXGWZKANLK73U3VEL8Q577BA2F')", + }, + "modelId": "t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F", + "name": "Name of the third model (Import)", + "description": "Description of the third model which was imported", + "externalId": "", + "connectionId": "", + "systemType": None, + }, + ], + }, + } + + return json + + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/Resources%28%27LXTH4JCE36EOYLU41PIINLYPU9XRYM26%27%29/resourceModels?$format=json&$select=modelId,name,description,externalId,connectionId,systemType", + json=partial(match_resource, resource_id="LXTH4JCE36EOYLU41PIINLYPU9XRYM26"), + ) + + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/Resources%28%27EOYLU41PIILXTH4JCE36NLYPU9XRYM26%27%29/resourceModels?$format=json&$select=modelId,name,description,externalId,connectionId,systemType", + json=partial(match_resource, resource_id="EOYLU41PIILXTH4JCE36NLYPU9XRYM26"), + ) + + def match_models(request, context): + _check_authorization(request.headers) + + json = { + "models": [ + { + "modelID": "DXGWZKANLK73U3VEL8Q577BA2F", + "modelName": "Name of the third model (Import)", + "modelDescription": "Description of the third model which was imported", + "modelURL": f"{MOCK_TENANT_URL}/api/v1/dataimport/models/DXGWZKANLK73U3VEL8Q577BA2F", + }, + ], + } + + return json + + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/dataimport/models", + json=match_models, + ) + + def match_model_metadata(request, context): + _check_authorization(request.headers) + + json = { + "factData": { + "keys": [ + "Account", + "FIELD1", + "FIELD2", + "FIELD3", + "Version", + ], + "columns": [ + { + "columnName": "Account", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "Account", + }, + { + "columnName": "FIELD1", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "FIELD1", + }, + { + "columnName": "FIELD2", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "FIELD2", + }, + { + "columnName": "FIELD3", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + 
"propertyType": "DATE", + "descriptionName": "FIELD3", + }, + { + "columnName": "Version", + "columnDataType": "string", + "maxLength": 300, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "Version", + }, + { + "columnName": "SignedData", + "columnDataType": "decimal", + "maxLength": 32, + "precision": 31, + "scale": 7, + "isKey": False, + "propertyType": "PROPERTY", + "descriptionName": "SignedData", + }, + ], + }, + } + + return json + + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/dataimport/models/DXGWZKANLK73U3VEL8Q577BA2F/metadata", + json=match_model_metadata, + ) + + pipeline = Pipeline.create( + { + "run_id": "sac-integration-test", + "source": { + "type": "sac", + "config": { + "tenant_url": MOCK_TENANT_URL, + "token_url": MOCK_TOKEN_URL, + "client_id": MOCK_CLIENT_ID, + "client_secret": MOCK_CLIENT_SECRET, + }, + }, + "sink": { + "type": "file", + "config": {"filename": f"{tmp_path}/sac_mces.json"}, + }, + }, + ) + + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{tmp_path}/sac_mces.json", + golden_path=test_resources_dir / "sac_mces_golden.json", + ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS, + ) + + +def _check_authorization(headers: Dict) -> None: + assert "Authorization" in headers + assert headers["Authorization"] == f"Bearer {MOCK_ACCESS_TOKEN}" + + assert "x-sap-sac-custom-auth" in headers + assert headers["x-sap-sac-custom-auth"] == "true" diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json index ab040ad2854ddb..4830311996fd94 100644 --- a/metadata-service/war/src/main/resources/boot/data_platforms.json +++ b/metadata-service/war/src/main/resources/boot/data_platforms.json @@ -664,5 +664,15 @@ "type": "OTHERS", "logoUrl": "/assets/platforms/sigmalogo.png" } + }, + { + "urn": "urn:li:dataPlatform:sac", + "aspect": { + "datasetNameDelimiter": ".", + "name": "sac", + "displayName": "SAP Analytics Cloud", + "type": "OTHERS", + "logoUrl": "/assets/platforms/saclogo.svg" + } } ] From a07233a23427fc34939b4f73d8e18db298efe39f Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Tue, 30 Jul 2024 22:38:39 +0200 Subject: [PATCH 2/7] adjustments after code review --- .../docs/sources/sac/sac_pre.md | 4 +- .../src/datahub/ingestion/source/sac/sac.py | 372 +++++++-------- .../src/datahub/utilities/logging_manager.py | 4 +- .../tests/integration/sac/test_sac.py | 438 +++++++++--------- 4 files changed, 384 insertions(+), 434 deletions(-) diff --git a/metadata-ingestion/docs/sources/sac/sac_pre.md b/metadata-ingestion/docs/sources/sac/sac_pre.md index 2dc66f4c1f2f65..c62cd81fa27534 100644 --- a/metadata-ingestion/docs/sources/sac/sac_pre.md +++ b/metadata-ingestion/docs/sources/sac/sac_pre.md @@ -9,7 +9,7 @@ 2. Maintain connection mappings (optional): -To map individual connections in SAP Analytics Cloud to platforms, platform instances or environments the `connection_mapping` configuration can be used within the recipe: +To map individual connections in SAP Analytics Cloud to platforms, platform instances and environments, the `connection_mapping` configuration can be used within the recipe: ```yaml connection_mapping: @@ -23,7 +23,7 @@ connection_mapping: env: PROD ``` -The key in the connection mapping is the technical name of the connection resp. its id. 
+The key in the connection mapping dictionary represents the name of the connection created in SAP Analytics Cloud. ## Concept mapping diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index 20bfcac5ea8439..e2fbc75ef0851b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -66,7 +66,6 @@ BrowsePathsClass, BrowsePathsV2Class, ChangeAuditStampsClass, - ChangeTypeClass, DashboardInfoClass, DataPlatformInstanceClass, DatasetLineageTypeClass, @@ -196,24 +195,7 @@ def __init__(self, config: SACSourceConfig, ctx: PipelineContext): self.config = config self.report = SACSourceReport() - self.session = OAuth2Session( - client_id=self.config.client_id, - client_secret=self.config.client_secret.get_secret_value(), - token_endpoint=config.token_url, - token_endpoint_auth_method="client_secret_post", - grant_type="client_credentials", - ) - - self.session.register_compliance_hook( - "protected_request", _add_sap_sac_custom_auth_header - ) - self.session.fetch_token() - - self.client = pyodata.Client( - url=f"{self.config.tenant_url}/api/v1", - connection=self.session, - config=pyodata.v2.model.Config(retain_null=True), - ) + self.session, self.client = SACSource.get_sac_connection(self.config) def close(self) -> None: self.session.close() @@ -231,36 +213,20 @@ def test_connection(config_dict: dict) -> TestConnectionReport: try: config = SACSourceConfig.parse_obj(config_dict) - session = OAuth2Session( - client_id=config.client_id, - client_secret=config.client_secret.get_secret_value(), - token_endpoint=config.token_url, - token_endpoint_auth_method="client_secret_post", - grant_type="client_credentials", - ) - - session.register_compliance_hook( - "protected_request", _add_sap_sac_custom_auth_header - ) - session.fetch_token() - - response = session.get(url=f"{config.tenant_url}/api/v1/$metadata") - response.raise_for_status() + # when creating the pyodata.Client, the metadata is automatically parsed and validated + session, _ = SACSource.get_sac_connection(config) + # test the Data Import Service Service separately here, because it requires specific properties when configuring the OAuth client response = session.get(url=f"{config.tenant_url}/api/v1/dataimport/models") response.raise_for_status() - test_report.basic_connectivity = CapabilityReport(capable=True) + session.close() + test_report.basic_connectivity = CapabilityReport(capable=True) except Exception as e: - logger.error(f"Failed to test connection due to {e}", exc_info=e) - if test_report.basic_connectivity is None: - test_report.basic_connectivity = CapabilityReport( - capable=False, failure_reason=f"{e}" - ) - else: - test_report.internal_failure = True - test_report.internal_failure_reason = f"{e}" + test_report.basic_connectivity = CapabilityReport( + capable=False, failure_reason=f"{e}" + ) return test_report @@ -281,64 +247,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: resources = self.get_resources() for resource in resources: - dashboard_urn = make_dashboard_urn( - platform=self.platform, - name=resource.resource_id, - platform_instance=self.config.platform_instance, - ) - - if resource.ancestor_path: - mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dashboard_urn, - aspectName="browsePaths", - aspect=BrowsePathsClass( - paths=[ - f"/{self.platform}/{resource.ancestor_path}", - ], 
- ), - ) - - yield MetadataWorkUnit( - id=f"dashboard-browse-paths-{dashboard_urn}", mcp=mcp - ) - - mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dashboard_urn, - aspectName="browsePathsV2", - aspect=BrowsePathsV2Class( - path=[ - BrowsePathEntryClass(id=folder_name) - for folder_name in resource.ancestor_path.split("/") - ], - ), - ) - - yield MetadataWorkUnit( - id=f"dashboard-browse-paths-v2-{dashboard_urn}", mcp=mcp - ) - - if self.config.platform_instance is not None: - mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dashboard_urn, - aspectName="dataPlatformInstance", - aspect=DataPlatformInstanceClass( - platform=make_data_platform_urn(self.platform), - instance=make_dataplatform_instance_urn( - self.platform, self.config.platform_instance - ), - ), - ) - - yield MetadataWorkUnit( - id=f"dashboard-data-platform-instance-{dashboard_urn}", mcp=mcp - ) - datasets = [] for resource_model in resource.resource_models: @@ -359,73 +267,110 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.get_model_workunits(dataset_urn, resource_model) - mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dashboard_urn, - aspectName="dashboardInfo", - aspect=DashboardInfoClass( - title=resource.name, - description=resource.description, - lastModified=ChangeAuditStampsClass( - created=AuditStampClass( - time=round(resource.created_time.timestamp() * 1000), - actor=make_user_urn(resource.created_by) - if resource.created_by - else "urn:li:corpuser:unknown", - ), - lastModified=AuditStampClass( - time=round(resource.modified_time.timestamp() * 1000), - actor=make_user_urn(resource.modified_by) - if resource.modified_by - else "urn:li:corpuser:unknown", - ), - ), - customProperties={ - "resourceType": resource.resource_type, - "resourceSubtype": resource.resource_subtype, - "storyId": resource.story_id, - "isMobile": str(resource.is_mobile), - }, - datasets=sorted(datasets) if datasets else None, - externalUrl=f"{self.config.tenant_url}{resource.open_url}", + yield from self.get_resource_workunits(resource, datasets) + + def get_report(self) -> SACSourceReport: + return self.report + + def get_resource_workunits( + self, resource: Resource, datasets: List[str] + ) -> Iterable[MetadataWorkUnit]: + dashboard_urn = make_dashboard_urn( + platform=self.platform, + name=resource.resource_id, + platform_instance=self.config.platform_instance, + ) + + if resource.ancestor_path: + mcp = MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=BrowsePathsClass( + paths=[ + f"/{self.platform}/{resource.ancestor_path}", + ], + ), + ) + + yield mcp.as_workunit() + + mcp = MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=BrowsePathsV2Class( + path=[ + BrowsePathEntryClass(id=folder_name) + for folder_name in resource.ancestor_path.split("/") + ], + ), + ) + + yield mcp.as_workunit() + + if self.config.platform_instance is not None: + mcp = MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=make_dataplatform_instance_urn( + self.platform, self.config.platform_instance ), - ) + ), + ) - yield MetadataWorkUnit(id=f"dashboard-info-{dashboard_urn}", mcp=mcp) + yield mcp.as_workunit() - type_name: Optional[str] = None - if resource.resource_subtype == "": - type_name = 
BIAssetSubTypes.SAC_STORY - elif resource.resource_subtype == "APPLICATION": - type_name = BIAssetSubTypes.SAC_APPLICATION + mcp = MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=DashboardInfoClass( + title=resource.name, + description=resource.description, + lastModified=ChangeAuditStampsClass( + created=AuditStampClass( + time=round(resource.created_time.timestamp() * 1000), + actor=make_user_urn(resource.created_by) + if resource.created_by + else "urn:li:corpuser:unknown", + ), + lastModified=AuditStampClass( + time=round(resource.modified_time.timestamp() * 1000), + actor=make_user_urn(resource.modified_by) + if resource.modified_by + else "urn:li:corpuser:unknown", + ), + ), + customProperties={ + "resourceType": resource.resource_type, + "resourceSubtype": resource.resource_subtype, + "storyId": resource.story_id, + "isMobile": str(resource.is_mobile), + }, + datasets=sorted(datasets) if datasets else None, + externalUrl=f"{self.config.tenant_url}{resource.open_url}", + ), + ) - if type_name: - mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dashboard_urn, - aspectName="subTypes", - aspect=SubTypesClass( - typeNames=[type_name], - ), - ) + yield mcp.as_workunit() - yield MetadataWorkUnit( - id=f"dashboard-subtype-{dashboard_urn}", mcp=mcp - ) + type_name: Optional[str] = None + if resource.resource_subtype == "": + type_name = BIAssetSubTypes.SAC_STORY + elif resource.resource_subtype == "APPLICATION": + type_name = BIAssetSubTypes.SAC_APPLICATION - def get_report(self) -> SACSourceReport: - return self.report + if type_name: + mcp = MetadataChangeProposalWrapper( + entityUrn=dashboard_urn, + aspect=SubTypesClass( + typeNames=[type_name], + ), + ) + + yield mcp.as_workunit() def get_model_workunits( self, dataset_urn: str, model: ResourceModel ) -> Iterable[MetadataWorkUnit]: mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="datasetProperties", aspect=DatasetPropertiesClass( name=model.name, description=model.description, @@ -438,7 +383,7 @@ def get_model_workunits( ), ) - yield MetadataWorkUnit(id=f"dataset-properties-{dataset_urn}", mcp=mcp) + yield mcp.as_workunit() if model.is_import: primary_fields: List[str] = [] @@ -446,22 +391,11 @@ def get_model_workunits( columns = self.get_import_data_model_columns(model_id=model.model_id) for column in columns: - native_data_type = column.data_type - if column.data_type == "decimal": - native_data_type = ( - f"{column.data_type}({column.precision}, {column.scale})" - ) - elif column.data_type == "int32": - native_data_type = f"{column.data_type}({column.precision})" - elif column.max_length is not None: - native_data_type = f"{column.data_type}({column.max_length})" schema_field = SchemaFieldClass( fieldPath=column.name, - type=self.get_schema_field_data_type( - column.property_type, column.data_type - ), - nativeDataType=native_data_type, + type=self.get_schema_field_data_type(column), + nativeDataType=self.get_schema_field_native_data_type(column), description=column.description, isPartOfKey=column.is_key, ) @@ -472,10 +406,7 @@ def get_model_workunits( primary_fields.append(column.name) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="schemaMetadata", aspect=SchemaMetadataClass( schemaName=model.model_id, platform=make_data_platform_urn(self.platform), @@ -487,9 +418,7 @@ def 
get_model_workunits( ), ) - yield MetadataWorkUnit( - id=f"dataset-upstream-lineage-{dataset_urn}", mcp=mcp - ) + yield mcp.as_workunit() if model.system_type in ("BW", "HANA") and model.external_id is not None: upstream_dataset_name: Optional[str] = None @@ -525,6 +454,10 @@ def get_model_workunits( platform_instance = model.connection_id env = DEFAULT_ENV + logger.info( + f"No connection mapping found for connection with id {model.connection_id}, connection id will be used as platform instance" + ) + upstream_dataset_urn = make_dataset_urn_with_platform_instance( platform=platform, name=upstream_dataset_name, @@ -534,26 +467,16 @@ def get_model_workunits( if upstream_dataset_urn not in self.ingested_upstream_dataset_keys: mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=upstream_dataset_urn, - aspectName="datasetKey", aspect=dataset_urn_to_key(upstream_dataset_urn), ) - yield MetadataWorkUnit( - id=f"dataset-key-{upstream_dataset_urn}", - mcp=mcp, - is_primary_source=False, - ) + yield mcp.as_workunit(is_primary_source=False) self.ingested_upstream_dataset_keys.add(upstream_dataset_urn) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="upstreamLineage", aspect=UpstreamLineageClass( upstreams=[ UpstreamClass( @@ -564,37 +487,26 @@ def get_model_workunits( ), ) - yield MetadataWorkUnit( - id=f"dataset-upstream-lineage-{dataset_urn}", mcp=mcp - ) + yield mcp.as_workunit() else: - logger.warning( - f"Unknown upstream dataset for model with id {model.namespace}:{model.model_id} and external id {model.external_id}" - ) self.report.report_warning( "unknown-upstream-dataset", f"Unknown upstream dataset for model with id {model.namespace}:{model.model_id} and external id {model.external_id}", ) elif model.system_type is not None: - logger.warning( - f"Unknown system type {model.system_type} for model with id {model.namespace}:{model.model_id} and external id {model.external_id}" - ) self.report.report_warning( "unknown-system-type", f"Unknown system type {model.system_type} for model with id {model.namespace}:{model.model_id} and external id {model.external_id}", ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="status", aspect=StatusClass( removed=False, ), ) - yield MetadataWorkUnit(id=f"dataset-status-{dataset_urn}", mcp=mcp) + yield mcp.as_workunit() if model.external_id and model.connection_id and model.system_type: type_name = DatasetSubTypes.SAC_LIVE_DATA_MODEL @@ -604,32 +516,49 @@ def get_model_workunits( type_name = DatasetSubTypes.SAC_MODEL mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="subTypes", aspect=SubTypesClass( typeNames=[type_name], ), ) - yield MetadataWorkUnit(id=f"dataset-subtype-{dataset_urn}", mcp=mcp) + yield mcp.as_workunit() mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), instance=self.config.platform_instance, ), ) - yield MetadataWorkUnit( - id=f"dataset-data-platform-instance-{dataset_urn}", mcp=mcp + yield mcp.as_workunit() + + @staticmethod + def get_sac_connection( + config: SACSourceConfig, + ) -> Tuple[OAuth2Session, pyodata.Client]: + session = OAuth2Session( 
+ client_id=config.client_id, + client_secret=config.client_secret.get_secret_value(), + token_endpoint=config.token_url, + token_endpoint_auth_method="client_secret_post", + grant_type="client_credentials", + ) + + session.register_compliance_hook( + "protected_request", _add_sap_sac_custom_auth_header + ) + session.fetch_token() + + client = pyodata.Client( + url=f"{config.tenant_url}/api/v1", + connection=session, + config=pyodata.v2.model.Config(retain_null=True), ) + return session, client + def get_resources(self) -> Iterable[Resource]: import_data_model_ids = self.get_import_data_model_ids() @@ -787,19 +716,34 @@ def get_view_name(self, schema: str, namespace: Optional[str], view: str) -> str return f"{schema}.{view}" def get_schema_field_data_type( - self, property_type: str, data_type: str + self, column: ImportDataModelColumn ) -> SchemaFieldDataTypeClass: - if property_type == "DATE": + if column.property_type == "DATE": return SchemaFieldDataTypeClass(type=DateTypeClass()) else: - if data_type == "string": + if column.data_type == "string": return SchemaFieldDataTypeClass(type=StringTypeClass()) - elif data_type in ("decimal", "int32"): + elif column.data_type in ("decimal", "int32"): return SchemaFieldDataTypeClass(type=NumberTypeClass()) else: - logger.warning(f"Unknown data type {data_type} found") + self.report.report_warning( + "unknown-data-type", + f"Unknown data type {column.data_type} found", + ) + return SchemaFieldDataTypeClass(type=NullTypeClass()) + def get_schema_field_native_data_type(self, column: ImportDataModelColumn) -> str: + native_data_type = column.data_type + if column.data_type == "decimal": + native_data_type = f"{column.data_type}({column.precision}, {column.scale})" + elif column.data_type == "int32": + native_data_type = f"{column.data_type}({column.precision})" + elif column.max_length is not None: + native_data_type = f"{column.data_type}({column.max_length})" + + return native_data_type + def _add_sap_sac_custom_auth_header( url: str, headers: Dict[str, str], body: Any diff --git a/metadata-ingestion/src/datahub/utilities/logging_manager.py b/metadata-ingestion/src/datahub/utilities/logging_manager.py index 4e860d12a52dc3..75e0575c0f18b5 100644 --- a/metadata-ingestion/src/datahub/utilities/logging_manager.py +++ b/metadata-ingestion/src/datahub/utilities/logging_manager.py @@ -278,6 +278,4 @@ def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[N logging.getLogger("snowflake").setLevel(level=logging.WARNING) # logging.getLogger("botocore").setLevel(logging.INFO) # logging.getLogger("google").setLevel(logging.INFO) -logging.getLogger("pyodata.client").setLevel(logging.WARNING) -logging.getLogger("pyodata.model").setLevel(logging.WARNING) -logging.getLogger("pyodata.service").setLevel(logging.WARNING) +logging.getLogger("pyodata").setLevel(logging.WARNING) diff --git a/metadata-ingestion/tests/integration/sac/test_sac.py b/metadata-ingestion/tests/integration/sac/test_sac.py index 067bd5d7fc4215..2b6ca81700712e 100644 --- a/metadata-ingestion/tests/integration/sac/test_sac.py +++ b/metadata-ingestion/tests/integration/sac/test_sac.py @@ -21,28 +21,6 @@ def test_sac( requests_mock, mock_time, ): - def match_token_url(request, context): - form = parse_qs(request.text, strict_parsing=True) - - assert "grant_type" in form - assert len(form["grant_type"]) == 1 - assert form["grant_type"][0] == "client_credentials" - - assert "client_id" in form - assert len(form["client_id"]) == 1 - assert form["client_id"][0] == 
MOCK_CLIENT_ID - - assert "client_secret" in form - assert len(form["client_secret"]) == 1 - assert form["client_secret"][0] == MOCK_CLIENT_SECRET - - json = { - "access_token": MOCK_ACCESS_TOKEN, - "expires_in": 3599, - } - - return json - requests_mock.post( MOCK_TOKEN_URL, json=match_token_url, @@ -52,119 +30,16 @@ def match_token_url(request, context): with open(f"{test_resources_dir}/metadata.xml", mode="rb") as f: content = f.read() - - def match_metadata(request, context): - _check_authorization(request.headers) - - context.headers["content-type"] = "application/xml" - - return content - - requests_mock.get(f"{MOCK_TENANT_URL}/api/v1/$metadata", content=match_metadata) - - def match_resources(request, context): - _check_authorization(request.headers) - - json = { - "d": { - "results": [ - { - "__metadata": { - "type": "sap.fpa.services.search.internal.ResourcesType", - "uri": "/api/v1/Resources('LXTH4JCE36EOYLU41PIINLYPU9XRYM26')", - }, - "name": "Name of the story", - "description": "Description of the story", - "resourceId": "LXTH4JCE36EOYLU41PIINLYPU9XRYM26", - "resourceType": "STORY", - "resourceSubtype": "", - "storyId": "STORY:t.4:LXTH4JCE36EOYLU41PIINLYPU9XRYM26", - "createdTime": "/Date(1667544309783)/", - "createdBy": "JOHN_DOE", - "modifiedBy": "JOHN_DOE", - "modifiedTime": "/Date(1673067981272)/", - "isMobile": 0, - "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/LXTH4JCE36EOYLU41PIINLYPU9XRYM26", - "ancestorPath": '["Public","Folder 1","Folder 2"]', - }, - { - "__metadata": { - "type": "sap.fpa.services.search.internal.ResourcesType", - "uri": "/api/v1/Resources('EOYLU41PIILXTH4JCE36NLYPU9XRYM26')", - }, - "name": "Name of the application", - "description": "Description of the application", - "resourceId": "EOYLU41PIILXTH4JCE36NLYPU9XRYM26", - "resourceType": "STORY", - "resourceSubtype": "APPLICATION", - "storyId": "STORY:t.4:EOYLU41PIILXTH4JCE36NLYPU9XRYM26", - "createdTime": "/Date(1673279404272)/", - "createdBy": "SYSTEM", - "modifiedBy": "$DELETED_USER$", - "modifiedTime": "/Date(1673279414272)/", - "isMobile": 0, - "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/EOYLU41PIILXTH4JCE36NLYPU9XRYM26", - "ancestorPath": '["Public","Folder 1","Folder 2"]', - }, - ], - }, - } - - return json + requests_mock.get( + f"{MOCK_TENANT_URL}/api/v1/$metadata", + content=partial(match_metadata, content=content), + ) requests_mock.get( f"{MOCK_TENANT_URL}/api/v1/Resources?$format=json&$filter=isTemplate eq 0 and isSample eq 0 and isPublic eq 1 and ((resourceType eq 'STORY' and resourceSubtype eq '') or (resourceType eq 'STORY' and resourceSubtype eq 'APPLICATION'))&$select=resourceId,resourceType,resourceSubtype,storyId,name,description,createdTime,createdBy,modifiedBy,modifiedTime,openURL,ancestorPath,isMobile", json=match_resources, ) - def match_resource(request, context, resource_id): - _check_authorization(request.headers) - - json = { - "d": { - "results": [ - { - "__metadata": { - "type": "sap.fpa.services.search.internal.ModelsType", - "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.ANL8Q577BA2F73KU3VELDXGWZK%3AANL8Q577BA2F73KU3VELDXGWZK')", - }, - "modelId": "t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK", - "name": "Name of the first model (BW)", - "description": "Description of the first model which has a connection to a BW query", - "externalId": "query:[][][QUERY_TECHNICAL_NAME]", - "connectionId": "BW", - "systemType": "BW", - }, - { - "__metadata": { - "type": "sap.fpa.services.search.internal.ModelsType", - "uri": 
f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.K73U3VELDXGWZKANL8Q577BA2F%3AK73U3VELDXGWZKANL8Q577BA2F')", - }, - "modelId": "t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F", - "name": "Name of the second model (HANA)", - "description": "Description of the second model which has a connection to a HANA view", - "externalId": "view:[SCHEMA][NAMESPACE.SCHEMA][VIEW]", - "connectionId": "HANA", - "systemType": "HANA", - }, - { - "__metadata": { - "type": "sap.fpa.services.search.internal.ModelsType", - "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.DXGWZKANLK73U3VEL8Q577BA2F%3ADXGWZKANLK73U3VEL8Q577BA2F')", - }, - "modelId": "t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F", - "name": "Name of the third model (Import)", - "description": "Description of the third model which was imported", - "externalId": "", - "connectionId": "", - "systemType": None, - }, - ], - }, - } - - return json - requests_mock.get( f"{MOCK_TENANT_URL}/api/v1/Resources%28%27LXTH4JCE36EOYLU41PIINLYPU9XRYM26%27%29/resourceModels?$format=json&$select=modelId,name,description,externalId,connectionId,systemType", json=partial(match_resource, resource_id="LXTH4JCE36EOYLU41PIINLYPU9XRYM26"), @@ -175,96 +50,11 @@ def match_resource(request, context, resource_id): json=partial(match_resource, resource_id="EOYLU41PIILXTH4JCE36NLYPU9XRYM26"), ) - def match_models(request, context): - _check_authorization(request.headers) - - json = { - "models": [ - { - "modelID": "DXGWZKANLK73U3VEL8Q577BA2F", - "modelName": "Name of the third model (Import)", - "modelDescription": "Description of the third model which was imported", - "modelURL": f"{MOCK_TENANT_URL}/api/v1/dataimport/models/DXGWZKANLK73U3VEL8Q577BA2F", - }, - ], - } - - return json - requests_mock.get( f"{MOCK_TENANT_URL}/api/v1/dataimport/models", json=match_models, ) - def match_model_metadata(request, context): - _check_authorization(request.headers) - - json = { - "factData": { - "keys": [ - "Account", - "FIELD1", - "FIELD2", - "FIELD3", - "Version", - ], - "columns": [ - { - "columnName": "Account", - "columnDataType": "string", - "maxLength": 256, - "isKey": True, - "propertyType": "PROPERTY", - "descriptionName": "Account", - }, - { - "columnName": "FIELD1", - "columnDataType": "string", - "maxLength": 256, - "isKey": True, - "propertyType": "PROPERTY", - "descriptionName": "FIELD1", - }, - { - "columnName": "FIELD2", - "columnDataType": "string", - "maxLength": 256, - "isKey": True, - "propertyType": "PROPERTY", - "descriptionName": "FIELD2", - }, - { - "columnName": "FIELD3", - "columnDataType": "string", - "maxLength": 256, - "isKey": True, - "propertyType": "DATE", - "descriptionName": "FIELD3", - }, - { - "columnName": "Version", - "columnDataType": "string", - "maxLength": 300, - "isKey": True, - "propertyType": "PROPERTY", - "descriptionName": "Version", - }, - { - "columnName": "SignedData", - "columnDataType": "decimal", - "maxLength": 32, - "precision": 31, - "scale": 7, - "isKey": False, - "propertyType": "PROPERTY", - "descriptionName": "SignedData", - }, - ], - }, - } - - return json - requests_mock.get( f"{MOCK_TENANT_URL}/api/v1/dataimport/models/DXGWZKANLK73U3VEL8Q577BA2F/metadata", json=match_model_metadata, @@ -300,9 +90,227 @@ def match_model_metadata(request, context): ) -def _check_authorization(headers: Dict) -> None: +def match_token_url(request, context): + form = parse_qs(request.text, strict_parsing=True) + + assert "grant_type" in form + assert len(form["grant_type"]) == 1 + assert 
form["grant_type"][0] == "client_credentials" + + assert "client_id" in form + assert len(form["client_id"]) == 1 + assert form["client_id"][0] == MOCK_CLIENT_ID + + assert "client_secret" in form + assert len(form["client_secret"]) == 1 + assert form["client_secret"][0] == MOCK_CLIENT_SECRET + + json = { + "access_token": MOCK_ACCESS_TOKEN, + "expires_in": 3599, + } + + return json + + +def check_authorization(headers: Dict[str, str]) -> None: assert "Authorization" in headers assert headers["Authorization"] == f"Bearer {MOCK_ACCESS_TOKEN}" assert "x-sap-sac-custom-auth" in headers assert headers["x-sap-sac-custom-auth"] == "true" + + +def match_metadata(request, context, content): + check_authorization(request.headers) + + context.headers["content-type"] = "application/xml" + + return content + + +def match_resources(request, context): + check_authorization(request.headers) + + json = { + "d": { + "results": [ + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ResourcesType", + "uri": "/api/v1/Resources('LXTH4JCE36EOYLU41PIINLYPU9XRYM26')", + }, + "name": "Name of the story", + "description": "Description of the story", + "resourceId": "LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "resourceType": "STORY", + "resourceSubtype": "", + "storyId": "STORY:t.4:LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "createdTime": "/Date(1667544309783)/", + "createdBy": "JOHN_DOE", + "modifiedBy": "JOHN_DOE", + "modifiedTime": "/Date(1673067981272)/", + "isMobile": 0, + "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/LXTH4JCE36EOYLU41PIINLYPU9XRYM26", + "ancestorPath": '["Public","Folder 1","Folder 2"]', + }, + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ResourcesType", + "uri": "/api/v1/Resources('EOYLU41PIILXTH4JCE36NLYPU9XRYM26')", + }, + "name": "Name of the application", + "description": "Description of the application", + "resourceId": "EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "resourceType": "STORY", + "resourceSubtype": "APPLICATION", + "storyId": "STORY:t.4:EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "createdTime": "/Date(1673279404272)/", + "createdBy": "SYSTEM", + "modifiedBy": "$DELETED_USER$", + "modifiedTime": "/Date(1673279414272)/", + "isMobile": 0, + "openURL": "/sap/fpa/ui/tenants/3c44c/bo/story/EOYLU41PIILXTH4JCE36NLYPU9XRYM26", + "ancestorPath": '["Public","Folder 1","Folder 2"]', + }, + ], + }, + } + + return json + + +def match_resource(request, context, resource_id): + check_authorization(request.headers) + + json = { + "d": { + "results": [ + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.ANL8Q577BA2F73KU3VELDXGWZK%3AANL8Q577BA2F73KU3VELDXGWZK')", + }, + "modelId": "t.4.ANL8Q577BA2F73KU3VELDXGWZK:ANL8Q577BA2F73KU3VELDXGWZK", + "name": "Name of the first model (BW)", + "description": "Description of the first model which has a connection to a BW query", + "externalId": "query:[][][QUERY_TECHNICAL_NAME]", + "connectionId": "BW", + "systemType": "BW", + }, + { + "__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.K73U3VELDXGWZKANL8Q577BA2F%3AK73U3VELDXGWZKANL8Q577BA2F')", + }, + "modelId": "t.4.K73U3VELDXGWZKANL8Q577BA2F:K73U3VELDXGWZKANL8Q577BA2F", + "name": "Name of the second model (HANA)", + "description": "Description of the second model which has a connection to a HANA view", + "externalId": "view:[SCHEMA][NAMESPACE.SCHEMA][VIEW]", + "connectionId": "HANA", + "systemType": "HANA", + }, + { + 
"__metadata": { + "type": "sap.fpa.services.search.internal.ModelsType", + "uri": f"/api/v1/Models(resourceId='{resource_id}',modelId='t.4.DXGWZKANLK73U3VEL8Q577BA2F%3ADXGWZKANLK73U3VEL8Q577BA2F')", + }, + "modelId": "t.4.DXGWZKANLK73U3VEL8Q577BA2F:DXGWZKANLK73U3VEL8Q577BA2F", + "name": "Name of the third model (Import)", + "description": "Description of the third model which was imported", + "externalId": "", + "connectionId": "", + "systemType": None, + }, + ], + }, + } + + return json + + +def match_models(request, context): + check_authorization(request.headers) + + json = { + "models": [ + { + "modelID": "DXGWZKANLK73U3VEL8Q577BA2F", + "modelName": "Name of the third model (Import)", + "modelDescription": "Description of the third model which was imported", + "modelURL": f"{MOCK_TENANT_URL}/api/v1/dataimport/models/DXGWZKANLK73U3VEL8Q577BA2F", + }, + ], + } + + return json + + +def match_model_metadata(request, context): + check_authorization(request.headers) + + json = { + "factData": { + "keys": [ + "Account", + "FIELD1", + "FIELD2", + "FIELD3", + "Version", + ], + "columns": [ + { + "columnName": "Account", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "Account", + }, + { + "columnName": "FIELD1", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "FIELD1", + }, + { + "columnName": "FIELD2", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "FIELD2", + }, + { + "columnName": "FIELD3", + "columnDataType": "string", + "maxLength": 256, + "isKey": True, + "propertyType": "DATE", + "descriptionName": "FIELD3", + }, + { + "columnName": "Version", + "columnDataType": "string", + "maxLength": 300, + "isKey": True, + "propertyType": "PROPERTY", + "descriptionName": "Version", + }, + { + "columnName": "SignedData", + "columnDataType": "decimal", + "maxLength": 32, + "precision": 31, + "scale": 7, + "isKey": False, + "propertyType": "PROPERTY", + "descriptionName": "SignedData", + }, + ], + }, + } + + return json From 74e0c7f74d5152468f12f97390787daf18bd4e97 Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Wed, 31 Jul 2024 23:27:57 +0200 Subject: [PATCH 3/7] added config to control ingestion of schema metadata of Import Data Models and Retry to session HTTPAdapter --- .../src/datahub/ingestion/source/sac/sac.py | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index e2fbc75ef0851b..287c4abaaaad7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -9,6 +9,8 @@ import pyodata.v2.service from authlib.integrations.requests_client import OAuth2Session from pydantic import Field, SecretStr, validator +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import ( @@ -131,6 +133,11 @@ class SACSourceConfig( description="Controls whether Analytic Applications should be ingested", ) + ingest_import_data_model_schema_metadata: bool = Field( + default=True, + description="Controls whether schema metadata of Import Data Models should be ingested (ingesting schema metadata of Import Data 
Models significantly increases overall ingestion time)", + ) + resource_id_pattern: AllowDenyPattern = Field( AllowDenyPattern.allow_all(), description="Patterns for selecting resource ids that are to be included", @@ -385,7 +392,7 @@ def get_model_workunits( yield mcp.as_workunit() - if model.is_import: + if model.is_import and self.config.ingest_import_data_model_schema_metadata: primary_fields: List[str] = [] schema_fields: List[SchemaFieldClass] = [] @@ -546,6 +553,22 @@ def get_sac_connection( grant_type="client_credentials", ) + retries = 3 + backoff_factor = 10 + status_forcelist = (500,) + + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + + adapter = HTTPAdapter(max_retries=retry) + session.mount("http://", adapter) + session.mount("https://", adapter) + session.register_compliance_hook( "protected_request", _add_sap_sac_custom_auth_header ) From 3dd7c066dbc67aa7d4f0773517b667327d8f83c4 Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com.> Date: Thu, 8 Aug 2024 16:13:28 +0200 Subject: [PATCH 4/7] fixed a typo --- metadata-ingestion/src/datahub/ingestion/source/sac/sac.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index 287c4abaaaad7a..ad93aee96bd18b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -223,7 +223,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport: # when creating the pyodata.Client, the metadata is automatically parsed and validated session, _ = SACSource.get_sac_connection(config) - # test the Data Import Service Service separately here, because it requires specific properties when configuring the OAuth client + # test the Data Import Service separately here, because it requires specific properties when configuring the OAuth client response = session.get(url=f"{config.tenant_url}/api/v1/dataimport/models") response.raise_for_status() From 1347cd62d951653bf214f8d5491bafd3e122239a Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Mon, 12 Aug 2024 19:50:46 +0200 Subject: [PATCH 5/7] fix mutations/ingestion_source.js test --- .../tests/cypress/cypress/e2e/mutations/ingestion_source.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js index 8707f090acad36..95c63835902e8a 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js @@ -13,7 +13,7 @@ describe("ingestion source creation flow", () => { cy.goToIngestionPage(); cy.clickOptionWithId('[data-node-key="Sources"]'); cy.clickOptionWithTestId("create-ingestion-source-button"); - cy.clickOptionWithText("Snowflake"); + cy.clickOptionWithTextToScrollintoView("Snowflake"); cy.waitTextVisible("Snowflake Details"); cy.get("#account_id").type(accound_id); cy.get("#warehouse").type(warehouse_id); From 5d7b4fbbccb8456da27d4b181969c60fd0c053cf Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Tue, 13 Aug 2024 18:43:19 +0200 Subject: [PATCH 6/7] fix hashability of frozen Resource dataclass (use FrozenSet instead of 
Set) --- metadata-ingestion/src/datahub/ingestion/source/sac/sac.py | 2 +- .../src/datahub/ingestion/source/sac/sac_common.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index ad93aee96bd18b..88cb1f821ff0d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -682,7 +682,7 @@ def get_resources(self) -> Iterable[Resource]: open_url=entity.openURL, ancestor_path=ancestor_path, is_mobile=entity.isMobile, - resource_models=resource_models, + resource_models=frozenset(resource_models), ) def get_import_data_model_ids(self) -> Set[str]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py index fa284a9b06715f..457fda1e061814 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac_common.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime -from typing import Optional, Set +from typing import FrozenSet, Optional @dataclass(frozen=True) @@ -30,7 +30,7 @@ class Resource: open_url: str ancestor_path: Optional[str] is_mobile: bool - resource_models: Set[ResourceModel] + resource_models: FrozenSet[ResourceModel] @dataclass(frozen=True) From ee22f17f7ca4bf7578551ea1b4bccc47a79523b0 Mon Sep 17 00:00:00 2001 From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com> Date: Tue, 13 Aug 2024 18:48:37 +0200 Subject: [PATCH 7/7] fix mutations/managing_secrets.js test --- .../tests/cypress/cypress/e2e/mutations/managing_secrets.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js index dd331fbcbd5ae2..5a2b101ddd8bb7 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js @@ -30,7 +30,7 @@ describe("managing secrets for ingestion creation", () => { cy.goToIngestionPage(); cy.clickOptionWithId('[data-node-key="Sources"]'); cy.get("#ingestion-create-source").click(); - cy.clickOptionWithText("Snowflake"); + cy.clickOptionWithTextToScrollintoView("Snowflake"); cy.waitTextVisible("Snowflake Details"); cy.get("#account_id").type(accound_id); cy.get("#warehouse").type(warehouse_id); @@ -69,7 +69,7 @@ describe("managing secrets for ingestion creation", () => { // Verify secret is not present during ingestion source creation for password dropdown cy.clickOptionWithText("Create new source"); - cy.clickOptionWithText("Snowflake"); + cy.clickOptionWithTextToScrollintoView("Snowflake"); cy.waitTextVisible("Snowflake Details"); cy.get("#account_id").type(accound_id); cy.get("#warehouse").type(warehouse_id);
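
A closing note on patch 6: `@dataclass(frozen=True)` (with the default `eq=True`) derives `__hash__` from the field values, so every field must itself be hashable, and a mutable `Set` field only fails at the point an instance is actually hashed. A minimal sketch of the failure mode (illustrative class names, not from the patch):

```python
from dataclasses import dataclass
from typing import FrozenSet, Set


@dataclass(frozen=True)
class WithSet:
    items: Set[str]


@dataclass(frozen=True)
class WithFrozenSet:
    items: FrozenSet[str]


# frozen=True generates __hash__ from the field values, so hashing fails
# lazily, e.g. when an instance is put in a set or used as a dict key:
try:
    hash(WithSet(items={"a", "b"}))
except TypeError as e:
    print(e)  # unhashable type: 'set'

# A frozenset field is immutable and hashable, so this succeeds:
print(hash(WithFrozenSet(items=frozenset({"a", "b"}))))
```

Swapping the annotation to `FrozenSet` and wrapping the incoming values in `frozenset(...)`, as the patch does for `Resource.resource_models`, restores hashability without giving up immutability.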