Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): integrate snowflake-queries into main source #10905

Merged
merged 18 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import datetime
import logging
from abc import ABCMeta, abstractmethod
Expand All @@ -10,6 +11,7 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
Expand Down Expand Up @@ -97,6 +99,7 @@ def report_log(
context: Optional[str] = None,
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
) -> None:
"""
Report a user-facing warning for the ingestion run.
Expand All @@ -109,7 +112,8 @@ def report_log(
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
"""

stacklevel = 2
# One for this method, and one for the containing report_* call.
stacklevel = stacklevel + 2

log_key = f"{title}-{message}"
entries = self._entries[level]
Expand All @@ -118,6 +122,8 @@ def report_log(
context = f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."

log_content = f"{message} => {context}" if context else message
if title:
log_content = f"{title}: {log_content}"
if exc:
log_content += f"{log_content}: {exc}"

Expand Down Expand Up @@ -255,9 +261,10 @@ def report_failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=False
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def failure(
Expand All @@ -266,9 +273,10 @@ def failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=True
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def info(
Expand All @@ -277,11 +285,30 @@ def info(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=True
StructuredLogLevel.INFO, message, title, context, exc, log=log
)

@contextlib.contextmanager
def report_exc(
    self,
    message: LiteralString,
    title: Optional[LiteralString] = None,
    context: Optional[str] = None,
    level: StructuredLogLevel = StructuredLogLevel.ERROR,
) -> Iterator[None]:
    """Run the enclosed body, converting any exception into a structured log entry.

    Saves the boilerplate of a try/except around a reporting call. Note that the
    exception is swallowed, not re-raised, after being recorded — callers relying
    on propagation must not use this helper.
    """
    try:
        yield
    except Exception as caught:
        self._structured_logs.report_log(
            level,
            message=message,
            title=title,
            context=context,
            exc=caught,
        )

def __post_init__(self) -> None:
self.start_time = datetime.datetime.now()
self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeIdentifierMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionResult,
AssertionResultType,
Expand All @@ -40,23 +39,20 @@ class DataQualityMonitoringResult(BaseModel):
VALUE: int


class SnowflakeAssertionsHandler(SnowflakeIdentifierMixin):
class SnowflakeAssertionsHandler:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
connection: SnowflakeConnection,
identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.report = report
self.logger = logger
self.connection = connection
self.identifiers = identifiers
self._urns_processed: List[str] = []

@property
def identifier_config(self) -> SnowflakeIdentifierConfig:
return self.config

def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -80,10 +76,10 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
return MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DataPlatformInstance(
platform=make_data_platform_urn(self.platform),
platform=make_data_platform_urn(self.identifiers.platform),
instance=(
make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
self.identifiers.platform, self.config.platform_instance
)
if self.config.platform_instance
else None
Expand All @@ -98,7 +94,7 @@ def _process_result_row(
result = DataQualityMonitoringResult.parse_obj(result_row)
assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
status = bool(result.VALUE) # 1 if PASS, 0 if FAIL
assertee = self.get_dataset_identifier(
assertee = self.identifiers.get_dataset_identifier(
result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE
)
if assertee in discovered_datasets:
Expand All @@ -107,7 +103,7 @@ def _process_result_row(
aspect=AssertionRunEvent(
timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
asserteeUrn=self.gen_dataset_urn(assertee),
asserteeUrn=self.identifiers.gen_dataset_urn(assertee),
status=AssertionRunStatus.COMPLETE,
assertionUrn=make_assertion_urn(assertion_guid),
result=AssertionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class SnowflakeIdentifierConfig(
# Changing default value here.
convert_urns_to_lowercase: bool = Field(
default=True,
description="Whether to convert dataset urns to lowercase.",
)


Expand Down Expand Up @@ -210,8 +211,13 @@ class SnowflakeV2Config(
description="Populates view->view and table->view column lineage using DataHub's sql parser.",
)

lazy_schema_resolver: bool = Field(
use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
)

lazy_schema_resolver: bool = Field(
default=True,
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
description="If enabled, uses lazy schema resolver to resolve schemas for tables and views. "
"This is useful if you have a large number of schemas and want to avoid bulk fetching the schema for each table/view.",
)
Expand Down
Loading
Loading