Skip to content

Commit

Permalink
Feat: Further generalize applications (#1577)
Browse files Browse the repository at this point in the history
* Further generalize applications

Signed-off-by: Dmitriy Kunitskiy <dkunitskiy@lyft.com>

* lint

Signed-off-by: Dmitriy Kunitskiy <dkunitskiy@lyft.com>

* lint again

Signed-off-by: Dmitriy Kunitskiy <dkunitskiy@lyft.com>

* mypy

Signed-off-by: Dmitriy Kunitskiy <dkunitskiy@lyft.com>
  • Loading branch information
Dmitriy Kunitskiy authored Nov 11, 2021
1 parent 895d136 commit 96563fd
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 178 deletions.
214 changes: 113 additions & 101 deletions databuilder/databuilder/models/application.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Iterator, Union
from typing import (
Iterator, Optional, Union,
)

from amundsen_common.utils.atlas import (
AtlasCommonParams, AtlasCommonTypes, AtlasTableTypes,
Expand All @@ -21,88 +23,51 @@
from databuilder.utils.atlas import AtlasRelationshipTypes, AtlasSerializedEntityOperation


class Application(GraphSerializable, TableSerializable, AtlasSerializable):
class GenericApplication(GraphSerializable, TableSerializable, AtlasSerializable):
"""
Application-table matching model
Application represent the applications that generate tables
An Application that generates or consumes a resource.
"""

APPLICATION_LABEL = 'Application'
LABEL = 'Application'
DEFAULT_KEY_FORMAT = 'application://{application_type}/{application_id}'

APPLICATION_KEY_FORMAT = 'application://{application_type}/{database}/{table}'
APPLICATION_ID_FORMAT = '{application_type}.{database}.{table}'
APPLICATION_DESCRIPTION_FORMAT = '{application_type} application for {database}.{table}'
APP_URL = 'application_url'
APP_NAME = 'name'
APP_ID = 'id'
APP_DESCRIPTION = 'description'

# Hardcode Airflow configuration values for backwards compatibility
AIRFLOW_APPLICATION_KEY_FORMAT = 'application://{cluster}.airflow/{dag}/{task}'
AIRFLOW_APPLICATION_ID_FORMAT = '{dag}/{task}'
AIRFLOW_APPLICATION_DESCRIPTION_FORMAT = 'Airflow with id {id}'
GENERATES_REL_TYPE = 'GENERATES'
DERIVED_FROM_REL_TYPE = 'DERIVED_FROM'
CONSUMES_REL_TYPE = 'CONSUMES'
CONSUMED_BY_REL_TYPE = 'CONSUMED_BY'

APPLICATION_URL_NAME = 'application_url'
APPLICATION_NAME = 'name'
APPLICATION_ID = 'id'
APPLICATION_DESCRIPTION = 'description'
APPLICATION_TABLE_RELATION_TYPE = 'GENERATES'
TABLE_APPLICATION_RELATION_TYPE = 'DERIVED_FROM'
LABELS_PERMITTED_TO_HAVE_USAGE = ['Table']

def __init__(self,
task_id: str,
dag_id: str,
application_url_template: str,
db_name: str = 'hive',
cluster: str = 'gold',
schema: str = '',
table_name: str = '',
application_type: str = 'Airflow',
exec_date: str = '',
start_label: str,
start_key: str,
application_type: str,
application_id: str,
application_url: str,
application_description: Optional[str] = None,
app_key_override: Optional[str] = None, # for bw-compatibility only
generates_resource: bool = True,
) -> None:
# todo: need to modify this hack
self.application_url = application_url_template.format(dag_id=dag_id)
self.database = db_name
self.cluster = cluster
self.schema = schema
self.table = table_name
self.dag = dag_id
self.application_type = application_type
self.task = task_id

application_id_format = Application.APPLICATION_ID_FORMAT
application_key_format = Application.APPLICATION_KEY_FORMAT
application_description_format = Application.APPLICATION_DESCRIPTION_FORMAT

# The Application model was originally designed to only be compatible with Airflow
# If the type is Airflow we must use the hardcoded Airflow constants for backwards compatibility
if self.application_type.lower() == 'airflow':
application_id_format = Application.AIRFLOW_APPLICATION_ID_FORMAT
application_key_format = Application.AIRFLOW_APPLICATION_KEY_FORMAT
application_description_format = Application.AIRFLOW_APPLICATION_DESCRIPTION_FORMAT

self.application_id = application_id_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
application_type=self.application_type,
)
self.application_key = application_key_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
cluster=self.cluster,
application_type=self.application_type,
)
if start_label not in GenericApplication.LABELS_PERMITTED_TO_HAVE_USAGE:
raise Exception(f'applications associated with {start_label} are not supported')

