From fcd31c3288594e10289826eddef87d8cf5273aaa Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Mon, 8 Jul 2024 18:39:56 -0700
Subject: [PATCH 1/6] Adding redshift error handling

---
 .../recipes/redshift_to_datahub.dhub.yaml     | 57 +++++----------
 .../ingestion/source/ge_data_profiler.py      | 18 ++++-
 .../ingestion/source/redshift/exception.py    | 57 +++++++++++++++
 .../ingestion/source/redshift/lineage_v2.py   | 13 +++-
 .../ingestion/source/redshift/profile.py      | 29 +++++---
 .../ingestion/source/redshift/query.py        |  1 +
 .../ingestion/source/redshift/redshift.py     | 71 +++++++++++++++++--
 .../source/redshift/redshift_schema.py        | 45 ++++++++----
 .../source/sql/sql_generic_profiler.py        | 29 +++++---
 9 files changed, 238 insertions(+), 82 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py

diff --git a/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
index d9342cff9007e..00111cb38237d 100644
--- a/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
+++ b/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
@@ -1,47 +1,24 @@
 ---
 # see https://datahubproject.io/docs/generated/ingestion/sources/redshift for complete documentation
 source:
-  type: "redshift"
+  type: redshift
   config:
-    # Coordinates
-    host_port: host:port
-    database: database_name
-    options:
-      connect_args:
-        sslmode: prefer
-    # Credentials
-    username: datahub
-    password: datahub
-    #include_tables: true
-    #include_views: true
-    #include_table_lineage: true
-    #default_schema: public
-    #table_lineage_mode: stl_scan_based
-    #include_copy_lineage: true
-    #start_time: 2020-12-15T20:08:23.091Z
-    #end_time: 2023-12-15T20:08:23.091Z
-    #profiling:
-    #  enabled: true
-    #  turn_off_expensive_profiling_metrics: false
-    #  limit: 10
-    #  query_combiner_enabled: true
-    #  max_number_of_fields_to_profile: 8
-    #  profile_table_level_only: false
-    #  include_field_null_count: true
-    #  include_field_min_value: true
-    #  include_field_max_value: true
-    #  include_field_mean_value: true
-    #  include_field_median_value: true
-    #  include_field_stddev_value: false
-    #  include_field_quantiles: false
-    #  include_field_distinct_value_frequencies: false
-    #  include_field_histogram: false
-    #  include_field_sample_values: false
-    #profile_pattern:
-    #  allow:
-    #    - "schema.table.column"
-    #  deny:
-    #    - "*.*.*"
+    host_port: 'redshift-cluster-for-testing-connectors.cdj7lmroi8gs.us-west-2.redshift.amazonaws.com:5439'
+    username: connector_test_exceptions
+    include_table_lineage: true
+    include_tables: true
+    include_views: true
+    profiling:
+      enabled: true
+      profile_table_level_only: false
+      profile_external_tables: true
+      profile_pattern:
+        allow:
+          - "dev.public.*"
+    stateful_ingestion:
+      enabled: false
+    database: dev
+    password: TestPassword1
 
 # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
 sink:

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 3173dfa302399..c844f0ed4e04c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -1216,8 +1216,22 @@ def _generate_single_profile(
         except Exception as e:
             if not self.config.catch_exceptions:
                 raise e
-            logger.exception(f"Encountered exception while profiling {pretty_name}")
-            self.report.report_warning(pretty_name, f"Profiling exception {e}")
exception {e}") + + error_message = str(e).lower() + if "permission denied" in error_message: + self.report.warning( + title="Unauthorized to extract statistics", + message="We were denied access while attempting to generate profiling statistics for some assets. Please ensure the provided user has permission to query these tables and views.", + context=f"Asset: {pretty_name}", + exc=e, + ) + else: + self.report.warning( + title="Failed to extract statistics for some assets", + message="Caught unexpected exception while attempting to extract profiling statistics for some assets.", + context=f"Asset: {pretty_name}", + exc=e, + ) return None finally: if batch is not None and self.base_engine.engine.name == TRINO: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py new file mode 100644 index 0000000000000..85b3499abb2d4 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py @@ -0,0 +1,57 @@ +from typing import Any, Callable, Iterable, TypeVar, Union + +import redshift_connector + +from datahub.ingestion.source.redshift.report import RedshiftReport + +T = TypeVar("T") + + +def handle_redshift_exceptions( + report: RedshiftReport, func: Callable[..., T], *args: Any, **kwargs: Any +) -> Union[T, None]: + try: + return func(*args, **kwargs) + except redshift_connector.Error as e: + report_redshift_failure(report, e) + return None + + +def handle_redshift_exceptions_yield( + report: RedshiftReport, func: Callable[..., Iterable[T]], *args: Any, **kwargs: Any +) -> Iterable[T]: + try: + yield from func(*args, **kwargs) + except redshift_connector.Error as e: + report_redshift_failure(report, e) + + +def report_redshift_failure( + report: RedshiftReport, e: redshift_connector.Error +) -> None: + error_message = str(e).lower() + if "permission denied" in error_message: + if "svv_table_info" in error_message: + report.report_failure( + title="Permission denied", + message="Failed to extract metadata due to insufficient permission to access 'svv_table_info' table. Please ensure the provided database user has access.", + exc=e, + ) + elif "svl_user_info" in error_message: + report.report_failure( + title="Permission denied", + message="Failed to extract metadata due to insufficient permission to access 'svl_user_info' table. 
+                exc=e,
+            )
+        else:
+            report.report_failure(
+                title="Permission denied",
+                message="Failed to extract metadata due to insufficient permissions.",
+                exc=e,
+            )
+    else:
+        report.report_failure(
+            title="Failed to extract some metadata",
+            message="Failed to extract some metadata from Redshift.",
+            exc=e,
+        )

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
index 062a99de6b735..1b2c18fe49ffd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
@@ -1,6 +1,5 @@
 import collections
 import logging
-import traceback
 from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
 
 import redshift_connector
@@ -241,8 +240,10 @@ def _populate_lineage_agg(
                 processor(lineage_row)
         except Exception as e:
             self.report.warning(
-                f"lineage-v2-extract-{lineage_type.name}",
-                f"Error was {e}, {traceback.format_exc()}",
+                title="Failed to extract some lineage",
+                message=f"Failed to extract lineage of type {lineage_type.name}",
+                context=f"Query: '{query}'",
+                exc=e,
             )
             self._lineage_v1.report_status(f"extract-{lineage_type.name}", False)
 
@@ -409,3 +410,9 @@ def _process_external_tables(
     def generate(self) -> Iterable[MetadataWorkUnit]:
         for mcp in self.aggregator.gen_metadata():
             yield mcp.as_workunit()
+        if len(self.aggregator.report.observed_query_parse_failures) > 0:
+            self.report.report_failure(
+                title="Failed to extract some SQL lineage",
+                message="Unexpected error(s) while attempting to extract lineage from SQL queries. See the full logs for more details.",
+                context=f"Query Parsing Failures: {self.aggregator.report.observed_query_parse_failures}",
+            )

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
index eed82ec4d83e7..e3f02c8b958d9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
@@ -48,15 +48,26 @@ def get_workunits(
             if not self.config.schema_pattern.allowed(schema):
                 continue
             for table in tables[db].get(schema, {}):
-                if (
-                    not self.config.profiling.profile_external_tables
-                    and table.type == "EXTERNAL_TABLE"
-                ):
-                    self.report.profiling_skipped_other[schema] += 1
-                    logger.info(
-                        f"Skipping profiling of external table {db}.{schema}.{table.name}"
-                    )
-                    continue
+                if table.type == "EXTERNAL_TABLE":
+                    if not self.config.profiling.profile_external_tables:
+                        # Case 1: If user did not tell us to profile external tables, simply log this.
+                        self.report.profiling_skipped_other[schema] += 1
+                        logger.info(
+                            f"Skipping profiling of external table {db}.{schema}.{table.name}"
+                        )
+                        # Continue, since we should not profile this table.
+                        continue
+                    elif self.config.profiling.profile_table_level_only:
+                        # Case 2: User DID tell us to profile external tables, but only at the table level.
+                        # Currently, we do not support this combination. The user needs to also set
+                        # profile_table_level_only to False in order to profile.
+                        self.report.report_warning(
+                            title="Unable to Profile External Tables",
+                            message="External tables are not supported for profiling when 'profile_table_level_only' config is set to True. Please set 'profile_table_level_only' to False to profile external Redshift tables.",
+                            context=f"External Table: {db}.{schema}.{table.name}",
+                        )
+                        # Continue, since we were unable to retrieve cheap profiling stats from svv_table_info.
+                        continue
                 # Emit the profile work unit
                 profile_request = self.get_profile_request(table, schema, db)
                 if profile_request is not None:

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
index 3bd69d72be605..affbcd00b5107 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
@@ -122,6 +122,7 @@ def list_tables(
         else:
             return f"{tables_query} UNION {external_tables_query}"
 
+    # Why is this unused? Is this a bug?
     list_columns: str = """
         SELECT
             n.nspname as "schema",

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
index 6be7eedf976bd..33eb511735e81 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
@@ -45,6 +45,7 @@
     DatasetSubTypes,
 )
 from datahub.ingestion.source.redshift.config import RedshiftConfig
+from datahub.ingestion.source.redshift.exception import handle_redshift_exceptions_yield
 from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
 from datahub.ingestion.source.redshift.lineage_v2 import RedshiftSqlLineageV2
 from datahub.ingestion.source.redshift.profile import RedshiftProfiler
@@ -411,7 +412,12 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         ]
 
     def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
-        connection = RedshiftSource.get_redshift_connection(self.config)
+        connection = self._try_get_redshift_connection(self.config)
+
+        if connection is None:
+            # If we failed to establish a connection, short circuit the connector.
+            return
+
         database = self.config.database
         logger.info(f"Processing db {database}")
         self.report.report_ingestion_stage_start(METADATA_EXTRACTION)
@@ -419,9 +425,18 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
         self.db_views[database] = defaultdict()
         self.db_schemas.setdefault(database, {})
 
+        yield from handle_redshift_exceptions_yield(
+            self.report, self._extract_metadata, connection, database
+        )
+
+    def _extract_metadata(
+        self, connection: redshift_connector.Connection, database: str
+    ) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
+
         yield from self.gen_database_container(
             database=database,
         )
+
         self.cache_tables_and_views(connection, database)
 
         self.report.tables_in_mem_size[database] = humanfriendly.format_size(
@@ -556,6 +571,7 @@ def process_schema(
                 ):
                     for table in self.db_tables[schema.database][schema.name]:
                         table.columns = schema_columns[schema.name].get(table.name, [])
+                        table.column_count = len(table.columns)
                         table_wu_generator = self._process_table(
                             table, database=database
                         )
@@ -575,8 +591,10 @@ def process_schema(
                             f"Table processed: {schema.database}.{schema.name}.{table.name}"
                         )
                 else:
-                    logger.info(
-                        f"No tables in cache for {schema.database}.{schema.name}, skipping"
+                    self.report.info(
+                        title="No tables found in some schemas",
+                        message="No tables found in some schemas. This may be due to insufficient privileges for the provided user.",
+                        context=f"Schema: {schema.database}.{schema.name}",
                     )
             else:
                 logger.info("Table processing disabled, skipping")
@@ -589,6 +607,7 @@ def process_schema(
                 ):
                     for view in self.db_views[schema.database][schema.name]:
                         view.columns = schema_columns[schema.name].get(view.name, [])
+                        view.column_count = len(view.columns)
                         yield from self._process_view(
                             table=view, database=database, schema=schema
                         )
@@ -603,8 +622,10 @@ def process_schema(
                             f"Table processed: {schema.database}.{schema.name}.{view.name}"
                         )
                 else:
-                    logger.info(
-                        f"No views in cache for {schema.database}.{schema.name}, skipping"
+                    self.report.info(
+                        title="No views found in some schemas",
+                        message="No views found in some schemas. This may be due to insufficient privileges for the provided user.",
+                        context=f"Schema: {schema.database}.{schema.name}",
                     )
             else:
                 logger.info("View processing disabled, skipping")
@@ -1092,3 +1113,43 @@ def add_config_to_report(self):
             self.config.start_time,
             self.config.end_time,
         )
+
+    def _try_get_redshift_connection(
+        self,
+        config: RedshiftConfig,
+    ) -> Union[None, redshift_connector.Connection]:
+        try:
+            return RedshiftSource.get_redshift_connection(config)
+        except redshift_connector.Error as e:
+            error_message = str(e).lower()
+            if "password authentication failed" in error_message:
+                self.report.report_failure(
+                    title="Invalid credentials",
+                    message="Failed to connect to Redshift. Please verify your username, password, and database.",
+                    exc=e,
+                )
+            elif "timeout" in error_message:
+                self.report.report_failure(
+                    title="Unable to connect",
+                    message="Failed to connect to Redshift. Please verify your host name and port number.",
+                    exc=e,
+                )
+            elif "communication error" in error_message:
+                self.report.report_failure(
+                    title="Unable to connect",
+                    message="Failed to connect to Redshift. Please verify that the host name is valid and reachable.",
+                    exc=e,
+                )
+            elif "database" in error_message and "does not exist" in error_message:
+                self.report.report_failure(
+                    title="Database does not exist",
+                    message="Failed to connect to Redshift. Please verify that the provided database exists and the provided user has access to it.",
+                    exc=e,
+                )
+            else:
+                self.report.report_failure(
+                    title="Unable to connect",
+                    message="Failed to connect to Redshift. Please verify your connection details.",
+                    exc=e,
+                )
+            return None

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
index 101146563e8e7..6e88a50f898a5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py
@@ -169,6 +169,8 @@ def enrich_tables(
         self,
         conn: redshift_connector.Connection,
     ) -> Dict[str, Dict[str, RedshiftExtraTableMeta]]:
+        # Warning: This table enrichment will not return anything for
+        # external tables (spectrum) and for tables that have never been queried / written to.
         cur = RedshiftDataDictionary.get_query_result(
             conn, self.queries.additional_table_metadata_query()
         )
@@ -207,7 +209,7 @@ def get_tables_and_views(
 
         # This query needs to run separately as we can't join with the main query because it works with
         # driver only functions.
-        enriched_table = self.enrich_tables(conn)
+        enriched_tables = self.enrich_tables(conn)
 
         cur = RedshiftDataDictionary.get_query_result(
             conn,
@@ -216,6 +218,7 @@ def get_tables_and_views(
         field_names = [i[0] for i in cur.description]
         db_tables = cur.fetchall()
         logger.info(f"Fetched {len(db_tables)} tables/views from Redshift")
+
        for table in db_tables:
             schema = table[field_names.index("schema")]
             table_name = table[field_names.index("relname")]
@@ -233,7 +236,7 @@ def get_tables_and_views(
                     rows_count,
                     size_in_bytes,
                 ) = RedshiftDataDictionary.get_table_stats(
-                    enriched_table, field_names, schema, table
+                    enriched_tables, field_names, schema, table
                 )
 
                 tables[schema].append(
@@ -263,15 +266,15 @@ def get_tables_and_views(
                         rows_count,
                         size_in_bytes,
                     ) = RedshiftDataDictionary.get_table_stats(
-                        enriched_table=enriched_table,
+                        enriched_tables=enriched_tables,
                         field_names=field_names,
                         schema=schema,
                         table=table,
                     )
                     materialized = False
-                    if schema in enriched_table and table_name in enriched_table[schema]:
-                        if enriched_table[schema][table_name].is_materialized:
+                    if schema in enriched_tables and table_name in enriched_tables[schema]:
+                        if enriched_tables[schema][table_name].is_materialized:
                             materialized = True
 
                     views[schema].append(
@@ -298,7 +301,7 @@ def get_tables_and_views(
         return tables, views
 
     @staticmethod
-    def get_table_stats(enriched_table, field_names, schema, table):
+    def get_table_stats(enriched_tables, field_names, schema, table):
         table_name = table[field_names.index("relname")]
 
         creation_time: Optional[datetime] = None
@@ -309,25 +312,41 @@ def get_table_stats(enriched_table, field_names, schema, table):
         last_altered: Optional[datetime] = None
         size_in_bytes: Optional[int] = None
         rows_count: Optional[int] = None
-        if schema in enriched_table and table_name in enriched_table[schema]:
-            if enriched_table[schema][table_name].last_accessed:
+        if schema in enriched_tables and table_name in enriched_tables[schema]:
+            if enriched_tables[schema][table_name].last_accessed is not None:
                 # Mypy seems to be not clever enough to understand the above check
-                last_accessed = enriched_table[schema][table_name].last_accessed
+                last_accessed = enriched_tables[schema][table_name].last_accessed
                 assert last_accessed
                 last_altered = last_accessed.replace(tzinfo=timezone.utc)
             elif creation_time:
                 last_altered = creation_time
 
-            if enriched_table[schema][table_name].size:
+            if enriched_tables[schema][table_name].size is not None:
                 # Mypy seems to be not clever enough to understand the above check
-                size = enriched_table[schema][table_name].size
+                size = enriched_tables[schema][table_name].size
                 if size:
                     size_in_bytes = size * 1024 * 1024
 
-            if enriched_table[schema][table_name].estimated_visible_rows:
-                rows = enriched_table[schema][table_name].estimated_visible_rows
+            if enriched_tables[schema][table_name].estimated_visible_rows is not None:
+                rows = enriched_tables[schema][table_name].estimated_visible_rows
                 assert rows
                 rows_count = int(rows)
+        else:
+            # The object was not found in the enriched data.
+            #
+            # If we don't have enriched data, it may be either because:
+            # 1. The table is empty: per https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_TABLE_INFO.html, empty tables are omitted from svv_table_info.
+            # 2. The table is external
+            # 3. The table is a view (non-materialized)
+            #
+            # In case 1, we want to report an accurate profile suggesting that the table is empty.
+            # In case 2, do nothing since we cannot cheaply profile it.
+            # In case 3, do nothing since we cannot cheaply profile it.
+            if table[field_names.index("tabletype")] == "TABLE":
+                rows_count = 0
+                size_in_bytes = 0
+                logger.info(f"Table {schema}.{table_name} is missing from enriched metadata; assuming it is empty and reporting zero rows and size")
+
         return creation_time, last_altered, rows_count, size_in_bytes
 
     @staticmethod

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index 968989e2548d1..9c8e475e7b307 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -92,16 +92,25 @@ def generate_profile_workunits(
             request for request in requests if request.profile_table_level_only
         ]
         for request in table_level_profile_requests:
-            table_level_profile = DatasetProfile(
-                timestampMillis=int(datetime.now().timestamp() * 1000),
-                columnCount=request.table.column_count,
-                rowCount=request.table.rows_count,
-                sizeInBytes=request.table.size_in_bytes,
-            )
-            dataset_urn = self.dataset_urn_builder(request.pretty_name)
-            yield MetadataChangeProposalWrapper(
-                entityUrn=dataset_urn, aspect=table_level_profile
-            ).as_workunit()
+            if (
+                request.table.column_count is None
+                and request.table.rows_count is None
+                and request.table.size_in_bytes is None
+            ):
+                logger.warning(
+                    f"Table {request.pretty_name} has no column count, rows count, or size in bytes. Skipping table-level profile emission."
+                )
+            else:
+                table_level_profile = DatasetProfile(
+                    timestampMillis=int(datetime.now().timestamp() * 1000),
+                    columnCount=request.table.column_count,
+                    rowCount=request.table.rows_count,
+                    sizeInBytes=request.table.size_in_bytes,
+                )
+                dataset_urn = self.dataset_urn_builder(request.pretty_name)
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=dataset_urn, aspect=table_level_profile
+                ).as_workunit()
 
         if not ge_profile_requests:
             return

From 31f578831868aff0712e42f545dd831a8a81923e Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Mon, 8 Jul 2024 18:43:02 -0700
Subject: [PATCH 2/6] Revert bad changes

---
 .../recipes/redshift_to_datahub.dhub.yaml | 57 +++++++++++++------
 1 file changed, 40 insertions(+), 17 deletions(-)

diff --git a/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
index 00111cb38237d..d9342cff9007e 100644
--- a/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
+++ b/metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
@@ -1,24 +1,47 @@
 ---
 # see https://datahubproject.io/docs/generated/ingestion/sources/redshift for complete documentation
 source:
-  type: redshift
+  type: "redshift"
   config:
-    host_port: 'redshift-cluster-for-testing-connectors.cdj7lmroi8gs.us-west-2.redshift.amazonaws.com:5439'
-    username: connector_test_exceptions
-    include_table_lineage: true
-    include_tables: true
-    include_views: true
-    profiling:
-      enabled: true
-      profile_table_level_only: false
-      profile_external_tables: true
-      profile_pattern:
-        allow:
-          - "dev.public.*"
-    stateful_ingestion:
-      enabled: false
-    database: dev
-    password: TestPassword1
+    # Coordinates
+    host_port: host:port
+    database: database_name
+    options:
+      connect_args:
+        sslmode: prefer
+    # Credentials
+    username: datahub
+    password: datahub
+    #include_tables: true
+    #include_views: true
+    #include_table_lineage: true
+    #default_schema: public
+    #table_lineage_mode: stl_scan_based
+    #include_copy_lineage: true
+    #start_time: 2020-12-15T20:08:23.091Z
+    #end_time: 2023-12-15T20:08:23.091Z
+    #profiling:
+    #  enabled: true
+    #  turn_off_expensive_profiling_metrics: false
+    #  limit: 10
+    #  query_combiner_enabled: true
+    #  max_number_of_fields_to_profile: 8
+    #  profile_table_level_only: false
+    #  include_field_null_count: true
+    #  include_field_min_value: true
+    #  include_field_max_value: true
+    #  include_field_mean_value: true
+    #  include_field_median_value: true
+    #  include_field_stddev_value: false
+    #  include_field_quantiles: false
+    #  include_field_distinct_value_frequencies: false
+    #  include_field_histogram: false
+    #  include_field_sample_values: false
+    #profile_pattern:
+    #  allow:
+    #    - "schema.table.column"
+    #  deny:
+    #    - "*.*.*"
 
 # see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
 sink:

From 116d315db7bed62e24a2c1d69986c30a3174dada Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Tue, 9 Jul 2024 09:21:11 -0700
Subject: [PATCH 3/6] Update
 metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py

Co-authored-by: Harshal Sheth
---
 .../src/datahub/ingestion/source/ge_data_profiler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index c844f0ed4e04c..8843a0ad8eae6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -1220,7 +1220,7 @@ def _generate_single_profile(
             error_message = str(e).lower()
             if "permission denied" in error_message:
                 self.report.warning(
-                    title="Unauthorized to extract statistics",
+                    title="Unauthorized to extract data profile statistics",
                     message="We were denied access while attempting to generate profiling statistics for some assets. Please ensure the provided user has permission to query these tables and views.",
                     context=f"Asset: {pretty_name}",
                     exc=e,

From 3aae2625e8cf16dc60f74e95afdda6841f656b61 Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Tue, 9 Jul 2024 10:39:13 -0700
Subject: [PATCH 4/6] Update
 metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

Co-authored-by: Harshal Sheth
---
 .../src/datahub/ingestion/source/redshift/redshift.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
index 33eb511735e81..849cebb8b1915 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
@@ -1117,7 +1117,7 @@ def add_config_to_report(self):
     def _try_get_redshift_connection(
         self,
         config: RedshiftConfig,
-    ) -> Union[None, redshift_connector.Connection]:
+    ) -> Optional[redshift_connector.Connection]:
         try:
             return RedshiftSource.get_redshift_connection(config)
         except redshift_connector.Error as e:

From 2655152336755a46a5e7f27c0fff0f8e062c7951 Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Tue, 9 Jul 2024 11:31:49 -0700
Subject: [PATCH 5/6] Addressing comments

---
 .../datahub/ingestion/source/redshift/exception.py | 14 +++++++++++---
 .../datahub/ingestion/source/redshift/profile.py   |  4 ++--
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py
index 85b3499abb2d4..43ad5bfcefdf1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py
@@ -1,14 +1,19 @@
-from typing import Any, Callable, Iterable, TypeVar, Union
+from typing import Callable, Iterable, TypeVar, Union
 
 import redshift_connector
+from typing_extensions import ParamSpec
 
 from datahub.ingestion.source.redshift.report import RedshiftReport
 
 T = TypeVar("T")
+P = ParamSpec("P")
 
 
 def handle_redshift_exceptions(
-    report: RedshiftReport, func: Callable[..., T], *args: Any, **kwargs: Any
+    report: RedshiftReport,
+    func: Callable[P, T],
+    *args: P.args,
+    **kwargs: P.kwargs,
 ) -> Union[T, None]:
     try:
         return func(*args, **kwargs)
@@ -18,7 +23,10 @@ def handle_redshift_exceptions(
 
 
 def handle_redshift_exceptions_yield(
-    report: RedshiftReport, func: Callable[..., Iterable[T]], *args: Any, **kwargs: Any
+    report: RedshiftReport,
+    func: Callable[P, Iterable[T]],
+    *args: P.args,
+    **kwargs: P.kwargs,
 ) -> Iterable[T]:
     try:
         yield from func(*args, **kwargs)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
index e3f02c8b958d9..6f611fa674187 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py
@@ -62,8 +62,8 @@ def get_workunits(
                         # Currently, we do not support this combination. The user needs to also set
                         # profile_table_level_only to False in order to profile.
                         self.report.report_warning(
-                            title="Unable to Profile External Tables",
-                            message="External tables are not supported for profiling when 'profile_table_level_only' config is set to True. Please set 'profile_table_level_only' to False to profile external Redshift tables.",
+                            title="Skipped profiling for external tables",
+                            message="External tables are not supported for profiling when 'profile_table_level_only' config is set to 'True'. Please set 'profile_table_level_only' to 'False' in order to profile external Redshift tables.",
                             context=f"External Table: {db}.{schema}.{table.name}",
                         )

From c8819dd9262c8f80c95b1f7eb3cb527d92e2f14a Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Tue, 9 Jul 2024 11:32:51 -0700
Subject: [PATCH 6/6] Adding comment

---
 .../src/datahub/ingestion/source/redshift/redshift.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
index 849cebb8b1915..51d27aa19f774 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py
@@ -425,6 +425,8 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
         self.db_views[database] = defaultdict()
         self.db_schemas.setdefault(database, {})
 
+        # TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping
+        # this fallback. For now, this gets us broad coverage quickly.
         yield from handle_redshift_exceptions_yield(
             self.report, self._extract_metadata, connection, database
         )
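
Note on the wrapper pattern introduced in PATCH 1 and refined in PATCH 5: switching from
Callable[..., Iterable[T]] with *args: Any to ParamSpec means the type checker validates the
arguments passed through handle_redshift_exceptions_yield against the wrapped function's real
signature, so a call like handle_redshift_exceptions_yield(self.report, self._extract_metadata,
connection, database) is checked against _extract_metadata itself. The sketch below is a minimal,
self-contained illustration of the same pattern; StubReport and FlakyError are hypothetical
stand-ins for RedshiftReport and redshift_connector.Error, used only so the snippet runs without
a live Redshift connection.

from typing import Callable, Iterable, List, Tuple, TypeVar

from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


class FlakyError(Exception):
    """Stand-in for redshift_connector.Error."""


class StubReport:
    """Stand-in for RedshiftReport: records failures instead of raising."""

    def __init__(self) -> None:
        self.failures: List[Tuple[str, str]] = []

    def report_failure(self, title: str, message: str) -> None:
        self.failures.append((title, message))


def handle_exceptions_yield(
    report: StubReport,
    func: Callable[P, Iterable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Iterable[T]:
    # The wrapped generator is drained inside the try block, so errors raised
    # mid-iteration are caught too, not just errors raised when the generator
    # object is first created.
    try:
        yield from func(*args, **kwargs)
    except FlakyError as e:
        report.report_failure(title="Failed to extract some metadata", message=str(e))


def extract_metadata(database: str) -> Iterable[str]:
    yield f"workunit-1 ({database})"
    raise FlakyError("permission denied for relation svv_table_info")


report = StubReport()
# Thanks to ParamSpec, mypy checks "dev" against extract_metadata's signature;
# passing an int here, or an extra argument, would be flagged at type-check time.
for wu in handle_exceptions_yield(report, extract_metadata, "dev"):
    print(wu)
print(report.failures)  # the mid-stream error was reported, not raised

Because the generator is drained inside the try block, a connector error raised midway through
extraction becomes a structured report entry while the work units produced before the failure are
still emitted. That matches how get_workunits_internal uses the wrapper, and it is the coarse
fallback behavior that the TODO added in PATCH 6 describes.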