Skip to content

Commit

Permalink
Add tags for sources
Browse files Browse the repository at this point in the history
Allow column-level tags, which are inherited by their tests
  • Loading branch information
Jacob Beck committed Jan 24, 2020
1 parent 9d5488e commit 88e2f43
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 65 deletions.
6 changes: 2 additions & 4 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ColumnInfo(JsonSchemaMixin, Replaceable):
description: str = ''
meta: Dict[str, Any] = field(default_factory=dict)
data_type: Optional[str] = None
tags: List[str] = field(default_factory=list)


# Docrefs are not quite like regular references, as they indicate what they
Expand Down Expand Up @@ -513,6 +514,7 @@ class ParsedSourceDefinition(
columns: Dict[str, ColumnInfo] = field(default_factory=dict)
meta: Dict[str, Any] = field(default_factory=dict)
source_meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)

@property
def is_ephemeral_model(self):
Expand All @@ -530,10 +532,6 @@ def refs(self):
def sources(self):
return []

@property
def tags(self):
return []

@property
def has_freshness(self):
return bool(self.freshness) and self.loaded_at_field is not None
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ def __post_init__(self):
self.tests = []


@dataclass
class UnparsedColumn(NamedTested):
tags: List[str] = field(default_factory=list)


@dataclass
class ColumnDescription(JsonSchemaMixin, Replaceable):
columns: List[NamedTested] = field(default_factory=list)
columns: List[UnparsedColumn] = field(default_factory=list)


@dataclass
Expand Down Expand Up @@ -206,6 +211,7 @@ class UnparsedSourceTableDefinition(ColumnDescription, NodeDescription):
external: Optional[ExternalTable] = field(
default_factory=ExternalTable
)
tags: List[str] = field(default_factory=list)

def __post_init__(self):
NodeDescription.__post_init__(self)
Expand All @@ -225,6 +231,7 @@ class UnparsedSourceDefinition(JsonSchemaMixin, Replaceable):
)
loaded_at_field: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)

@property
def yaml_key(self) -> 'str':
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from itertools import chain

import networkx as nx # type: ignore

Expand Down Expand Up @@ -180,7 +181,9 @@ class TagSelector(ManifestSelector):

def search(self, included_nodes, selector):
""" yields nodes from graph that have the specified tag """
for node, real_node in self.parsed_nodes(included_nodes):
search = chain(self.parsed_nodes(included_nodes),
self.source_nodes(included_nodes))
for node, real_node in search:
if selector in real_node.tags:
yield node

Expand Down
18 changes: 11 additions & 7 deletions core/dbt/parser/schema_test_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dbt.clients.jinja import get_rendered
from dbt.contracts.graph.unparsed import (
UnparsedNodeUpdate, UnparsedSourceDefinition,
UnparsedSourceTableDefinition, NamedTested
UnparsedSourceTableDefinition, UnparsedColumn
)
from dbt.exceptions import raise_compiler_error
from dbt.parser.search import FileBlock
Expand Down Expand Up @@ -79,7 +79,7 @@ def name(self) -> str:
return '{0.name}_{1.name}'.format(self.source, self.table)

@property
def columns(self) -> List[NamedTested]:
def columns(self) -> List[UnparsedColumn]:
if self.table.columns is None:
return []
else:
Expand Down Expand Up @@ -136,17 +136,23 @@ def from_yaml_block(
class SchemaTestBlock(TargetBlock):
test: Dict[str, Any]
column_name: Optional[str]
tags: List[str]

@classmethod
def from_target_block(
cls, src: TargetBlock, test: Dict[str, Any], column_name: Optional[str]
cls,
src: TargetBlock,
test: Dict[str, Any],
column_name: Optional[str],
tags: List[str],
) -> 'SchemaTestBlock':
return cls(
file=src.file,
data=src.data,
target=src.target,
test=test,
column_name=column_name
column_name=column_name,
tags=tags,
)


Expand All @@ -156,8 +162,6 @@ class TestBuilder(Generic[Target]):
Test names have the following pattern:
- the test name itself may be namespaced (package.test)
- or it may not be namespaced (test)
- the test may have arguments embedded in the name (, severity=WARN)
- or it may not have arguments.
"""
TEST_NAME_PATTERN = re.compile(
Expand Down Expand Up @@ -286,7 +290,7 @@ def build_raw_sql(self) -> str:
model=self.build_model_str(),
macro=self.macro_name(),
kwargs=self.test_kwargs_str(),
severity=self.severity()
severity=self.severity(),
)

def build_model_str(self):
Expand Down
45 changes: 33 additions & 12 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import os

from abc import abstractmethod
Expand All @@ -19,7 +20,7 @@
ParsedTestNode,
)
from dbt.contracts.graph.unparsed import (
UnparsedSourceDefinition, UnparsedNodeUpdate, NamedTested,
UnparsedSourceDefinition, UnparsedNodeUpdate, UnparsedColumn,
UnparsedSourceTableDefinition, FreshnessThreshold
)
from dbt.context.parser import docs
Expand Down Expand Up @@ -68,11 +69,14 @@ def __init__(self):
self.column_info: Dict[str, ColumnInfo] = {}
self.docrefs: List[Docref] = []

def add(self, column_name, description, data_type, meta):
self.column_info[column_name] = ColumnInfo(name=column_name,
description=description,
data_type=data_type,
meta=meta)
def add(self, column: UnparsedColumn, description, data_type, meta):
self.column_info[column.name] = ColumnInfo(
name=column.name,
description=description,
data_type=data_type,
meta=meta,
tags=column.tags,
)


def collect_docrefs(
Expand Down Expand Up @@ -160,21 +164,21 @@ def _yaml_from_file(
return None

def parse_column(
self, block: TargetBlock, column: NamedTested, refs: ParserRef
self, block: TargetBlock, column: UnparsedColumn, refs: ParserRef
) -> None:
column_name = column.name
description = column.description
data_type = column.data_type
meta = column.meta
collect_docrefs(block.target, refs, column_name, description)

refs.add(column_name, description, data_type, meta)
refs.add(column, description, data_type, meta)

if not column.tests:
return

for test in column.tests:
self.parse_test(block, test, column_name)
self.parse_test(block, test, column)

def parse_node(self, block: SchemaTestBlock) -> ParsedTestNode:
"""In schema parsing, we rewrite most of the part of parse_node that
Expand Down Expand Up @@ -209,11 +213,16 @@ def parse_node(self, block: SchemaTestBlock) -> ParsedTestNode:
'kwargs': builder.args,
}

