Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Selector Upgrade #7101

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
51 changes: 51 additions & 0 deletions .changes/unreleased/Features-20230301-162551.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
kind: Features
body: |-
There are some really cool features in DBT such as the ability to choose models and their descendents, graph ordered builds, and much more. However, I found that some of the syntax seemed to be lacking (particularly around the selectors yml syntax).

The definition part of the selector yaml supports some model selection operators, but requires that the end user use a different approach to describe what can be done in short-hand on the command line. For example, on the command line, I can select the children of a model using

```bash
dbt ls --select some_model+
```

In the yaml,
```yaml
- name: some_selector
description:
- some_model+
```
does not work and instead

```yaml
- name: some_selector
description:
- method: fqn
value: some_model
children: true
```
must be used.

The yaml is very useful in defining routine selections. This inconsistency with the command line forces the end user to learn two approaches.

Furthermore, there is support from the command line to union and intersect using the --select operator. However, if one wishes to exclude, the separate --exclude operator is needed. The syntax is very straight forward for the command line approach, but again the yaml approach is different. I see value in the yaml approach. I just believe that the two should be compatible. In this PR, I propose that the --select can include NOT as a way of embedding excluded models in a single string.

```bash
dbt ls --select 'selector:some_selector+1 NOT selector:some_selector'
```

In addition, dbt doesn't allow selectors to be expanded using the children/parent/ancestor operators. This PR addresses that and makes selectors just like any other method.

Syntax in the yaml is also supported
```yaml
- definition:
union:
- selector:some_other_selector+1 NOT some_model+2
name: some_selector
```

Most of the code removes the selectors dictionary from being passed around and instead opts to pass the project to the graph/cli get_selected and get_graph_queue methods.

time: 2023-03-01T16:25:51.627771-05:00
custom:
Author: acurtis-evi
Issue: "5009"
2 changes: 2 additions & 0 deletions core/dbt/config/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ def hashed_name(self):

def get_selector(self, name: str) -> Union[SelectionSpec, bool]:
if name not in self.selectors:
if name == 'arg_selector' and self.arg_selector:
return self.arg_selector
raise DbtRuntimeError(
f"Could not find selector named {name}, expected one of {list(self.selectors)}"
)
Expand Down
26 changes: 10 additions & 16 deletions core/dbt/config/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,33 +141,28 @@ def validate_selector_default(selector_file: SelectorFile) -> None:
# good to combine the two flows into one at some point.
class SelectorDict:
@classmethod
def parse_dict_definition(cls, definition, selector_dict={}):
def parse_dict_definition(cls, definition):
key = list(definition)[0]
value = definition[key]
if isinstance(value, list):
new_values = []
for sel_def in value:
new_value = cls.parse_from_definition(sel_def, selector_dict=selector_dict)
new_value = cls.parse_from_definition(sel_def)
new_values.append(new_value)
value = new_values
if key == "exclude":
definition = {key: value}
elif len(definition) == 1:
definition = {"method": key, "value": value}
elif key == "method" and value == "selector":
sel_def = definition.get("value")
if sel_def not in selector_dict:
raise DbtSelectorsError(f"Existing selector definition for {sel_def} not found.")
return selector_dict[definition["value"]]["definition"]
return definition
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only checked in graph/selector.py (or config/project.py)


@classmethod
def parse_a_definition(cls, def_type, definition, selector_dict={}):
def parse_a_definition(cls, def_type, definition):
# this definition must be a list
new_dict = {def_type: []}
for sel_def in definition[def_type]:
if isinstance(sel_def, dict):
sel_def = cls.parse_from_definition(sel_def, selector_dict=selector_dict)
sel_def = cls.parse_from_definition(sel_def)
new_dict[def_type].append(sel_def)
elif isinstance(sel_def, str):
sel_def = SelectionCriteria.dict_from_single_spec(sel_def)
Expand All @@ -177,17 +172,17 @@ def parse_a_definition(cls, def_type, definition, selector_dict={}):
return new_dict

@classmethod
def parse_from_definition(cls, definition, selector_dict={}):
def parse_from_definition(cls, definition):
if isinstance(definition, str):
definition = SelectionCriteria.dict_from_single_spec(definition)
elif "union" in definition:
definition = cls.parse_a_definition("union", definition, selector_dict=selector_dict)
definition = cls.parse_a_definition("union", definition)
elif "intersection" in definition:
definition = cls.parse_a_definition(
"intersection", definition, selector_dict=selector_dict
"intersection", definition
)
elif isinstance(definition, dict):
definition = cls.parse_dict_definition(definition, selector_dict=selector_dict)
definition = cls.parse_dict_definition(definition)
return definition

