-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
selector.py
366 lines (315 loc) · 14.9 KB
/
selector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
from typing import List, Optional, Set, Tuple
from dbt import selected_resources
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import GraphMemberNode
from dbt.contracts.state import PreviousState
from dbt.events.types import NoNodesForSelectionCriteria, SelectorReportInvalidSelector
from dbt.exceptions import DbtInternalError, InvalidSelectorError
from dbt.node_types import NodeType
from dbt_common.events.functions import fire_event, warn_or_error
from .graph import Graph, UniqueId
from .queue import GraphQueue
from .selector_methods import MethodManager
from .selector_spec import IndirectSelection, SelectionCriteria, SelectionSpec
def get_package_names(nodes):
return set([node.split(".")[1] for node in nodes])
def can_select_indirectly(node):
"""If a node is not selected itself, but its parent(s) are, it may qualify
for indirect selection.
Today, only Test nodes can be indirectly selected. In the future,
other node types or invocation flags might qualify.
"""
if node.resource_type == NodeType.Test:
return True
elif node.resource_type == NodeType.Unit:
return True
else:
return False
class NodeSelector(MethodManager):
"""The node selector is aware of the graph and manifest"""
def __init__(
self,
graph: Graph,
manifest: Manifest,
previous_state: Optional[PreviousState] = None,
include_empty_nodes: bool = False,
) -> None:
super().__init__(manifest, previous_state)
self.full_graph: Graph = graph
self.include_empty_nodes: bool = include_empty_nodes
# build a subgraph containing only non-empty, enabled nodes and enabled
# sources.
graph_members = {
unique_id for unique_id in self.full_graph.nodes() if self._is_graph_member(unique_id)
}
self.graph = self.full_graph.subgraph(graph_members)
def select_included(
self,
included_nodes: Set[UniqueId],
spec: SelectionCriteria,
) -> Set[UniqueId]:
"""Select the explicitly included nodes, using the given spec. Return
the selected set of unique IDs.
"""
method = self.get_method(spec.method, spec.method_arguments)
return set(method.search(included_nodes, spec.value))
def get_nodes_from_criteria(
self, spec: SelectionCriteria
) -> Tuple[Set[UniqueId], Set[UniqueId]]:
"""Get all nodes specified by the single selection criteria.
- collect the directly included nodes
- find their specified relatives
- perform any selector-specific expansion
"""
nodes = self.graph.nodes()
try:
collected = self.select_included(nodes, spec)
except InvalidSelectorError:
valid_selectors = ", ".join(self.SELECTOR_METHODS)
fire_event(
SelectorReportInvalidSelector(
valid_selectors=valid_selectors, spec_method=spec.method, raw_spec=spec.raw
)
)
return set(), set()
neighbors = self.collect_specified_neighbors(spec, collected)
selected = collected | neighbors
# if --indirect-selection EMPTY, do not expand to adjacent tests
if spec.indirect_selection == IndirectSelection.Empty:
return selected, set()
else:
direct_nodes, indirect_nodes = self.expand_selection(
selected=selected, indirect_selection=spec.indirect_selection
)
return direct_nodes, indirect_nodes
def collect_specified_neighbors(
self, spec: SelectionCriteria, selected: Set[UniqueId]
) -> Set[UniqueId]:
"""Given the set of models selected by the explicit part of the
selector (like "tag:foo"), apply the modifiers on the spec ("+"/"@").
Return the set of additional nodes that should be collected (which may
overlap with the selected set).
"""
additional: Set[UniqueId] = set()
if spec.childrens_parents:
additional.update(self.graph.select_childrens_parents(selected))
if spec.parents:
depth = spec.parents_depth
additional.update(self.graph.select_parents(selected, depth))
if spec.children:
depth = spec.children_depth
additional.update(self.graph.select_children(selected, depth))
return additional
def select_nodes_recursively(self, spec: SelectionSpec) -> Tuple[Set[UniqueId], Set[UniqueId]]:
"""If the spec is a composite spec (a union, difference, or intersection),
recurse into its selections and combine them. If the spec is a concrete
selection criteria, resolve that using the given graph.
"""
if isinstance(spec, SelectionCriteria):
direct_nodes, indirect_nodes = self.get_nodes_from_criteria(spec)
else:
bundles = [self.select_nodes_recursively(component) for component in spec]
direct_sets = []
indirect_sets = []
for direct, indirect in bundles:
direct_sets.append(direct)
indirect_sets.append(direct | indirect)
initial_direct = spec.combined(direct_sets)
indirect_nodes = spec.combined(indirect_sets)
direct_nodes = self.incorporate_indirect_nodes(
initial_direct, indirect_nodes, spec.indirect_selection
)
if spec.expect_exists and len(direct_nodes) == 0:
warn_or_error(NoNodesForSelectionCriteria(spec_raw=str(spec.raw)))
return direct_nodes, indirect_nodes
def select_nodes(self, spec: SelectionSpec) -> Tuple[Set[UniqueId], Set[UniqueId]]:
"""Select the nodes in the graph according to the spec.
This is the main point of entry for turning a spec into a set of nodes:
- Recurse through spec, select by criteria, combine by set operation
- Return final (unfiltered) selection set
"""
direct_nodes, indirect_nodes = self.select_nodes_recursively(spec)
indirect_only = indirect_nodes.difference(direct_nodes)
return direct_nodes, indirect_only
def _is_graph_member(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.sources:
source = self.manifest.sources[unique_id]
return source.config.enabled
elif unique_id in self.manifest.exposures:
return True
elif unique_id in self.manifest.metrics:
metric = self.manifest.metrics[unique_id]
return metric.config.enabled
elif unique_id in self.manifest.semantic_models:
semantic_model = self.manifest.semantic_models[unique_id]
return semantic_model.config.enabled
elif unique_id in self.manifest.unit_tests:
return True
elif unique_id in self.manifest.saved_queries:
saved_query = self.manifest.saved_queries[unique_id]
return saved_query.config.enabled
node = self.manifest.nodes[unique_id]
return node.config.enabled
def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
return node.empty
else:
return False
def node_is_match(self, node: GraphMemberNode) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
will be excluded from results during filtering.
"""
return True
def _is_match(self, unique_id: UniqueId) -> bool:
node: GraphMemberNode
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
elif unique_id in self.manifest.sources:
node = self.manifest.sources[unique_id]
elif unique_id in self.manifest.exposures:
node = self.manifest.exposures[unique_id]
elif unique_id in self.manifest.metrics:
node = self.manifest.metrics[unique_id]
elif unique_id in self.manifest.semantic_models:
node = self.manifest.semantic_models[unique_id]
elif unique_id in self.manifest.unit_tests:
node = self.manifest.unit_tests[unique_id]
elif unique_id in self.manifest.saved_queries:
node = self.manifest.saved_queries[unique_id]
else:
raise DbtInternalError(f"Node {unique_id} not found in the manifest!")
return self.node_is_match(node)
def filter_selection(self, selected: Set[UniqueId]) -> Set[UniqueId]:
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {
unique_id
for unique_id in selected
if self._is_match(unique_id)
and (self.include_empty_nodes or not self._is_empty_node(unique_id))
}
def expand_selection(
self,
selected: Set[UniqueId],
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> Tuple[Set[UniqueId], Set[UniqueId]]:
# Test selection by default expands to include an implicitly/indirectly selected tests.
# `dbt test -m model_a` also includes tests that directly depend on `model_a`.
# Expansion has four modes, EAGER, CAUTIOUS and BUILDABLE, EMPTY.
#
# EAGER mode: If ANY parent is selected, select the test.
#
# CAUTIOUS mode:
# - If ALL parents are selected, select the test.
# - If ANY parent is missing, return it separately. We'll keep it around
# for later and see if its other parents show up.
#
# BUILDABLE mode:
# - If ALL parents are selected, or the parents of the test are themselves parents of the selected, select the test.
# - If ANY parent is missing, return it separately. We'll keep it around
# for later and see if its other parents show up.
#
# EMPTY mode: Only select the given node and ignore attached nodes (i.e. ignore tests attached to a model)
#
# Users can opt out of inclusive EAGER mode by passing --indirect-selection cautious
# CLI argument or by specifying `indirect_selection: true` in a yaml selector
direct_nodes = set(selected)
indirect_nodes = set()
selected_and_parents = set()
if indirect_selection == IndirectSelection.Buildable:
selected_and_parents = selected.union(self.graph.select_parents(selected)).union(
self.manifest.sources
)
for unique_id in self.graph.select_successors(selected):
if unique_id in self.manifest.nodes or unique_id in self.manifest.unit_tests:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
elif unique_id in self.manifest.unit_tests:
node = self.manifest.unit_tests[unique_id] # type: ignore
# Test nodes that are not selected themselves, but whose parents are selected.
# (Does not include unit tests because they can only have one parent.)
if can_select_indirectly(node):
# should we add it in directly?
if indirect_selection == IndirectSelection.Eager or set(
node.depends_on_nodes
) <= set(selected):
direct_nodes.add(unique_id)
elif indirect_selection == IndirectSelection.Buildable and set(
node.depends_on_nodes
) <= set(selected_and_parents):
direct_nodes.add(unique_id)
elif indirect_selection == IndirectSelection.Empty:
pass
else:
indirect_nodes.add(unique_id)
return direct_nodes, indirect_nodes
def incorporate_indirect_nodes(
self,
direct_nodes: Set[UniqueId],
indirect_nodes: Set[UniqueId] = set(),
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> Set[UniqueId]:
# Check tests previously selected indirectly to see if ALL their
# parents are now present.
# performance: if identical, skip the processing below
if set(direct_nodes) == set(indirect_nodes):
return direct_nodes
selected = set(direct_nodes)
if indirect_selection == IndirectSelection.Cautious:
for unique_id in indirect_nodes:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
if set(node.depends_on_nodes) <= set(selected):
selected.add(unique_id)
elif indirect_selection == IndirectSelection.Buildable:
selected_and_parents = selected.union(self.graph.select_parents(selected))
for unique_id in indirect_nodes:
if unique_id in self.manifest.nodes:
node = self.manifest.nodes[unique_id]
if set(node.depends_on_nodes) <= set(selected_and_parents):
selected.add(unique_id)
return selected
def get_selected(self, spec: SelectionSpec) -> Set[UniqueId]:
"""get_selected runs through the node selection process:
- node selection. Based on the include/exclude sets, the set
of matched unique IDs is returned
- includes direct + indirect selection (for tests)
- filtering:
- selectors can filter the nodes after all of them have been
selected
"""
selected_nodes, indirect_only = self.select_nodes(spec)
filtered_nodes = self.filter_selection(selected_nodes)
return filtered_nodes
def get_graph_queue(self, spec: SelectionSpec, preserve_edges: bool = True) -> GraphQueue:
"""Returns a queue over nodes in the graph that tracks progress of
dependencies.
"""
# Filtering happens in get_selected
selected_nodes = self.get_selected(spec)
# Save to global variable
selected_resources.set_selected_resources(selected_nodes)
# Construct a new graph using the selected_nodes
new_graph = self.full_graph.get_subset_graph(selected_nodes)
# should we give a way here for consumers to mutate the graph?
return GraphQueue(new_graph.graph, self.manifest, selected_nodes, preserve_edges)
class ResourceTypeSelector(NodeSelector):
def __init__(
self,
graph: Graph,
manifest: Manifest,
previous_state: Optional[PreviousState],
resource_types: List[NodeType],
include_empty_nodes: bool = False,
) -> None:
super().__init__(
graph=graph,
manifest=manifest,
previous_state=previous_state,
include_empty_nodes=include_empty_nodes,
)
self.resource_types: Set[NodeType] = set(resource_types)
def node_is_match(self, node):
return node.resource_type in self.resource_types