# copy - we don't want to mutate the tags!
tags = block.tags[:]
if 'schema' not in tags:
tags.append('schema')

node = self._create_parsetime_node(
block=block,
path=compiled_path,
config=config,
tags=['schema'],
tags=tags,
name=builder.fqn_name,
raw_sql=builder.build_raw_sql(),
column_name=block.column_name,
Expand All @@ -227,16 +236,24 @@ def parse_test(
self,
target_block: TargetBlock,
test: TestDef,
column_name: Optional[str]
column: Optional[UnparsedColumn],
) -> None:

if isinstance(test, str):
test = {test: {}}

if column is None:
column_name: Optional[str] = None
column_tags: List[str] = []
else:
column_name = column.name
column_tags = column.tags

block = SchemaTestBlock.from_target_block(
src=target_block,
test=test,
column_name=column_name
column_name=column_name,
tags=column_tags,
)
try:
self.parse_node(block)
Expand Down Expand Up @@ -395,6 +412,9 @@ def parse_with_refs(
path = block.path.original_file_path
source_meta = source.meta or {}

# make sure we don't do duplicate tags from source + table
tags = sorted(set(itertools.chain(source.tags, table.tags)))

result = ParsedSourceDefinition(
package_name=self.project.project_name,
database=(source.database or self.default_database),
Expand All @@ -419,6 +439,7 @@ def parse_with_refs(
quoting=quoting,
resource_type=NodeType.Source,
fqn=[self.project.project_name, source.name, table.name],
tags=tags,
)
self.results.add_source(self.yaml.file, result)

Expand Down
16 changes: 16 additions & 0 deletions test/integration/008_schema_tests_test/models-v2/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ models:
tests:
- not_null
- unique
tags:
- table_id
- name: first_name
description: "The user's first name"
tests:
- not_null
tags:
- table_first_name
- name: ip_address
description: "The user's IP address"
tests:
Expand All @@ -29,6 +33,8 @@ models:
description: "The user's favorite color"
tests:
- accepted_values: { values: ['blue', 'green'], quote: true }
tags:
- table_favorite_color
- name: fav_number
description: "The user's favorite number"
tests:
Expand All @@ -47,6 +53,8 @@ models:
- unique
- accepted_values: { values: ['blue', 'green'] }
- relationships: { field: favorite_color, to: ref('table_copy') }
tags:
- table_favorite_color
- name: count
description: "The number of responses for this favorite color"
tests:
Expand All @@ -61,10 +69,14 @@ models:
tests:
- not_null
- unique
tags:
- xfail
- name: favorite_color
description: "The user's favorite color"
tests:
- accepted_values: { values: ['blue', 'green'] }
tags:
- xfail

# all of these constraints will fail
- name: table_failure_summary
Expand All @@ -75,6 +87,8 @@ models:
tests:
- accepted_values: { values: ['red'] }
- relationships: { field: favorite_color, to: ref('table_copy') }
tags:
- xfail

# this table is disabled so these tests should be ignored
- name: table_disabled
Expand All @@ -94,3 +108,5 @@ models:
description: "The user ID"
tests:
- relationships: { field: id, to: ref('table_failure_copy') }
tags:
- xfail
Loading

0 comments on commit 88e2f43

Please sign in to comment.