Skip to content

Commit

Permalink
feat: add get_dashbaord support for neptune
Browse files Browse the repository at this point in the history
Signed-off-by: owenlch <owen.leung2@gmail.com>
  • Loading branch information
Owen-CH-Leung committed Jul 10, 2022
1 parent 11f1f64 commit 771cc0f
Showing 1 changed file with 206 additions and 2 deletions.
208 changes: 206 additions & 2 deletions metadata/metadata_service/proxy/gremlin_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from abc import abstractmethod
from datetime import date, datetime, timedelta
from operator import attrgetter
from flask import current_app, has_app_context
from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
Sequence, Set, Tuple, Type, TypeVar, Union, no_type_check,
overload)
Expand All @@ -21,7 +22,8 @@
from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import (Application, Column,
ProgrammaticDescription, Reader,
Source, Stat, Table, Tag, Watermark)
Source, Stat, Table, Tag, Watermark,
Badge)
from amundsen_common.models.user import User
from amundsen_gremlin.gremlin_model import (EdgeType, EdgeTypes, VertexType,
VertexTypes, WellKnownProperties)
Expand Down Expand Up @@ -57,8 +59,11 @@
from tornado import httpclient
from typing_extensions import Protocol # TODO: it's in typing 3.8

from metadata_service import config
from metadata_service.entity.dashboard_detail import \
DashboardDetail as DashboardDetailEntity
from metadata_service.entity.dashboard_query import \
DashboardQuery as DashboardQueryEntity
from metadata_service.entity.description import Description
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.exception import NotFoundException
Expand Down Expand Up @@ -1593,12 +1598,211 @@ def delete_resource_relation_by_user(self, *,
vertex1_label=VertexTypes.User, vertex1_key=user_id,
vertex2_label=vertex_type, vertex2_key=id)

@staticmethod
def _build_user_from_record(record: dict, manager_name: Optional[str] = None) -> User:
other_key_values = {}
if has_app_context() and current_app.config[config.USER_OTHER_KEYS]:
for k in current_app.config[config.USER_OTHER_KEYS]:
if k in record:
other_key_values[k] = record[k]

return User(email=record['email'],
user_id=record.get('user_id', record['email']),
first_name=record.get('first_name'),
last_name=record.get('last_name'),
full_name=record.get('full_name'),
is_active=record.get('is_active', True),
profile_url=record.get('profile_url'),
github_username=record.get('github_username'),
team_name=record.get('team_name'),
slack_id=record.get('slack_id'),
employee_type=record.get('employee_type'),
role_name=record.get('role_name'),
manager_fullname=record.get('manager_fullname', manager_name),
other_key_values=other_key_values)

def _make_badges(self, badges: Iterable) -> List[Badge]:
"""
Generates a list of Badges objects
:param badges: A list of badges of a table, column, or type_metadata
:return: a list of Badge objects
"""
_badges = []
for badge in badges:
_badges.append(Badge(badge_name=badge["key"], category=badge["category"]))
return _badges

def _get_dashboard_vertex(self,
dashboard_uri: str,
) -> Tuple[str, str, str, int]:
"""
Helper function to get the dashbaord vertex
:param dashboard_uri: dashboard URI that is sent from frontend
:return: Tuple of 3 String and 1 int, for dashbaord uri, url, name, and created timestamp
"""
dashboard = self.g.V().has("key", dashboard_uri).valueMap().by(__.unfold()).toList()
if len(dashboard) > 1:
raise Exception(f"More than one Dashboard Found with ID {dashboard_uri}.")
if len(dashboard) == 0:
raise NotFoundException(f"No Dashboard exist with URI : {dashboard_uri}.")

dashboard_dict = dashboard[0]
dashboard_uri = dashboard_dict.get("key", "")
dashboard_url = dashboard_dict.get("dashboard_url", "")
dashboard_name = dashboard_dict.get("name", "")
dashboard_created_timestamp = int(dashboard_dict.get("created_timestamp"))
return dashboard_uri, dashboard_url, dashboard_name, dashboard_created_timestamp

def _get_dashboard_group_and_cluster(self,
dashboard_uri: str,
) -> Tuple[str, str, str, str]:
"""
Helper function to get the dashbaord group and cluster
:param dashboard_uri: dashboard URI that is sent from frontend
:return: Tuple of 4 String
"""
dashboard_group = self.g.V().has("key", dashboard_uri).out("DASHBOARD_OF").valueMap().by(__.unfold()).toList()
if dashboard_group:
dashboard_group_name = dashboard_group[0].get("name", "")
dashboard_group_url = dashboard_group[0].get("dashboard_group_url", "")
else:
dashboard_group_name = ""
dashboard_group_url = ""

dashboard_cluster = self.g.V().has("key", dashboard_uri).out("DASHBOARD_OF")
dashboard_cluster = dashboard_cluster.out("DASHBOARD_GROUP_OF").valueMap().by(__.unfold()).toList()
if dashboard_cluster:
cluster_name = dashboard_cluster[0].get("name", "")
product_name = dashboard_cluster[0].get("key", "").split("_")[0]
else:
cluster_name = ""
product_name = ""
return dashboard_group_name, dashboard_group_url, cluster_name, product_name

def _get_dashboard_tables(self,
dashboard_uri: str,
) -> List[PopularTable]:
"""
Helper function to get the dashbaord tables
:param dashboard_uri: dashboard URI that is sent from frontend
:return: List of PopularTable
"""

