Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parse group resource #6921

Merged
merged 8 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230209-093409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Parse 'group' resource
time: 2023-02-09T09:34:09.547006-05:00
custom:
Author: michelleark
Issue: "6921"
3 changes: 3 additions & 0 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def print_compile_stats(stats):
NodeType.Source: "source",
NodeType.Exposure: "exposure",
NodeType.Metric: "metric",
NodeType.Group: "group",
}

results = {k: 0 for k in names.keys()}
Expand Down Expand Up @@ -87,6 +88,8 @@ def _generate_stats(manifest: Manifest):
stats[metric.resource_type] += 1
for macro in manifest.macros.values():
stats[macro.resource_type] += 1
for group in manifest.groups.values():
stats[group.resource_type] += 1
return stats


Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
groups: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.
Expand Down
15 changes: 15 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
GenericTestNode,
Exposure,
Metric,
Group,
UnpatchedSourceDefinition,
ManifestNode,
GraphMemberNode,
Expand Down Expand Up @@ -599,6 +600,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin):
docs: MutableMapping[str, Documentation] = field(default_factory=dict)
exposures: MutableMapping[str, Exposure] = field(default_factory=dict)
metrics: MutableMapping[str, Metric] = field(default_factory=dict)
groups: MutableMapping[str, Group] = field(default_factory=dict)
selectors: MutableMapping[str, Any] = field(default_factory=dict)
files: MutableMapping[str, AnySourceFile] = field(default_factory=dict)
metadata: ManifestMetadata = field(default_factory=ManifestMetadata)
Expand Down Expand Up @@ -775,6 +777,7 @@ def deepcopy(self):
docs={k: _deepcopy(v) for k, v in self.docs.items()},
exposures={k: _deepcopy(v) for k, v in self.exposures.items()},
metrics={k: _deepcopy(v) for k, v in self.metrics.items()},
groups={k: _deepcopy(v) for k, v in self.groups.items()},
selectors={k: _deepcopy(v) for k, v in self.selectors.items()},
metadata=self.metadata,
disabled={k: _deepcopy(v) for k, v in self.disabled.items()},
Expand Down Expand Up @@ -816,6 +819,7 @@ def writable_manifest(self):
docs=self.docs,
exposures=self.exposures,
metrics=self.metrics,
groups=self.groups,
selectors=self.selectors,
metadata=self.metadata,
disabled=self.disabled,
Expand Down Expand Up @@ -1070,6 +1074,8 @@ def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=Non
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
source_file.exposures.append(node.unique_id)
if isinstance(node, Group):
source_file.groups.append(node.unique_id)
else:
source_file.nodes.append(node.unique_id)

Expand All @@ -1083,6 +1089,11 @@ def add_metric(self, source_file: SchemaSourceFile, metric: Metric):
self.metrics[metric.unique_id] = metric
source_file.metrics.append(metric.unique_id)

def add_group(self, source_file: SchemaSourceFile, group: Group):
    """Register a parsed Group on the manifest and record it on its source file.

    Mirrors add_metric above: groups are only ever created from schema yaml
    files, so the source file is always a SchemaSourceFile.
    """
    # Guard against two groups claiming the same unique_id (same duplicate
    # check used for metrics/exposures; see _check_duplicates).
    _check_duplicates(group, self.groups)
    self.groups[group.unique_id] = group
    source_file.groups.append(group.unique_id)

def add_disabled_nofile(self, node: GraphMemberNode):
# There can be multiple disabled nodes for the same unique_id
if node.unique_id in self.disabled:
Expand Down Expand Up @@ -1125,6 +1136,7 @@ def __reduce_ex__(self, protocol):
self.docs,
self.exposures,
self.metrics,
self.groups,
self.selectors,
self.files,
self.metadata,
Expand Down Expand Up @@ -1178,6 +1190,9 @@ class WritableManifest(ArtifactMixin):
metrics: Mapping[UniqueID, Metric] = field(
metadata=dict(description=("The metrics defined in the dbt project and its dependencies"))
)
groups: Mapping[UniqueID, Group] = field(
metadata=dict(description=("The groups defined in the dbt project"))
)
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=("The selectors defined in selectors.yml"))
)
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,18 @@ def same_contents(self, old: Optional["Metric"]) -> bool:
)


# ====================================
# Group node
# ====================================


