Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Cosmos custom selector to support + when using paths and tags #1150

Merged
92 changes: 74 additions & 18 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
+model_d+
2+model_e
model_f+3
+/path/to/model_g+
path:/path/to/model_h+
+tag:nightly
+config.materialized:view

https://docs.getdbt.com/reference/node-selection/graph-operators
"""
Expand Down Expand Up @@ -84,6 +88,8 @@
regex_match = re.search(GRAPH_SELECTOR_REGEX, text)
if regex_match:
precursors, node_name, descendants = regex_match.groups()
if "/" in node_name and not node_name.startswith(PATH_SELECTOR):
node_name = f"{PATH_SELECTOR}{node_name}"
return GraphSelector(node_name, precursors, descendants)
return None

Expand Down Expand Up @@ -148,22 +154,63 @@
:return: set of node ids that matches current graph selector
"""
selected_nodes: set[str] = set()
root_nodes: set[str] = set()

# Index nodes by name, we can improve performance by doing this once
# for multiple GraphSelectors
node_by_name = {}
for node_id, node in nodes.items():
node_by_name[node.name] = node_id
if PATH_SELECTOR in self.node_name:
path_selection = self.node_name[len(PATH_SELECTOR) :]
root_nodes.update({node_id for node_id, node in nodes.items() if path_selection in str(node.file_path)})

elif TAG_SELECTOR in self.node_name:
tag_selection = self.node_name[len(TAG_SELECTOR) :]
root_nodes.update({node_id for node_id, node in nodes.items() if tag_selection in node.tags})

elif CONFIG_SELECTOR in self.node_name:
tatiana marked this conversation as resolved.
Show resolved Hide resolved
config_selection_key, config_selection_value = self.node_name[len(CONFIG_SELECTOR) :].split(":")
if config_selection_key not in SUPPORTED_CONFIG:
logger.warning("Unsupported config key selector: %s", config_selection_key)

Check warning on line 172 in cosmos/dbt/selector.py

View check run for this annotation

Codecov / codecov/patch

cosmos/dbt/selector.py#L172

Added line #L172 was not covered by tests

# currently tags, materialized, and schema are the only supported config keys
# logic is separated into two conditions because the config 'tags' contains a
# list of tags, but the config 'materialized', and 'schema' contain strings
elif config_selection_key == "tags":
root_nodes.update(
{
node_id
for node_id, node in nodes.items()
if config_selection_value in node.config.get(config_selection_key, [])
}
)
elif config_selection_key in (
"materialized",
"schema",
):
root_nodes.update(
{
node_id
for node_id, node in nodes.items()
if config_selection_value == node.config.get(config_selection_key, "")
}
)

if self.node_name in node_by_name:
root_id = node_by_name[self.node_name]
else:
logger.warn(f"Selector {self.node_name} not found.")
return selected_nodes
node_by_name = {}
for node_id, node in nodes.items():
node_by_name[node.name] = node_id

if self.node_name in node_by_name:
root_id = node_by_name[self.node_name]
root_nodes.add(root_id)
else:
logger.warn(f"Selector {self.node_name} not found.")
return selected_nodes

selected_nodes.update(root_nodes)

selected_nodes.add(root_id)
self.select_node_precursors(nodes, root_id, selected_nodes)
self.select_node_descendants(nodes, root_id, selected_nodes)
for root_id in root_nodes:
self.select_node_precursors(nodes, root_id, selected_nodes)
self.select_node_descendants(nodes, root_id, selected_nodes)
return selected_nodes


Expand Down Expand Up @@ -210,14 +257,23 @@
items = statement.split(",")

for item in items:
if item.startswith(PATH_SELECTOR):
self._parse_path_selector(item)
elif item.startswith(TAG_SELECTOR):
self._parse_tag_selector(item)
elif item.startswith(CONFIG_SELECTOR):
self._parse_config_selector(item)
else:
self._parse_unknown_selector(item)
regex_match = re.search(GRAPH_SELECTOR_REGEX, item)
if regex_match:
precursors, node_name, descendants = regex_match.groups()
if node_name is None:
...
elif precursors or descendants:
self._parse_unknown_selector(item)
elif node_name.startswith(PATH_SELECTOR):
self._parse_path_selector(item)
elif "/" in node_name:
self._parse_path_selector(f"{PATH_SELECTOR}{node_name}")
tatiana marked this conversation as resolved.
Show resolved Hide resolved
elif node_name.startswith(TAG_SELECTOR):
self._parse_tag_selector(item)
elif node_name.startswith(CONFIG_SELECTOR):
self._parse_config_selector(item)
else:
self._parse_unknown_selector(item)

def _parse_unknown_selector(self, item: str) -> None:
if item:
Expand Down
76 changes: 76 additions & 0 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ def test_select_nodes_by_select_path():
assert selected == expected


def test_select_nodes_with_slash_but_no_path_selector():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["gen2/models"])
expected = {
parent_node.unique_id: parent_node,
}
assert selected == expected


def test_select_nodes_by_select_union():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["tag:has_child", "tag:nightly"])
expected = {
Expand Down Expand Up @@ -432,3 +440,71 @@ def test_should_include_node_without_depends_on(selector_config):
selector = NodeSelector({}, selector_config)
selector.visited_nodes = set()
selector._should_include_node(node.unique_id, node)


@pytest.mark.parametrize(
"select_statement, expected",
[
(
["+path:gen2/models"],
[
"model.dbt-proj.another_grandparent_node",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
],
),
(
["path:gen2/models+"],
[
"model.dbt-proj.child",
"model.dbt-proj.parent",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
),
(
["gen2/models+"],
[
"model.dbt-proj.child",
"model.dbt-proj.parent",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
),
(
["+gen2/models"],
[
"model.dbt-proj.another_grandparent_node",
"model.dbt-proj.grandparent",
"model.dbt-proj.parent",
],
),
(
["1+tag:deprecated"],
[
"model.dbt-proj.parent",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
),
(
["1+config.tags:deprecated"],
[
"model.dbt-proj.parent",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
),
(
["config.materialized:table+"],
[
"model.dbt-proj.child",
"model.dbt-proj.sibling1",
"model.dbt-proj.sibling2",
],
),
],
)
def test_select_using_graph_operators(select_statement, expected):
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=select_statement)
assert sorted(selected.keys()) == expected
Loading