dashboard_tables = self.g.V().has("key", dashboard_uri).out("DASHBOARD_WITH_TABLE")
dashboard_tables = dashboard_tables.valueMap(True).by(__.unfold()).toList()
tables = []
for table in dashboard_tables:
tabe_base = table.get('key').split("://")[1]
table_db = table.get('key').split("://")[0]
table_cluster = tabe_base.split(".")[0]
table_name = table.get('name')
table_schema = tabe_base.split("/")[0]
table_desc_vertex = self.g.V(table[T.id]).out("DESCRIPTION")
table_desc_vertex = table_desc_vertex.filter(__.hasLabel("Description")).valueMap().by(__.unfold()).toList()
if table_desc_vertex:
for desc in table_desc_vertex:
table_desc = desc.get('description')
else:
table_desc = ""
table_dict = {"schema": table_schema, "cluster": table_cluster,
"database": table_db, "name": table_name, "description": table_desc}
tables.append(PopularTable(**table_dict))
return tables

@timer_with_counter
@overrides
def get_dashboard(self,
dashboard_uri: str,
) -> DashboardDetailEntity:
pass
'''
Retrieves the Dashboard information based on the specified dashboard uri.
:param dashboard_uri: dashboard URI that is sent from frontend
:return: The DashboardDetailEntity object
'''
dashboard_uri, dashboard_url, dashboard_name, dashboard_created_timestamp = \
self._get_dashboard_vertex(dashboard_uri)

dashboard_group_name, dashboard_group_url, cluster_name, product_name = \
self._get_dashboard_group_and_cluster(dashboard_uri)

dashboard_desc = self.g.V().has("key", dashboard_uri).out("DESCRIPTION")
dashboard_desc = dashboard_desc.valueMap().by(__.unfold()).toList()
if dashboard_desc:
dashboard_description = dashboard_desc[0].get("description", "")
else:
dashboard_description = ""

owners = []
dashboard_owners = self.g.V().has("key", dashboard_uri).out("READ_BY", "OWNER")
dashboard_owners = dashboard_owners.dedup().valueMap().by(__.unfold()).toList()
for owner in dashboard_owners:
owner_data = self._get_user_details(user_id=owner["email"], user_data=owner)
owners.append(self._build_user_from_record(record=owner_data))

dashboard_tags = self.g.V().has("key", dashboard_uri).out("TAGGED_BY").valueMap().by(__.unfold()).toList()
tags = [Tag(tag_type=tag['tag_type'], tag_name=tag['key']) for tag in dashboard_tags]

dashboard_badges = self.g.V().has("key", dashboard_uri).out("HAS_BADGE").valueMap().by(__.unfold()).toList()
badges = self._make_badges(dashboard_badges)

dashboard_charts = self.g.V().has("key", dashboard_uri).out("HAS_CHART").valueMap().by(__.unfold()).toList()
chart = [chart['name'] for chart in dashboard_charts if 'name' in chart and chart['name']]

dashboard_query = self.g.V().has("key", dashboard_uri).out("HAS_QUERY").valueMap().by(__.unfold()).toList()
query_names = [query['name'] for query in dashboard_query if 'name' in query and query['name']]
queries = []
for query in dashboard_query:
if query.get('name') or query.get('url') or query.get('query_text'):
query_dict = {'name': query.get('name'), 'url': query.get('url'), 'query_text': query.get('query_text')}
queries.append(DashboardQueryEntity(**query_dict))

tables = self._get_dashboard_tables(dashboard_uri)

last_successful_run_timestamp = None
last_run_timestamp = None
last_run_state = None
dashboard_execution = self.g.V().has("key", dashboard_uri).out("EXECUTED").valueMap().by(__.unfold()).toList()

for execution in dashboard_execution:
if "last_successful_execution" in execution.get("key"):
last_successful_run_timestamp = int(execution.get("timestamp"))
if "last_execution" in execution.get("key"):
last_run_timestamp = int(execution.get("timestamp"))
last_run_state = execution.get("state")

updated_timestamp = None
dashboard_last_update = self.g.V().has("key", dashboard_uri).out("LAST_UPDATED_AT")
dashboard_last_update = dashboard_last_update.valueMap().by(__.unfold()).toList()
for last_update in dashboard_last_update:
updated_timestamp = int(last_update.get("timestamp"))

view_count = 0
dashboard_view_count = self.g.V().has("key", dashboard_uri).outE("READ_BY").valueMap().by(__.unfold()).toList()
for view in dashboard_view_count:
view_count += view.get("read_count")

return DashboardDetailEntity(uri=dashboard_uri,
cluster=cluster_name,
url=dashboard_url,
name=dashboard_name,
product=product_name,
created_timestamp=dashboard_created_timestamp,
description=dashboard_description,
group_name=dashboard_group_name,
group_url=dashboard_group_url,
last_successful_run_timestamp=last_successful_run_timestamp,
last_run_timestamp=last_run_timestamp,
last_run_state=last_run_state,
updated_timestamp=updated_timestamp,
owners=owners,
tags=tags,
badges=badges,
recent_view_count=view_count,
chart_names=chart,
query_names=query_names,
queries=queries,
tables=tables
)

@timer_with_counter
@overrides
Expand Down

0 comments on commit 771cc0f

Please sign in to comment.