# This is the normal entrypoint of this code. Give it the
Expand All @@ -198,8 +193,7 @@ def parse_from_selectors_list(cls, selectors):
for selector in selectors:
sel_name = selector["name"]
selector_dict[sel_name] = selector
definition = cls.parse_from_definition(
selector["definition"], selector_dict=deepcopy(selector_dict)
selector_dict[sel_name]["definition"] = cls.parse_from_definition(
selector["definition"]
)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The selector_dict doesn't initially resolve the selectors. This is left until the end to allow for it to be more dynamic

selector_dict[sel_name]["definition"] = definition
return selector_dict
92 changes: 61 additions & 31 deletions core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,64 @@ def parse_union(
# turn ['a b', 'c'] -> ['a', 'b', 'c']
raw_specs = itertools.chain.from_iterable(r.split(" ") for r in components)
union_components: List[SelectionSpec] = []
exclude_components: List[SelectionSpec] = []
in_exclude = False

flags = get_flags()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow for --select 'some_expression --exclude some_other_expression', having it be one string allows for the yaml syntax to support union/intersection/exclude in a single string

# ['a', 'b', 'c,d'] -> union('a', 'b', intersection('c', 'd'))
for raw_spec in raw_specs:
if in_exclude == False and raw_spec == 'EXCEPT':
in_exclude = True
if len(union_components) == 0:
for include in DEFAULT_INCLUDES:
union_components.append(
SelectionCriteria.from_single_spec(include, indirect_selection=False)
)
continue

intersection_components: List[SelectionSpec] = [
SelectionCriteria.from_single_spec(part, indirect_selection=indirect_selection)
for part in raw_spec.split(INTERSECTION_DELIMITER)
]
union_components.append(
SelectionIntersection(
components=intersection_components,
expect_exists=expect_exists,
raw=raw_spec,
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
if in_exclude:
exclude_components.append(
SelectionIntersection(
components=intersection_components,
expect_exists=expect_exists,
raw=raw_spec,
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
)
)
)
return SelectionUnion(
else:
union_components.append(
SelectionIntersection(
components=intersection_components,
expect_exists=expect_exists,
raw=raw_spec,
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
)
)
res = SelectionUnion(
components=union_components,
expect_exists=False,
raw=components,
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
)

if len(exclude_components) == 0:
return res
else:
return SelectionDifference(
components=[
res,
SelectionUnion(
components=exclude_components,
expect_exists=False,
raw=components,
indirect_selection=False
)
]
)

def parse_union_from_default(
raw: Optional[List[str]],
Expand Down Expand Up @@ -123,9 +159,9 @@ def _get_list_dicts(dct: Dict[str, Any], key: str) -> List[RawDefinition]:
return result


def _parse_exclusions(definition, result={}) -> Optional[SelectionSpec]:
def _parse_exclusions(definition) -> Optional[SelectionSpec]:
exclusions = _get_list_dicts(definition, "exclude")
parsed_exclusions = [parse_from_definition(excl, result=result) for excl in exclusions]
parsed_exclusions = [parse_from_definition(excl) for excl in exclusions]
if len(parsed_exclusions) == 1:
return parsed_exclusions[0]
elif len(parsed_exclusions) > 1:
Expand All @@ -135,7 +171,7 @@ def _parse_exclusions(definition, result={}) -> Optional[SelectionSpec]:


def _parse_include_exclude_subdefs(
definitions: List[RawDefinition], result={}
definitions: List[RawDefinition]
) -> Tuple[List[SelectionSpec], Optional[SelectionSpec]]:
include_parts: List[SelectionSpec] = []
diff_arg: Optional[SelectionSpec] = None
Expand All @@ -149,16 +185,16 @@ def _parse_include_exclude_subdefs(
f"You cannot provide multiple exclude arguments to the "
f"same selector set operator:\n{yaml_sel_cfg}"
)
diff_arg = _parse_exclusions(definition, result=result)
diff_arg = _parse_exclusions(definition)
else:
include_parts.append(parse_from_definition(definition, result=result))
include_parts.append(parse_from_definition(definition))

return (include_parts, diff_arg)


def parse_union_definition(definition: Dict[str, Any], result={}) -> SelectionSpec:
def parse_union_definition(definition: Dict[str, Any]) -> SelectionSpec:
union_def_parts = _get_list_dicts(definition, "union")
include, exclude = _parse_include_exclude_subdefs(union_def_parts, result=result)
include, exclude = _parse_include_exclude_subdefs(union_def_parts)

union = SelectionUnion(components=include)

Expand All @@ -169,9 +205,9 @@ def parse_union_definition(definition: Dict[str, Any], result={}) -> SelectionSp
return SelectionDifference(components=[union, exclude], raw=definition)


def parse_intersection_definition(definition: Dict[str, Any], result={}) -> SelectionSpec:
def parse_intersection_definition(definition: Dict[str, Any]) -> SelectionSpec:
intersection_def_parts = _get_list_dicts(definition, "intersection")
include, exclude = _parse_include_exclude_subdefs(intersection_def_parts, result=result)
include, exclude = _parse_include_exclude_subdefs(intersection_def_parts)
intersection = SelectionIntersection(components=include)

if exclude is None:
Expand All @@ -181,7 +217,7 @@ def parse_intersection_definition(definition: Dict[str, Any], result={}) -> Sele
return SelectionDifference(components=[intersection, exclude], raw=definition)


def parse_dict_definition(definition: Dict[str, Any], result={}) -> SelectionSpec:
def parse_dict_definition(definition: Dict[str, Any]) -> SelectionSpec:
diff_arg: Optional[SelectionSpec] = None
if len(definition) == 1:
key = list(definition)[0]
Expand All @@ -194,15 +230,10 @@ def parse_dict_definition(definition: Dict[str, Any], result={}) -> SelectionSpe
"method": key,
"value": value,
}
elif definition.get("method") == "selector":
sel_def = definition.get("value")
if sel_def not in result:
raise DbtValidationError(f"Existing selector definition for {sel_def} not found.")
return result[definition["value"]]["definition"]
elif "method" in definition and "value" in definition:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to graph/selector.py / config/project.py

dct = definition
if "exclude" in definition:
diff_arg = _parse_exclusions(definition, result=result)
diff_arg = _parse_exclusions(definition)
dct = {k: v for k, v in dct.items() if k != "exclude"}
else:
raise DbtValidationError(
Expand All @@ -220,8 +251,7 @@ def parse_dict_definition(definition: Dict[str, Any], result={}) -> SelectionSpe

def parse_from_definition(
definition: RawDefinition,
rootlevel=False,
result: Dict[str, Dict[str, Union[SelectionSpec, bool]]] = {},
rootlevel=False
) -> SelectionSpec:

if (
Expand All @@ -236,13 +266,13 @@ def parse_from_definition(
f"in a root level selector definition; found {keys}."
)
if isinstance(definition, str):
return SelectionCriteria.from_single_spec(definition)
return parse_union_from_default(raw=[definition], default=[])
elif "union" in definition:
return parse_union_definition(definition, result=result)
return parse_union_definition(definition)
elif "intersection" in definition:
return parse_intersection_definition(definition, result=result)
return parse_intersection_definition(definition)
elif isinstance(definition, dict):
return parse_dict_definition(definition, result=result)
return parse_dict_definition(definition)
else:
raise DbtValidationError(
f"Expected to find union, intersection, str or dict, instead "
Expand All @@ -259,7 +289,7 @@ def parse_from_selectors_definition(
result[selector.name] = {
"default": selector.default,
"definition": parse_from_definition(
selector.definition, rootlevel=True, result=deepcopy(result)
selector.definition, rootlevel=True
),
}
return result
17 changes: 11 additions & 6 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Set, List, Optional, Tuple
from typing import Set, List, Optional, Tuple, Any

from .graph import Graph, UniqueId
from .queue import GraphQueue
Expand Down Expand Up @@ -62,8 +62,11 @@ def select_included(
"""Select the explicitly included nodes, using the given spec. Return
the selected set of unique IDs.
"""
method = self.get_method(spec.method, spec.method_arguments)
return set(method.search(included_nodes, spec.value))
if spec.method == 'selector':
return self.get_selected(self.project.get_selector(spec.value))
else:
method = self.get_method(spec.method, spec.method_arguments)
return set(method.search(included_nodes, spec.value))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The self.project.get_selector is the only place that checks if selectors actually exist

def get_nodes_from_criteria(
self, spec: SelectionCriteria
Expand Down Expand Up @@ -273,7 +276,7 @@ def incorporate_indirect_nodes(

return selected

def get_selected(self, spec: SelectionSpec) -> Set[UniqueId]:
def get_selected(self, spec: SelectionSpec, project: Any=None) -> Set[UniqueId]:
"""get_selected runs through the node selection process:

- node selection. Based on the include/exclude sets, the set
Expand All @@ -283,16 +286,18 @@ def get_selected(self, spec: SelectionSpec) -> Set[UniqueId]:
- selectors can filter the nodes after all of them have been
selected
"""
if project is not None:
self.project = project
selected_nodes, indirect_only = self.select_nodes(spec)
filtered_nodes = self.filter_selection(selected_nodes)

return filtered_nodes

def get_graph_queue(self, spec: SelectionSpec) -> GraphQueue:
def get_graph_queue(self, spec: SelectionSpec, project: Any = None) -> GraphQueue:
"""Returns a queue over nodes in the graph that tracks progress of
dependecies.
"""
selected_nodes = self.get_selected(spec)
selected_nodes = self.get_selected(spec, project)
selected_resources.set_selected_resources(selected_nodes)
new_graph = self.full_graph.get_subset_graph(selected_nodes)
# should we give a way here for consumers to mutate the graph?
Expand Down
1 change: 1 addition & 0 deletions core/dbt/graph/selector_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MethodName(StrEnum):
Metric = "metric"
Result = "result"
SourceStatus = "source_status"
Selector = "selector"
Wildcard = "wildcard"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "selector" is added without a corresponding class


Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(self, args, config, manifest):
def _iterate_selected_nodes(self):
selector = self.get_node_selector()
spec = self.get_selection_spec()
nodes = sorted(selector.get_selected(spec))
nodes = sorted(selector.get_selected(spec, self.config))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_selected needs the project to call get_selector

if not nodes:
warn_or_error(NoNodesSelected())
return
Expand Down
Loading