From e634e9e409de5c764539e826b75f7e2c730da996 Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Mon, 6 Jan 2025 21:48:56 +0530
Subject: [PATCH] fix(ingest/snowflake): use fast query fingerprint for lineage (#12275)

---
 .../source/snowflake/snowflake_lineage_v2.py      |  4 ++++
 .../datahub/sql_parsing/sql_parsing_aggregator.py | 13 ++++++++-----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index 6b200590d7ab63..e93ecf30171f65 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -40,6 +40,7 @@
     ColumnRef,
     DownstreamColumnRef,
 )
+from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.time import ts_millis_to_datetime
 
@@ -239,6 +240,9 @@ def get_known_query_lineage(
         downstream_table_urn = self.identifiers.gen_dataset_urn(dataset_name)
 
         known_lineage = KnownQueryLineageInfo(
+            query_id=get_query_fingerprint(
+                query.query_text, self.identifiers.platform, fast=True
+            ),
             query_text=query.query_text,
             downstream=downstream_table_urn,
             upstreams=self.map_query_result_upstreams(
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index a4a49f77882168..25b63ffac45f96 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -165,6 +165,7 @@ class KnownQueryLineageInfo:
     timestamp: Optional[datetime] = None
     session_id: Optional[str] = None
     query_type: QueryType = QueryType.UNKNOWN
+    query_id: Optional[str] = None
 
 
 @dataclasses.dataclass
@@ -618,11 +619,13 @@ def add_known_query_lineage(
         self.report.num_known_query_lineage += 1
 
         # Generate a fingerprint for the query.
-        with self.report.sql_fingerprinting_timer:
-            query_fingerprint = get_query_fingerprint(
-                known_query_lineage.query_text,
-                platform=self.platform.platform_name,
-            )
+        query_fingerprint = known_query_lineage.query_id
+        if not query_fingerprint:
+            with self.report.sql_fingerprinting_timer:
+                query_fingerprint = get_query_fingerprint(
+                    known_query_lineage.query_text,
+                    platform=self.platform.platform_name,
+                )
         formatted_query = self._maybe_format_query(known_query_lineage.query_text)
 
         # Register the query.