From c2070c65595d4cf385e20423451a1751727dd7bf Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Thu, 29 Aug 2024 17:41:26 +0530 Subject: [PATCH 1/4] fix(ingest): limit number of upstreams generated by sql parsing aggregator --- .../sql_parsing/sql_parsing_aggregator.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 764c2b42537bb4..11af557275078d 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -72,6 +72,8 @@ class QueryLogSetting(enum.Enum): _DEFAULT_QUERY_LOG_SETTING = QueryLogSetting[ os.getenv("DATAHUB_SQL_AGG_QUERY_LOG") or QueryLogSetting.DISABLED.name ] +MAX_UPSTREAM_TABLES_COUNT = 1000 +MAX_FINEGRAINEDLINEAGE_COUNT = 5000 @dataclasses.dataclass @@ -1154,6 +1156,24 @@ def _gen_lineage_for_downstream( confidenceScore=queries_map[query_id].confidence_score, ) ) + + if len(upstream_aspect.upstreams) > MAX_UPSTREAM_TABLES_COUNT: + logger.warning( + f"Too many upstream tables for {downstream_urn}: {len(upstream_aspect.upstreams)}" + f"Keeping only {MAX_UPSTREAM_TABLES_COUNT} table level upstreams/" + ) + upstream_aspect.upstreams = upstream_aspect.upstreams[ + :MAX_UPSTREAM_TABLES_COUNT + ] + if len(upstream_aspect.fineGrainedLineages) > MAX_FINEGRAINEDLINEAGE_COUNT: + logger.warning( + f"Too many upstream columns for {downstream_urn}: {len(upstream_aspect.fineGrainedLineages)}" + f"Keeping only {MAX_FINEGRAINEDLINEAGE_COUNT} column level upstreams/" + ) + upstream_aspect.fineGrainedLineages = upstream_aspect.fineGrainedLineages[ + :MAX_FINEGRAINEDLINEAGE_COUNT + ] + upstream_aspect.fineGrainedLineages = ( upstream_aspect.fineGrainedLineages or None ) From c90bb44c0e0331a200f0bf4d1bdd5d9310f01454 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Fri, 30 Aug 2024 10:27:49 +0530 Subject: [PATCH 2/4] Update metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py Co-authored-by: Harshal Sheth --- .../src/datahub/sql_parsing/sql_parsing_aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 11af557275078d..f4aa8afcb8f039 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -72,7 +72,7 @@ class QueryLogSetting(enum.Enum): _DEFAULT_QUERY_LOG_SETTING = QueryLogSetting[ os.getenv("DATAHUB_SQL_AGG_QUERY_LOG") or QueryLogSetting.DISABLED.name ] -MAX_UPSTREAM_TABLES_COUNT = 1000 +MAX_UPSTREAM_TABLES_COUNT = 300 MAX_FINEGRAINEDLINEAGE_COUNT = 5000 From ec55b1474072ac29497a250a66b66c1b8a2407f8 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Fri, 30 Aug 2024 10:27:56 +0530 Subject: [PATCH 3/4] Update metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py Co-authored-by: Harshal Sheth --- .../src/datahub/sql_parsing/sql_parsing_aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index f4aa8afcb8f039..5ae23a9bf98b9f 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -73,7 +73,7 @@ class QueryLogSetting(enum.Enum): os.getenv("DATAHUB_SQL_AGG_QUERY_LOG") or QueryLogSetting.DISABLED.name ] MAX_UPSTREAM_TABLES_COUNT = 300 -MAX_FINEGRAINEDLINEAGE_COUNT = 5000 +MAX_FINEGRAINEDLINEAGE_COUNT = 2000 @dataclasses.dataclass From dff2f7e11c4597baadbcc29e92047704f25f7945 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Fri, 30 Aug 2024 10:37:15 +0530 Subject: [PATCH 4/4] add counters to report trimmed lineage --- .../src/datahub/sql_parsing/sql_parsing_aggregator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 5ae23a9bf98b9f..ed4ea2cabee423 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -231,6 +231,8 @@ class SqlAggregatorReport(Report): num_unique_query_fingerprints: Optional[int] = None num_urns_with_lineage: Optional[int] = None num_lineage_skipped_due_to_filters: int = 0 + num_table_lineage_trimmed_due_to_large_size: int = 0 + num_column_lineage_trimmed_due_to_large_size: int = 0 # Queries. num_queries_entities_generated: int = 0 @@ -1165,6 +1167,7 @@ def _gen_lineage_for_downstream( upstream_aspect.upstreams = upstream_aspect.upstreams[ :MAX_UPSTREAM_TABLES_COUNT ] + self.report.num_table_lineage_trimmed_due_to_large_size += 1 if len(upstream_aspect.fineGrainedLineages) > MAX_FINEGRAINEDLINEAGE_COUNT: logger.warning( f"Too many upstream columns for {downstream_urn}: {len(upstream_aspect.fineGrainedLineages)}" @@ -1173,6 +1176,7 @@ def _gen_lineage_for_downstream( upstream_aspect.fineGrainedLineages = upstream_aspect.fineGrainedLineages[ :MAX_FINEGRAINEDLINEAGE_COUNT ] + self.report.num_column_lineage_trimmed_due_to_large_size += 1 upstream_aspect.fineGrainedLineages = ( upstream_aspect.fineGrainedLineages or None