Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/transformer): tags to terms transformer #10758

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags)<br/> - [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) |
| `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)<br/> - [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)<br/> - [Add Dataset globalTags](#add-dataset-globaltags) |
| `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms)<br/> - [Tags to Term Mapping](#tags-to-term-mapping) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
Expand Down Expand Up @@ -668,6 +668,56 @@ We can add glossary terms to datasets based on a regex filter.
".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"]
".*example2.*": ["urn:li:glossaryTerm:PostalCode"]
```

## Tags to Term Mapping
### Config Details
Comment on lines +672 to +673
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank lines around headings.

Headings should be surrounded by blank lines for better readability.

672a673
+
673a674
+
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
## Tags to Term Mapping
### Config Details
## Tags to Term Mapping
### Config Details
Tools
Markdownlint

672-672: Expected: 1; Actual: 0; Below
Headings should be surrounded by blank lines

(MD022, blanks-around-headings)


673-673: Expected: 1; Actual: 0; Above
Headings should be surrounded by blank lines

(MD022, blanks-around-headings)


| Field | Required | Type | Default | Description |
|---------------|----------|--------------------|-------------|-------------------------------------------------------------------------------------------------------|
| `tags` | ✅ | List[str] | | List of tag names based on which terms will be created and associated with the dataset. |
| `semantics` | | enum | "OVERWRITE" | Determines whether to OVERWRITE or PATCH the terms associated with the dataset on DataHub GMS. |

<br/>

The `tags_to_term` transformer is designed to map specific tags to glossary terms within DataHub. It takes a configuration of tags should be translated into corresponding glossaryTerm. This transformer can apply these mappings to any tags found either at column level of dataset or dataset top level.
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

When specifying tags in the configuration, use the tag's simple name rather than the full tag URN.
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

For example, instead of using the tag URN `urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value`, you should specify just the tag name `tag_name` in the mapping configuration
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

```yaml
transformers:
- type: "tags_to_term"
config:
semantics: OVERWRITE # OVERWRITE is the default behavior
tags:
- "tag_name"
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
```

`tags_to_term` can be configured in below different way
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
```yaml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank lines around fenced code blocks.

Fenced code blocks should be surrounded by blank lines for better readability.

700a701
+
709a710
+

Committable suggestion was skipped due to low confidence.

Tools
Markdownlint

700-700: null
Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)

transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: OVERWRITE # OVERWRITE is default behaviour
tags:
- "example1"
- "example2"
- "example3"
```
- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
```yaml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blank lines around fenced code blocks.

Fenced code blocks should be surrounded by blank lines for better readability.

711a712
+
720a721
+
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```yaml
```yaml
Tools
Markdownlint

711-711: null
Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)

transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: PATCH
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```

## Pattern Add Dataset Schema Field glossaryTerms
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,22 @@ def create_tag(self, tag_name: str) -> str:
# return urn
return res["createTag"]

def remove_tag(self, tag_urn: str, resource_urn: str) -> bool:
graph_query = """
mutation removeTag {{
removeTag(
input: {{
tagUrn: "{tag_urn}",
resourceUrn: "{resource_urn}"
}})
}}
""".format(
tag_urn=tag_urn, resource_urn=resource_urn
)
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved

res = self.execute_graphql(query=graph_query)
return res["removeTag"]

def _assertion_result_shared(self) -> str:
fragment: str = """
fragment assertionResult on AssertionResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def entity_types(self) -> List[str]:
return ["dataset"]


class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["dataset", "container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
Expand Down Expand Up @@ -128,3 +138,8 @@ def aspect_name(self) -> str:
class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "datasetUsageStatistics"


class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "glossaryTerms"
145 changes: 145 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from typing import List, Optional, Set, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect, make_term_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import TagsToTermTransformer
from datahub.metadata.schema_classes import (
AuditStampClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
SchemaMetadataClass,
)


class TagsToTermMapperConfig(TransformerSemanticsConfigModel):
tags: List[str]


class TagsToTermMapper(TagsToTermTransformer):
"""This transformer maps specified tags to corresponding glossary terms for a dataset."""

def __init__(self, config: TagsToTermMapperConfig, ctx: PipelineContext):
super().__init__()
self.ctx: PipelineContext = ctx
self.config: TagsToTermMapperConfig = config

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "TagsToTermMapper":
config = TagsToTermMapperConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def _merge_with_server_glossary_terms(
graph: DataHubGraph,
urn: str,
glossary_terms_aspect: Optional[GlossaryTermsClass],
) -> Optional[GlossaryTermsClass]:
if not glossary_terms_aspect or not glossary_terms_aspect.terms:
# nothing to add, no need to consult server
return None

# Merge the transformed terms with existing server terms.
# The transformed terms takes precedence, which may change the term context.
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn)
if server_glossary_terms_aspect is not None:
glossary_terms_aspect.terms = list(
{
**{term.urn: term for term in server_glossary_terms_aspect.terms},
**{term.urn: term for term in glossary_terms_aspect.terms},
}.values()
)

return glossary_terms_aspect

@staticmethod
def get_tags_from_global_tags(global_tags: Optional[GlobalTagsClass]) -> Set[str]:
"""Extracts tags urn from GlobalTagsClass."""
if not global_tags or not global_tags.tags:
return set()

return {tag_assoc.tag for tag_assoc in global_tags.tags}

@staticmethod
def get_tags_from_schema_metadata(
schema_metadata: Optional[SchemaMetadataClass],
) -> Set[str]:
"""Extracts globalTags from all fields in SchemaMetadataClass."""
if not schema_metadata or not schema_metadata.fields:
return set()
tags = set()
for field in schema_metadata.fields:
if field.globalTags:
tags.update(
TagsToTermMapper.get_tags_from_global_tags(field.globalTags)
)
return tags

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

in_glossary_terms: Optional[GlossaryTermsClass] = cast(
Optional[GlossaryTermsClass], aspect
)

assert self.ctx.graph
in_global_tags_aspect: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(
entity_urn
)
in_schema_metadata_aspect: Optional[
SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(entity_urn)

if in_global_tags_aspect is None and in_schema_metadata_aspect is None:
return cast(Aspect, in_glossary_terms)

global_tags = TagsToTermMapper.get_tags_from_global_tags(in_global_tags_aspect)
schema_metadata_tags = TagsToTermMapper.get_tags_from_schema_metadata(
in_schema_metadata_aspect
)

# Combine tags from both global and schema level
combined_tags = global_tags.union(schema_metadata_tags)

tag_set = set(self.config.tags)
terms_to_add = set()
tags_to_delete = set()

# Check each global tag against the configured tag list and prepare terms
for full_tag in combined_tags:
tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0]
if tag_name in tag_set:
term_urn = make_term_urn(tag_name)
terms_to_add.add(term_urn)
tags_to_delete.add(full_tag) # Full URN for deletion

if not terms_to_add:
return cast(Aspect, in_glossary_terms) # No new terms to add

for tag_urn in tags_to_delete:
self.ctx.graph.remove_tag(tag_urn=tag_urn, resource_urn=entity_urn)

# Initialize the Glossary Terms properly
out_glossary_terms = GlossaryTermsClass(
terms=[GlossaryTermAssociationClass(urn=term) for term in terms_to_add],
auditStamp=AuditStampClass(
time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter"
),
)

if self.config.semantics == TransformerSemantics.PATCH:
patch_glossary_terms: Optional[
GlossaryTermsClass
] = TagsToTermMapper._merge_with_server_glossary_terms(
self.ctx.graph, entity_urn, out_glossary_terms
)
return cast(Optional[Aspect], patch_glossary_terms)
else:
return cast(Aspect, out_glossary_terms)
Loading
Loading