@dataclass
class Group(BaseNode):
    """A parsed 'group' resource, defined under the `groups:` key of a schema
    yaml file.

    Identity fields (unique_id, package_name, path, original_file_path) come
    from BaseNode; see GroupParser.parse_group for how they are populated.
    """

    # Group name; unique within a package (the unique_id is
    # "group.<package_name>.<name>").
    name: str
    # Required owner; validation elsewhere requires at least one of
    # owner.name / owner.email (see UnparsedGroup.validate).
    owner: Owner
    # "restrict" metadata pins this field to NodeType.Group — presumably
    # enforced during (de)serialization by the dbtClassMixin machinery, the
    # same pattern used by the other node classes in this module.
    resource_type: NodeType = field(metadata={"restrict": [NodeType.Group]})


# ====================================
# Patches
# ====================================
Expand Down Expand Up @@ -1172,6 +1184,7 @@ class ParsedMacroPatch(ParsedPatch):
GraphMemberNode,
Documentation,
Macro,
Group,
]

TestNode = Union[
Expand Down
12 changes: 12 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,15 @@ def validate(cls, data):

if data.get("model") is not None and data.get("calculation_method") == "derived":
raise ValidationError("Derived metrics cannot have a 'model' property")


@dataclass
class UnparsedGroup(dbtClassMixin, Replaceable):
    """Raw `groups:` entry from a schema yaml file, before it is parsed into a
    Group node (see GroupParser)."""

    name: str
    owner: Owner

    @classmethod
    def validate(cls, data):
        """Validate the raw group dict.

        Runs the base-class validation first, then enforces that the owner
        carries at least one identifying field.

        Raises:
            ValidationError: if neither owner.name nor owner.email is set.
        """
        # Python 3 zero-argument super() — equivalent to the legacy
        # super(UnparsedGroup, cls) form, and consistent with modern usage
        # elsewhere in the codebase (f-strings, dataclasses).
        super().validate(data)
        if data["owner"].get("name") is None and data["owner"].get("email") is None:
            raise ValidationError("Group owner must have at least one of 'name' or 'email'.")
3 changes: 3 additions & 0 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ def upgrade_manifest_json(manifest: dict) -> dict:
upgrade_node_content(node_content)
if node_content["resource_type"] == "seed":
upgrade_seed_content(node_content)
# add group key
if "groups" not in manifest:
manifest["groups"] = {}
for metric_content in manifest.get("metrics", {}).values():
# handle attr renames + value translation ("expression" -> "derived")
metric_content = rename_metric_attr(metric_content)
Expand Down
Binary file modified core/dbt/docs/build/doctrees/environment.pickle
Binary file not shown.
Binary file modified core/dbt/docs/build/doctrees/index.doctree
Binary file not shown.
1 change: 1 addition & 0 deletions core/dbt/node_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class NodeType(StrEnum):
Macro = "macro"
Exposure = "exposure"
Metric = "metric"
Group = "group"

@classmethod
def executable(cls) -> List["NodeType"]:
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/parser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ A different sub-parser is called for each main dictionary key in the yaml.
* 'exposures' - SchemaParser.parse\_exposures
* no 'patches'
* Manifest: exposures
* 'groups' - SchemaParser.parse\_groups
* no 'patches'
* Manifest: groups

# dbt Manifest

Expand Down
36 changes: 36 additions & 0 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,29 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
self.delete_schema_metric(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)

# groups
dict_key = "groups"
group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
if group_diff["changed"]:
for group in group_diff["changed"]:
self.delete_schema_group(schema_file, group)
self.merge_patch(schema_file, dict_key, group)
if group_diff["deleted"]:
for group in group_diff["deleted"]:
self.delete_schema_group(schema_file, group)
if group_diff["added"]:
for group in group_diff["added"]:
self.merge_patch(schema_file, dict_key, group)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in group_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_group(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)

# Take a "section" of the schema file yaml dictionary from saved and new schema files
# and determine which parts have changed
def get_diff_for(self, key, saved_yaml_dict, new_yaml_dict):
Expand Down Expand Up @@ -903,6 +926,19 @@ def delete_schema_exposure(self, schema_file, exposure_dict):
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)

# groups are created only from schema files, so just delete the group
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there are references to groups in other nodes, then we'll have to go through and schedule every node in this group for reparsing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. That would also go for modifications to existing groups in addition to deletions, right?

I'm thinking it makes more sense to add that to the follow-up PR that parses group configs on group-able nodes: #6965.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd only need to reparse for modifications of existing groups if information from the group is pulled into the node that needs to be rebuilt. If we only have the name in the node then probably not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On thinking about this... Would people be pulling group information into model jinja code? Like model.group.owner? Then we'd have to reparse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not storing any additional group information directly on the node, right? Users would need to look up .groups() in the manifest / flat_graph (a.k.a. {{ graph }} in Jinja code) to get information about the owner