self.application_description = application_description_format.format(
dag=self.dag,
task=self.task,
table=self.table,
database=self.database,
cluster=self.cluster,
id=self.application_id,
self.start_label = start_label
self.start_key = start_key
self.application_type = application_type
self.application_id = application_id
self.application_url = application_url
self.application_description = application_description
self.application_key = app_key_override or GenericApplication.DEFAULT_KEY_FORMAT.format(
application_type=self.application_type,
application_id=self.application_id,
)
self.generates_resource = generates_resource

self._node_iter = self._create_node_iterator()
self._relation_iter = self._create_relation_iterator()
Expand All @@ -129,61 +94,57 @@ def create_next_record(self) -> Union[RDSModel, None]:
except StopIteration:
return None

def get_table_model_key(self) -> str:
# returns formatted string for table name
return TableMetadata.TABLE_KEY_FORMAT.format(db=self.database,
schema=self.schema,
tbl=self.table,
cluster=self.cluster)

def _create_node_iterator(self) -> Iterator[GraphNode]:
"""
Create an application node
:return:
"""
application_node = GraphNode(
attrs = {
GenericApplication.APP_NAME: self.application_type,
GenericApplication.APP_ID: self.application_id,
GenericApplication.APP_URL: self.application_url,
}
if self.application_description:
attrs[GenericApplication.APP_DESCRIPTION] = self.application_description

yield GraphNode(
key=self.application_key,
label=Application.APPLICATION_LABEL,
attributes={
Application.APPLICATION_URL_NAME: self.application_url,
Application.APPLICATION_NAME: self.application_type,
Application.APPLICATION_DESCRIPTION: self.application_description,
Application.APPLICATION_ID: self.application_id
}
label=GenericApplication.LABEL,
attributes=attrs,
)
yield application_node

def _create_relation_iterator(self) -> Iterator[GraphRelationship]:
"""
Create relations between application and table nodes
:return:
"""
graph_relationship = GraphRelationship(
start_key=self.get_table_model_key(),
start_label=TableMetadata.TABLE_NODE_LABEL,
start_key=self.start_key,
start_label=self.start_label,
end_key=self.application_key,
end_label=Application.APPLICATION_LABEL,
type=Application.TABLE_APPLICATION_RELATION_TYPE,
reverse_type=Application.APPLICATION_TABLE_RELATION_TYPE,
end_label=GenericApplication.LABEL,
type=(GenericApplication.DERIVED_FROM_REL_TYPE if self.generates_resource
else GenericApplication.CONSUMED_BY_REL_TYPE),
reverse_type=(GenericApplication.GENERATES_REL_TYPE if self.generates_resource
else GenericApplication.CONSUMES_REL_TYPE),
attributes={}
)
yield graph_relationship

# TODO: support consuming/producing relationships and multiple apps per resource
def _create_record_iterator(self) -> Iterator[RDSModel]:
application_record = RDSApplication(
yield RDSApplication(
rk=self.application_key,
application_url=self.application_url,
name=self.application_type,
id=self.application_id,
description=self.application_description
description=self.application_description or '',
)
yield application_record

application_table_record = RDSApplicationTable(
rk=self.get_table_model_key(),
yield RDSApplicationTable(
rk=self.start_key,
application_rk=self.application_key,
)
yield application_table_record

def create_next_atlas_entity(self) -> Union[AtlasEntity, None]:
try:
Expand All @@ -196,7 +157,7 @@ def _create_next_atlas_entity(self) -> Iterator[AtlasEntity]:
(AtlasCommonParams.qualified_name, self.application_key),
('name', self.application_type),
('id', self.application_id),
('description', self.application_description),
('description', self.application_description or ''),
('application_url', self.application_url)
]

Expand All @@ -217,14 +178,65 @@ def create_next_atlas_relation(self) -> Union[AtlasRelationship, None]:
except StopIteration:
return None

# TODO: support consuming/producing relationships and multiple apps per resource
def _create_atlas_relation_iterator(self) -> Iterator[AtlasRelationship]:
relationship = AtlasRelationship(
yield AtlasRelationship(
relationshipType=AtlasRelationshipTypes.table_application,
entityType1=AtlasTableTypes.table,
entityQualifiedName1=self.get_table_model_key(),
entityQualifiedName1=self.start_key,
entityType2=AtlasCommonTypes.application,
entityQualifiedName2=self.application_key,
attributes={}
)

yield relationship

class AirflowApplication(GenericApplication):

ID_FORMAT = '{dag}/{task}'
KEY_FORMAT = 'application://{cluster}.airflow/{id}'
DESCRIPTION_FORMAT = 'Airflow with id {id}'

def __init__(self,
task_id: str,
dag_id: str,
application_url_template: str,
db_name: str = 'hive',
cluster: str = 'gold',
schema: str = '',
table_name: str = '',
application_type: str = 'Airflow',
exec_date: str = '',
generates_table: bool = True,
) -> None:

self.database = db_name
self.cluster = cluster
self.schema = schema
self.table = table_name
self.dag = dag_id
self.task = task_id

airflow_app_id = AirflowApplication.ID_FORMAT.format(dag=dag_id, task=task_id)
GenericApplication.__init__(
self,
start_label=TableMetadata.TABLE_NODE_LABEL,
start_key=self.get_table_model_key(),
application_type=application_type,
application_id=airflow_app_id,
application_url=application_url_template.format(dag_id=dag_id),
application_description=AirflowApplication.DESCRIPTION_FORMAT.format(id=airflow_app_id),
app_key_override=AirflowApplication.KEY_FORMAT.format(cluster=cluster, id=airflow_app_id),
generates_resource=generates_table,
)

def get_table_model_key(self) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(
db=self.database,
cluster=self.cluster,
schema=self.schema,
tbl=self.table,
)


# Alias for backwards compatibility
Application = AirflowApplication
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from setuptools import find_packages, setup

__version__ = '6.3.1'
__version__ = '6.4.0'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down
Loading

0 comments on commit 96563fd

Please sign in to comment.