Skip to content

Commit

Permalink
fix(ingest/looker): support platform instance for dashboards & charts (
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Jul 26, 2024
1 parent 71d1cdb commit b173f60
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,17 @@ class LookerDashboardSourceConfig(
)
extract_independent_looks: bool = Field(
False,
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.",
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion "
"should also be enabled.",
)
emit_used_explores_only: bool = Field(
True,
description="When enabled, only explores that are used by a Dashboard/Look will be ingested.",
)
include_platform_instance_in_urns: bool = Field(
False,
description="When enabled, platform instance will be added in dashboard and chart urn.",
)

@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
DataPlatformInstance,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
Expand All @@ -95,11 +96,13 @@
ChartTypeClass,
ContainerClass,
DashboardInfoClass,
DataPlatformInfoClass,
InputFieldClass,
InputFieldsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
PlatformTypeClass,
SubTypesClass,
)
from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor
Expand Down Expand Up @@ -624,15 +627,47 @@ def _get_folder_browse_path_v2_entries(
if include_current_folder:
yield BrowsePathEntryClass(id=urn, urn=urn)

def _create_platform_instance_aspect(
    self,
) -> DataPlatformInstance:
    """Build the DataPlatformInstance aspect from the source configuration.

    Both ``platform_name`` and ``platform_instance`` are read from
    ``self.source_config`` and must be truthy; an ``AssertionError`` is
    raised otherwise.

    Returns:
        A ``DataPlatformInstance`` whose ``platform`` is the data platform
        urn and whose ``instance`` is the platform instance urn, both
        produced by the ``builder`` helpers.
    """
    config = self.source_config

    platform_name = config.platform_name
    assert platform_name, "Platform name is not set in the configuration."

    platform_instance = config.platform_instance
    assert platform_instance, "Platform instance is not set in the configuration."

    platform_urn = builder.make_data_platform_urn(platform_name)
    instance_urn = builder.make_dataplatform_instance_urn(
        platform=platform_name,
        instance=platform_instance,
    )
    return DataPlatformInstance(platform=platform_urn, instance=instance_urn)

def _make_chart_urn(self, element_id: str) -> str:
    """Return the chart urn for ``element_id`` on the configured platform.

    The configured platform instance is passed to the urn builder only when
    ``include_platform_instance_in_urns`` is enabled; otherwise ``None`` is
    passed as the platform instance.
    """
    config = self.source_config

    platform_instance: Optional[str] = (
        config.platform_instance
        if config.include_platform_instance_in_urns
        else None
    )

    return builder.make_chart_urn(
        name=element_id,
        platform=config.platform_name,
        platform_instance=platform_instance,
    )

def _make_chart_metadata_events(
self,
dashboard_element: LookerDashboardElement,
dashboard: Optional[
LookerDashboard
], # dashboard will be None if this is a standalone look
) -> List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
chart_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
chart_snapshot = ChartSnapshot(
urn=chart_urn,
Expand Down Expand Up @@ -713,6 +748,14 @@ def _make_chart_metadata_events(
),
]

if self.source_config.include_platform_instance_in_urns:
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=chart_urn,
aspect=self._create_platform_instance_aspect(),
),
)

# If extracting embeds is enabled, produce an MCP for embed URL.
if (
self.source_config.extract_embed_urls
Expand Down Expand Up @@ -818,11 +861,26 @@ def _make_dashboard_metadata_events(
)
)

if self.source_config.include_platform_instance_in_urns:
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=dashboard_urn,
aspect=self._create_platform_instance_aspect(),
)
)

return proposals

def make_dashboard_urn(self, looker_dashboard: LookerDashboard) -> str:
    """Return the dashboard urn for ``looker_dashboard``.

    The configured platform instance is passed to the urn builder only when
    ``include_platform_instance_in_urns`` is enabled; otherwise ``None`` is
    passed as the platform instance.

    Args:
        looker_dashboard: Dashboard whose ``get_urn_dashboard_id()`` supplies
            the name component of the urn.

    Returns:
        The urn string produced by ``builder.make_dashboard_urn``.
    """
    platform_instance: Optional[str] = None

    if self.source_config.include_platform_instance_in_urns:
        platform_instance = self.source_config.platform_instance

    # Arguments are passed by keyword only: the stale positional pair
    # (platform_name, dashboard_id) must not appear next to these, since it
    # is not valid syntax and does not match the keyword order used by
    # _make_chart_urn.
    return builder.make_dashboard_urn(
        name=looker_dashboard.get_urn_dashboard_id(),
        platform=self.source_config.platform_name,
        platform_instance=platform_instance,
    )

def _make_explore_metadata_events(
Expand Down Expand Up @@ -1154,8 +1212,8 @@ def _input_fields_from_dashboard_element(

# enrich the input_fields with the fully hydrated ViewField from the now fetched explores
for input_field in input_fields:
entity_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
entity_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
view_field_for_reference = input_field.view_field

Expand Down Expand Up @@ -1220,8 +1278,8 @@ def _make_metrics_dimensions_dashboard_mcp(
def _make_metrics_dimensions_chart_mcp(
self, dashboard_element: LookerDashboardElement
) -> MetadataChangeProposalWrapper:
chart_urn = builder.make_chart_urn(
self.source_config.platform_name, dashboard_element.get_urn_element_id()
chart_urn = self._make_chart_urn(
element_id=dashboard_element.get_urn_element_id()
)
input_fields_aspect = InputFieldsClass(
fields=self._input_fields_from_dashboard_element(dashboard_element)
Expand Down Expand Up @@ -1513,6 +1571,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

looker_dashboards_for_usage: List[looker_usage.LookerDashboardForUsage] = []

# Emit platform instance entity
if self.source_config.platform_instance:
platform_instance_urn = builder.make_dataplatform_instance_urn(
platform=self.source_config.platform_name,
instance=self.source_config.platform_instance,
)

yield MetadataWorkUnit(
id=f"{platform_instance_urn}-aspect-dataplatformInfo",
mcp=MetadataChangeProposalWrapper(
entityUrn=platform_instance_urn,
aspect=DataPlatformInfoClass(
name=self.source_config.platform_instance,
type=PlatformTypeClass.OTHERS,
datasetNameDelimiter=".",
),
),
)

with self.reporter.report_stage("dashboard_chart_metadata"):
for job in BackpressureAwareExecutor.map(
self.process_dashboard,
Expand Down
Loading

0 comments on commit b173f60

Please sign in to comment.