Copy link
Contributor Author

@MichelleArk MichelleArk Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not storing any additional group information directly on the node, right? Users would need to look up .groups() in the manifest / flat_graph (a.k.a. {{ graph }} in Jinja code) to get information about the owner

Right - we're just storing the name. Changes to which will be captured as a deleted + new group. So {{ model.config.group }} would just be able to access the name, not owner.

We're not currently adding groups to the flat graph - should we be?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh, I'd just seen this change and thought we were.

I think we probably should. This feels like valid metadata that someone might want access to in the Jinja context (including the folks who maintain the dbt_artifacts package), and I can't see potential for anti-pattern shenanigans. (There are indeed some things we exclude from the flat_graph, such as macros. I would say that exclusion is somewhat intentional. I just remembered this issue that I tagged myself to refine forever ago: #4919)

It would just require an update to this method, and associated tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, makes sense! I had actually added it on first pass of this work but wasn't convinced it should be part of the flat graph so removed it. Will add now

def delete_schema_group(self, schema_file, group_dict):
    """Remove every saved group whose name matches ``group_dict['name']``.

    Matching groups are moved from the saved manifest into the deleted
    manifest and de-registered from the schema file, so a later reparse of
    the file re-creates them cleanly.
    """
    target_name = group_dict["name"]
    # Snapshot the id list: we mutate schema_file.groups while iterating.
    for unique_id in list(schema_file.groups):
        saved_group = self.saved_manifest.groups.get(unique_id)
        if saved_group is not None and saved_group.name == target_name:
            self.deleted_manifest.groups[unique_id] = self.saved_manifest.groups.pop(unique_id)
            schema_file.groups.remove(unique_id)

# metrics are created only from schema files, but also can be referred to by other nodes
def delete_schema_metric(self, schema_file, metric_dict):
metric_name = metric_dict["name"]
Expand Down
41 changes: 41 additions & 0 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UnpatchedSourceDefinition,
Exposure,
Metric,
Group,
)
from dbt.contracts.graph.unparsed import (
HasColumnDocs,
Expand All @@ -48,6 +49,7 @@
UnparsedExposure,
UnparsedMetric,
UnparsedSourceDefinition,
UnparsedGroup,
)
from dbt.exceptions import (
CompilationError,
Expand Down Expand Up @@ -542,6 +544,11 @@ def parse_file(self, block: FileBlock, dct: Dict = None) -> None:
metric_parser = MetricParser(self, yaml_block)
metric_parser.parse()

# parse groups
if "groups" in dct:
group_parser = GroupParser(self, yaml_block)
group_parser.parse()


def check_format_version(file_path, yaml_dct) -> None:
if "version" not in yaml_dct:
Expand Down Expand Up @@ -1258,3 +1265,37 @@ def parse(self):
except (ValidationError, JSONValidationError) as exc:
raise YamlParseDictError(self.yaml.path, self.key, data, exc)
self.parse_metric(unparsed)


class GroupParser(YamlReader):
    """Parses entries under the `groups:` key of a schema yaml file into
    Group nodes and registers them on the manifest."""

    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
        super().__init__(schema_parser, yaml, NodeType.Group.pluralize())
        self.schema_parser = schema_parser
        self.yaml = yaml

    def parse_group(self, unparsed: UnparsedGroup):
        """Build a Group node from a validated UnparsedGroup and add it to
        the manifest."""
        project_name = self.project.project_name
        group = Group(
            resource_type=NodeType.Group,
            package_name=project_name,
            path=self.yaml.path.relative_path,
            original_file_path=self.yaml.path.original_file_path,
            # Groups are identified per package by name, e.g.
            # "group.my_project.finance".
            unique_id=f"{NodeType.Group}.{project_name}.{unparsed.name}",
            name=unparsed.name,
            owner=unparsed.owner,
        )
        self.manifest.add_group(self.yaml.file, group)

    def parse(self):
        """Validate and parse each raw group dict in this yaml block.

        Raises:
            YamlParseDictError: wrapping any validation failure, so the error
                points at the offending yaml file and key.
        """
        for data in self.get_key_dicts():
            try:
                UnparsedGroup.validate(data)
                unparsed = UnparsedGroup.from_dict(data)
            except (ValidationError, JSONValidationError) as exc:
                raise YamlParseDictError(self.yaml.path, self.key, data, exc)
            self.parse_group(unparsed)
Loading