From a3b9e6196748dd3653d24b2fd17d1bce16882db0 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 23 Nov 2020 23:52:57 -0500 Subject: [PATCH 1/9] first pass, lots of TODO's [skip ci] --- Makefile | 8 +-- core/dbt/contracts/graph/unparsed.py | 9 +-- core/dbt/contracts/results.py | 98 +++++++++++++++++----------- core/dbt/task/base.py | 68 ++++++++++--------- core/dbt/task/freshness.py | 20 +++--- core/dbt/task/generate.py | 4 +- core/dbt/task/printer.py | 45 +++++++------ core/dbt/task/run.py | 26 ++++++-- core/dbt/task/runnable.py | 17 +++-- core/dbt/task/seed.py | 3 +- core/dbt/task/test.py | 35 ++++++++-- 11 files changed, 207 insertions(+), 126 deletions(-) diff --git a/Makefile b/Makefile index c102d7221a3..ced0dbfc058 100644 --- a/Makefile +++ b/Makefile @@ -7,19 +7,19 @@ install: test: .env @echo "Full test run starting..." - @time docker-compose run test tox + @time docker-compose run --rm test tox test-unit: .env @echo "Unit test run starting..." - @time docker-compose run test tox -e unit-py36,flake8 + @time docker-compose run --rm test tox -e unit-py36,flake8 test-integration: .env @echo "Integration test run starting..." - @time docker-compose run test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36 + @time docker-compose run --rm test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36 test-quick: .env @echo "Integration test run starting..." - @time docker-compose run test tox -e integration-postgres-py36 -- -x + @time docker-compose run --rm test tox -e integration-postgres-py36 -- -x # This rule creates a file named .env that is used by docker-compose for passing # the USER_ID and GROUP_ID arguments to the Docker image. 
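Before the contracts diffs below, a brief orientation may help: the core change in this series is replacing the per-result boolean flags (error, fail, warn, skip) on BaseResult with a single status enum per result type. What follows is a minimal standalone sketch of that model, with enum names and values copied from the diff to core/dbt/contracts/results.py; the StrEnum base shown here is an illustrative stand-in for dbt's own helper class, and interpret_run_result is simplified to take a status rather than a full result object, so treat it as a sketch rather than the final API.

    from enum import Enum


    class StrEnum(str, Enum):
        # stand-in for dbt's StrEnum helper; members are string-valued, so they
        # compare equal to their plain-string values and to each other
        def __str__(self) -> str:
            return self.value


    class NodeStatus(StrEnum):
        Success = "success"
        Error = "error"
        Fail = "fail"
        Warn = "warn"
        Skipped = "skipped"
        Pass = "pass"
        RuntimeErr = "runtime error"


    class RunStatus(StrEnum):
        # task-specific statuses reuse the NodeStatus values
        Success = NodeStatus.Success
        Error = NodeStatus.Error
        Skipped = NodeStatus.Skipped


    def interpret_run_result(status: str) -> str:
        # the per-status checks that replace result.error / result.fail /
        # result.warn / result.skip throughout the task code
        if status in (NodeStatus.Error, NodeStatus.Fail):
            return 'error'
        elif status == NodeStatus.Skipped:
            return 'skip'
        elif status == NodeStatus.Warn:
            return 'warn'
        else:
            return 'pass'


    assert interpret_run_result(RunStatus.Skipped) == 'skip'
    assert interpret_run_result(NodeStatus.Warn) == 'warn'

Because the enums are string-valued, members of RunStatus, TestStatus, and FreshnessStatus compare equal to the corresponding NodeStatus members, which is what lets the printer and runnable tasks in the diffs below check result.status directly against NodeStatus regardless of which result type produced it.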
diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index 6e272bafaf7..70eb95710b8 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -158,19 +158,14 @@ def exceeded(self, actual_age: float) -> bool: return actual_age > difference -class FreshnessStatus(StrEnum): - Pass = 'pass' - Warn = 'warn' - Error = 'error' - - @dataclass class FreshnessThreshold(JsonSchemaMixin, Mergeable): warn_after: Optional[Time] = None error_after: Optional[Time] = None filter: Optional[str] = None - def status(self, age: float) -> FreshnessStatus: + def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus": + from dbt.contracts.results import FreshnessStatus if self.error_after and self.error_after.exceeded(age): return FreshnessStatus.Error elif self.warn_after and self.warn_after.exceeded(age): diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index dd4311637b4..280869fc133 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -1,6 +1,6 @@ from dbt.contracts.graph.manifest import CompileResultNode from dbt.contracts.graph.unparsed import ( - FreshnessStatus, FreshnessThreshold + FreshnessThreshold ) from dbt.contracts.graph.parsed import ParsedSourceDefinition from dbt.contracts.util import ( @@ -55,22 +55,52 @@ def __exit__(self, exc_type, exc_value, traceback): logger.debug('finished collecting timing info') +class NodeStatus(StrEnum): + Success = "success" + Error = "error" + Fail = "fail" + Warn = "warn" + Skipped = "skipped" + Pass = "pass" + RuntimeError = "runtime error" + + +class RunStatus(StrEnum): + Success = NodeStatus.Success + Error = NodeStatus.Error + Skipped = NodeStatus.Skipped + + +class TestStatus(StrEnum): + Success = NodeStatus.Success + Error = NodeStatus.Error + Fail = NodeStatus.Fail + Warn = NodeStatus.Warn + + +class FreshnessStatus(StrEnum): # maybe this should be the same as test status? + Pass = NodeStatus.Pass + Warn = NodeStatus.Warn + Error = NodeStatus.Error + RuntimeError = NodeStatus.RuntimeError + + @dataclass class BaseResult(JsonSchemaMixin): - node: CompileResultNode - error: Optional[str] = None - status: Union[None, str, int, bool] = None - execution_time: Union[str, int] = 0 - thread_id: Optional[str] = None - timing: List[TimingInfo] = field(default_factory=list) - fail: Optional[bool] = None - warn: Optional[bool] = None + status: Union[RunStatus, TestStatus, FreshnessStatus] + timing: List[TimingInfo] + thread_id: str + execution_time: float + message: Optional[str] @dataclass -class PartialResult(BaseResult, Writable): - pass +class NodeResult(BaseResult): + node: CompileResultNode + +@dataclass +class PartialNodeResult(NodeResult, Writable): # if the result got to the point where it could be skipped/failed, we would # be returning a real result, not a partial. 
@property @@ -79,12 +109,10 @@ def skipped(self): @dataclass -class WritableRunModelResult(BaseResult, Writable): - skip: bool = False - +class WritableRunModelResult(NodeResult, Writable): @property def skipped(self): - return self.skip + return self.status == RunStatus.Skipped @dataclass @@ -99,7 +127,7 @@ def to_dict(self, *args, **kwargs): @dataclass class ExecutionResult(JsonSchemaMixin): - results: Sequence[BaseResult] + results: Sequence[NodeResult] elapsed_time: float def __len__(self): @@ -112,7 +140,7 @@ def __getitem__(self, idx): return self.results[idx] -RunResult = Union[PartialResult, WritableRunModelResult] +RunResult = Union[PartialNodeResult, WritableRunModelResult] @dataclass @@ -174,7 +202,7 @@ def from_success( elapsed_time: float, generated_at: datetime, ): - meta = RunResultsMetadata( + meta = RunOperationResultMetadata( dbt_schema_version=str(cls.dbt_schema_version), generated_at=generated_at, ) @@ -186,26 +214,15 @@ def from_success( ) -@dataclass -class SourceFreshnessResultMixin(JsonSchemaMixin): - max_loaded_at: datetime - snapshotted_at: datetime - age: float - - # due to issues with typing.Union collapsing subclasses, this can't subclass # PartialResult @dataclass -class SourceFreshnessResult(BaseResult, Writable, SourceFreshnessResultMixin): +class SourceFreshnessResult(NodeResult, Writable): node: ParsedSourceDefinition - status: FreshnessStatus = FreshnessStatus.Pass - - def __post_init__(self): - self.fail = self.status == 'error' - - @property - def warned(self): - return self.status == 'warn' + status: FreshnessStatus + max_loaded_at: datetime + snapshotted_at: datetime + age: float @property def skipped(self): @@ -237,18 +254,25 @@ class SourceFreshnessOutput(JsonSchemaMixin): criteria: FreshnessThreshold -FreshnessNodeResult = Union[PartialResult, SourceFreshnessResult] +@dataclass +class PartialSourceFreshnessResult(PartialNodeResult): + status: FreshnessStatus + + +FreshnessNodeResult = Union[PartialSourceFreshnessResult, + SourceFreshnessResult] FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput] def process_freshness_result( result: FreshnessNodeResult ) -> FreshnessNodeOutput: + # TODO(kw) source freshness refactor unique_id = result.node.unique_id - if result.error is not None: + if result.status is FreshnessStatus.RuntimeError: return SourceFreshnessRuntimeError( unique_id=unique_id, - error=result.error, + error=result.message or "", state=FreshnessErrorEnum.runtime_error, ) diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index d421cdc6592..eabc1962886 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -9,7 +9,7 @@ from dbt import ui from dbt.contracts.graph.manifest import Manifest from dbt.contracts.results import ( - RunModelResult, collect_timing_info + NodeStatus, RunModelResult, collect_timing_info, RunStatus ) from dbt.exceptions import ( NotImplementedException, CompilationException, RuntimeException, @@ -165,6 +165,7 @@ class ExecutionContext: """During execution and error handling, dbt makes use of mutable state: timing information and the newest (compiled vs executed) form of the node. 
""" + def __init__(self, node): self.timing = [] self.node = node @@ -186,16 +187,18 @@ def compile(self, manifest: Manifest) -> Any: pass def get_result_status(self, result) -> Dict[str, str]: - if result.error: - return {'node_status': 'error', 'node_error': str(result.error)} - elif result.skip: + if result.status == NodeStatus.Error: + return {'node_status': 'error', 'node_error': str(result.message)} + elif result.status == NodeStatus.Skipped: return {'node_status': 'skipped'} - elif result.fail: + elif result.status == NodeStatus.Fail: return {'node_status': 'failed'} - elif result.warn: + elif result.status == NodeStatus.Warn: return {'node_status': 'warn'} - else: + elif result.status == NodeStatus.Success: return {'node_status': 'passed'} + else: + raise RuntimeError(f"unknown status {result.status}") def run_with_hooks(self, manifest): if self.skip: @@ -212,54 +215,59 @@ def run_with_hooks(self, manifest): return result - def _build_run_result(self, node, start_time, error, status, timing_info, - skip=False, fail=None, warn=None, agate_table=None): + def _build_run_result(self, node, start_time, status, timing_info, message, + agate_table=None): execution_time = time.time() - start_time thread_id = threading.current_thread().name return RunModelResult( - node=node, - error=error, - skip=skip, status=status, - fail=fail, - warn=warn, - execution_time=execution_time, thread_id=thread_id, + execution_time=execution_time, timing=timing_info, - agate_table=agate_table, + message=message, + node=node, + agate_table=agate_table ) - def error_result(self, node, error, start_time, timing_info): + def error_result(self, node, message, start_time, timing_info): return self._build_run_result( node=node, start_time=start_time, - error=error, - status='ERROR', - timing_info=timing_info + status=RunStatus.Error, + timing_info=timing_info, + message=message, ) def ephemeral_result(self, node, start_time, timing_info): return self._build_run_result( node=node, start_time=start_time, - error=None, status=None, - timing_info=timing_info + timing_info=timing_info, + message=None ) def from_run_result(self, result, start_time, timing_info): return self._build_run_result( node=result.node, start_time=start_time, - error=result.error, - skip=result.skip, status=result.status, - fail=result.fail, - warn=result.warn, timing_info=timing_info, + message=result.message, agate_table=result.agate_table, ) + def skip_result(self, node, message): + thread_id = threading.current_thread().name + return RunModelResult( + status=RunStatus.Skipped, + thread_id=thread_id, + execution_time=0, + timing=[], + message=message, + node=node, + ) + def compile_and_execute(self, manifest, ctx): result = None with self.adapter.connection_for(self.node): @@ -340,7 +348,7 @@ def safe_run(self, manifest): # an error if ( exc_str is not None and result is not None and - result.error is None and error is None + result.status != NodeStatus.Error and error is None ): error = exc_str @@ -389,7 +397,7 @@ def on_skip(self): schema_name = self.node.schema node_name = self.node.name - error = None + error_message = None if not self.node.is_ephemeral_model: # if this model was skipped due to an upstream ephemeral model # failure, print a special 'error skip' message. 
@@ -408,7 +416,7 @@ def on_skip(self): 'an ephemeral failure' ) # set an error so dbt will exit with an error code - error = ( + error_message = ( 'Compilation Error in {}, caused by compilation error ' 'in referenced ephemeral model {}' .format(self.node.unique_id, @@ -423,7 +431,7 @@ def on_skip(self): self.num_nodes ) - node_result = RunModelResult(self.node, skip=True, error=error) + node_result = self.skip_result(self.node, error_message) return node_result def do_skip(self, cause=None): diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 7e54cc1fbc9..0c522b7c9fd 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -13,8 +13,8 @@ from dbt.contracts.results import ( FreshnessExecutionResultArtifact, - FreshnessResult, - PartialResult, + FreshnessResult, NodeStatus, + PartialNodeResult, RunResult, RunStatus, SourceFreshnessResult, ) from dbt.exceptions import RuntimeException, InternalException @@ -37,10 +37,11 @@ def on_skip(self): ) def get_result_status(self, result) -> Dict[str, str]: - if result.error: - return {'node_status': 'error', 'node_error': str(result.error)} + if result.status == NodeStatus.Error: + return {'node_status': 'error', 'node_error': str(result.message)} else: - return {'node_status': str(result.status)} + # TODO(kw) I think this needs to be updated + return {'node_status': str(result.message)} def before_execute(self): description = 'freshness of {0.source_name}.{0.name}'.format(self.node) @@ -54,13 +55,14 @@ def _build_run_result(self, node, start_time, error, status, timing_info, execution_time = time.time() - start_time thread_id = threading.current_thread().name status = utils.lowercase(status) - return PartialResult( + # TODO(kw): uhh not sure what type to return here + return PartialNodeResult( node=node, - status=status, - error=error, + status="Asdf", execution_time=execution_time, thread_id=thread_id, timing=timing_info, + message="" ) def from_run_result(self, result, start_time, timing_info): @@ -160,7 +162,7 @@ def get_result(self, results, elapsed_time, generated_at): def task_end_messages(self, results): for result in results: - if result.error is not None: + if result.status == NodeStatus.Error: print_run_result_error(result) print_timestamped_line('Done.') diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 1f666466f59..7f6c5253547 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -11,7 +11,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.results import ( - TableMetadata, CatalogTable, CatalogResults, Primitive, CatalogKey, + NodeStatus, RunStatus, TableMetadata, CatalogTable, CatalogResults, Primitive, CatalogKey, StatsItem, StatsDict, ColumnMetadata, CatalogArtifact ) from dbt.exceptions import InternalException @@ -211,7 +211,7 @@ def run(self) -> CatalogArtifact: compile_results = None if self.args.compile: compile_results = CompileTask.run(self) - if any(r.error is not None for r in compile_results): + if any(r.status == NodeStatus.Error for r in compile_results): print_timestamped_line( 'compile failed, cannot generate docs' ) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 850e9009251..4dc12a6543b 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -11,6 +11,8 @@ from dbt import ui from dbt import utils +from dbt.contracts.results import NodeResult, NodeStatus + def print_fancy_output_line( msg: str, status: str, logger_fn: 
Callable, index: Optional[int], @@ -98,36 +100,36 @@ def print_cancel_line(model) -> None: def get_printable_result( result, success: str, error: str) -> Tuple[str, str, Callable]: - if result.error is not None: + if result.status == NodeStatus.Error: info = 'ERROR {}'.format(error) - status = ui.red(result.status) + status = ui.red(result.message) logger_fn = logger.error else: info = 'OK {}'.format(success) - status = ui.green(result.status) + status = ui.green(result.message) logger_fn = logger.info return info, status, logger_fn def print_test_result_line( - result, schema_name, index: int, total: int + result: NodeResult, schema_name, index: int, total: int ) -> None: model = result.node - if result.error is not None: + if result.status == NodeStatus.Error: info = "ERROR" color = ui.red logger_fn = logger.error - elif result.status == 0: + elif result.status == NodeStatus.Success: info = 'PASS' color = ui.green logger_fn = logger.info - elif result.warn: + elif result.status == NodeStatus.Warn: info = 'WARN {}'.format(result.status) color = ui.yellow logger_fn = logger.warning - elif result.fail: + elif result.status == NodeStatus.Fail: info = 'FAIL {}'.format(result.status) color = ui.red logger_fn = logger.error @@ -196,15 +198,16 @@ def print_seed_result_line(result, schema_name: str, index: int, total: int): def print_freshness_result_line(result, index: int, total: int) -> None: - if result.error: + # TODO(kw) uhhhh + if result.status == NodeStatus.RuntimeError: info = 'ERROR' color = ui.red logger_fn = logger.error - elif result.status == 'error': + elif result.status == NodeStatus.Error: info = 'ERROR STALE' color = ui.red logger_fn = logger.error - elif result.status == 'warn': + elif result.status == NodeStatus.Warn: info = 'WARN' color = ui.yellow logger_fn = logger.warning @@ -237,14 +240,16 @@ def print_freshness_result_line(result, index: int, total: int) -> None: def interpret_run_result(result) -> str: - if result.error is not None or result.fail: + if result.status in (NodeStatus.Error, NodeStatus.Fail): return 'error' - elif result.skipped: + elif result.status == NodeStatus.Skipped: return 'skip' - elif result.warn: + elif result.status == NodeStatus.Warn: return 'warn' - else: + elif result.status in (NodeStatus.Pass, NodeStatus.Success): return 'pass' + else: + raise RuntimeError(f"unhandled result {result}") def print_run_status_line(results) -> None: @@ -272,7 +277,7 @@ def print_run_result_error( with TextOnly(): logger.info("") - if result.fail or (is_warning and result.warn): + if result.status == NodeStatus.Fail or (is_warning and result.status == NodeStatus.Warn): if is_warning: color = ui.yellow info = 'Warning' @@ -303,7 +308,7 @@ def print_run_result_error( else: first = True - for line in result.error.split("\n"): + for line in result.message.split("\n"): if first: logger.error(ui.yellow(line)) first = False @@ -342,8 +347,10 @@ def print_end_of_run_summary( def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: - errors = [r for r in results if r.error is not None or r.fail] - warnings = [r for r in results if r.warn] + # or r.fail] <- TODO(kw) do we need to handle fail? 
+ errors = [r for r in results if r.status in ( + NodeStatus.Error, NodeStatus.Fail)] + warnings = [r for r in results if r.status == NodeStatus.Warn] with DbtStatusMessage(), InvocationProcessor(): print_end_of_run_summary(len(errors), len(warnings), diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 086d423477e..bf708a8472d 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -23,7 +23,7 @@ from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.graph.model_config import Hook from dbt.contracts.graph.parsed import ParsedHookNode -from dbt.contracts.results import RunModelResult +from dbt.contracts.results import NodeStatus, RunModelResult, RunStatus from dbt.exceptions import ( CompilationException, InternalException, @@ -105,8 +105,9 @@ def track_model_run(index, num_nodes, run_model_result): "index": index, "total": num_nodes, "execution_time": run_model_result.execution_time, - "run_status": run_model_result.status, - "run_skipped": run_model_result.skip, + # TODO(kw) might need to update model run schema! + "run_status": run_model_result.message, + "run_skipped": run_model_result.status == RunStatus.Skipped, "run_error": None, "model_materialization": run_model_result.node.get_materialization(), "model_id": utils.get_hash(run_model_result.node), @@ -187,7 +188,16 @@ def after_execute(self, result): def _build_run_model_result(self, model, context): result = context['load_result']('main') - return RunModelResult(model, status=result.status) + # TODO(kw) clean this up + return RunModelResult( + node=model, + status=RunStatus.Success, + timing=[], + thread_id="asdf", + execution_time=0, + message=result.status, + agate_table=None + ) def _materialization_relations( self, result: Any, model @@ -400,10 +410,16 @@ def after_run(self, adapter, results): # list of unique database, schema pairs that successfully executed # models were in. for backwards compatibility, include the old # 'schemas', which did not include database information. + database_schema_set: Set[Tuple[Optional[str], str]] = { (r.node.database, r.node.schema) for r in results - if not any((r.error is not None, r.fail, r.skipped)) + if r.status not in ( + NodeStatus.Error, + NodeStatus.Fail, + NodeStatus.Skipped + ) } + self._total_executed += len(results) extras = { diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 7058e7bdad7..4ec83193175 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -31,7 +31,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.parsed import ParsedSourceDefinition -from dbt.contracts.results import RunResultsArtifact +from dbt.contracts.results import NodeStatus, RunResultsArtifact from dbt.contracts.state import PreviousState from dbt.exceptions import ( InternalException, @@ -189,17 +189,17 @@ def call_runner(self, runner): fail_fast = getattr(self.config.args, 'fail_fast', False) - if (result.fail is not None or result.error is not None) and fail_fast: + if result.status in (NodeStatus.Error, NodeStatus.Fail) and fail_fast: self._raise_next_tick = FailFastException( message='Failing early due to test failure or runtime error', result=result, node=getattr(result, 'node', None) ) - elif result.error is not None and self.raise_on_first_error(): + elif result.status == NodeStatus.Error and self.raise_on_first_error(): # if we raise inside a thread, it'll just get silently swallowed. 
# stash the error message we want here, and it will check the # next 'tick' - should be soon since our thread is about to finish! - self._raise_next_tick = RuntimeException(result.error) + self._raise_next_tick = RuntimeException(result.message) return result @@ -287,7 +287,7 @@ def _handle_result(self, result): else: self.manifest.update_node(node) - if result.error is not None: + if result.status is NodeStatus.Error: if is_ephemeral: cause = result else: @@ -436,7 +436,12 @@ def interpret_results(self, results): if results is None: return False - failures = [r for r in results if r.error or r.fail] + failures = [ + r for r in results if r.status in ( + NodeStatus.Error, + NodeStatus.Fail + ) + ] return len(failures) == 0 def get_model_schemas( diff --git a/core/dbt/task/seed.py b/core/dbt/task/seed.py index 47a89d1a5db..efc060a3321 100644 --- a/core/dbt/task/seed.py +++ b/core/dbt/task/seed.py @@ -7,6 +7,7 @@ print_run_end_messages, ) +from dbt.contracts.results import RunStatus from dbt.exceptions import InternalException from dbt.graph import ResourceTypeSelector from dbt.logger import GLOBAL_LOGGER as logger, TextOnly @@ -79,5 +80,5 @@ def show_table(self, result): def show_tables(self, results): for result in results: - if result.error is None: + if result.status != RunStatus.Error: self.show_table(result) diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 084ca4247c0..ae612980a7a 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -1,3 +1,4 @@ +import threading from typing import Dict, Any, Set from .compile import CompileRunner @@ -14,7 +15,7 @@ ParsedDataTestNode, ParsedSchemaTestNode, ) -from dbt.contracts.results import RunModelResult +from dbt.contracts.results import RunModelResult, TestStatus from dbt.exceptions import raise_compiler_error, InternalException from dbt.graph import ( ResourceTypeSelector, @@ -83,19 +84,40 @@ def execute(self, test: CompiledTestNode, manifest: Manifest): elif isinstance(test, CompiledSchemaTestNode): failed_rows = self.execute_schema_test(test) else: - raise InternalException( f'Expected compiled schema test or compiled data test, got ' f'{type(test)}' ) - severity = test.config.severity.upper() + severity = test.config.severity.upper() + thread_id = threading.current_thread().name if failed_rows == 0: - return RunModelResult(test, status=failed_rows) + return RunModelResult( + node=test, + status=TestStatus.Success, + timing=[], + thread_id=thread_id, + execution_time=0, + message=None, + ) elif severity == 'ERROR' or flags.WARN_ERROR: - return RunModelResult(test, status=failed_rows, fail=True) + return RunModelResult( + node=test, + status=TestStatus.Fail, + timing=[], + thread_id=thread_id, + execution_time=0, + message=failed_rows, + ) else: - return RunModelResult(test, status=failed_rows, warn=True) + return RunModelResult( + node=test, + status=TestStatus.Warn, + timing=[], + thread_id=thread_id, + execution_time=0, + message=failed_rows, + ) def after_execute(self, result): self.print_result_line(result) @@ -132,6 +154,7 @@ class TestTask(RunTask): Read schema files + custom data tests and validate that constraints are satisfied. 
""" + def raise_on_first_error(self): return False From 867e2402d23588895ea0014e931e397370c1d6d9 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 30 Nov 2020 09:00:56 -0500 Subject: [PATCH 2/9] chugging along --- core/dbt/contracts/results.py | 12 ++++-------- core/dbt/task/base.py | 8 +++----- core/dbt/task/compile.py | 12 ++++++++++-- core/dbt/task/freshness.py | 12 ++---------- core/dbt/task/generate.py | 4 ++-- core/dbt/task/printer.py | 13 +++++-------- core/dbt/task/run.py | 4 +--- core/dbt/task/runnable.py | 2 +- test/unit/test_contracts_graph_unparsed.py | 12 ++++++++---- 9 files changed, 36 insertions(+), 43 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 280869fc133..ad6a0c69fa4 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -62,7 +62,7 @@ class NodeStatus(StrEnum): Warn = "warn" Skipped = "skipped" Pass = "pass" - RuntimeError = "runtime error" + RuntimeErr = "runtime error" class RunStatus(StrEnum): @@ -78,11 +78,11 @@ class TestStatus(StrEnum): Warn = NodeStatus.Warn -class FreshnessStatus(StrEnum): # maybe this should be the same as test status? +class FreshnessStatus(StrEnum): # maybe this should be the same as test status Pass = NodeStatus.Pass Warn = NodeStatus.Warn Error = NodeStatus.Error - RuntimeError = NodeStatus.RuntimeError + RuntimeErr = NodeStatus.RuntimeErr @dataclass @@ -229,10 +229,6 @@ def skipped(self): return False -def _copykeys(src, keys, **updates): - return {k: getattr(src, k) for k in keys} - - class FreshnessErrorEnum(StrEnum): runtime_error = 'runtime error' @@ -269,7 +265,7 @@ def process_freshness_result( ) -> FreshnessNodeOutput: # TODO(kw) source freshness refactor unique_id = result.node.unique_id - if result.status is FreshnessStatus.RuntimeError: + if result.status is FreshnessStatus.RuntimeErr: return SourceFreshnessRuntimeError( unique_id=unique_id, error=result.message or "", diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index eabc1962886..c78fea73b53 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -195,10 +195,8 @@ def get_result_status(self, result) -> Dict[str, str]: return {'node_status': 'failed'} elif result.status == NodeStatus.Warn: return {'node_status': 'warn'} - elif result.status == NodeStatus.Success: - return {'node_status': 'passed'} else: - raise RuntimeError(f"unknown status {result.status}") + return {'node_status': 'passed'} def run_with_hooks(self, manifest): if self.skip: @@ -242,7 +240,7 @@ def ephemeral_result(self, node, start_time, timing_info): return self._build_run_result( node=node, start_time=start_time, - status=None, + status=RunStatus.Success, timing_info=timing_info, message=None ) @@ -251,7 +249,7 @@ def from_run_result(self, result, start_time, timing_info): return self._build_run_result( node=result.node, start_time=start_time, - status=result.status, + status=RunStatus.Success, # TODO(kw) fix this! 
timing_info=timing_info, message=result.message, agate_table=result.agate_table, diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index e4469f39728..cd4ec3d726a 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -1,7 +1,7 @@ from .runnable import GraphRunnableTask from .base import BaseRunner -from dbt.contracts.results import RunModelResult +from dbt.contracts.results import RunStatus, RunModelResult from dbt.exceptions import InternalException from dbt.graph import ResourceTypeSelector, SelectionSpec, parse_difference from dbt.logger import print_timestamped_line @@ -16,7 +16,15 @@ def after_execute(self, result): pass def execute(self, compiled_node, manifest): - return RunModelResult(compiled_node) + # TODO(kw) need to think about what to return here + return RunModelResult( + node=compiled_node, + status=RunStatus.Success, + timing=[], + thread_id="asdf", + execution_time=0, + message=None, + ) def compile(self, manifest): compiler = self.adapter.get_compiler() diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 0c522b7c9fd..f3deb051282 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,6 @@ import os import threading import time -from typing import Dict from .base import BaseRunner from .printer import ( @@ -14,7 +13,7 @@ from dbt.contracts.results import ( FreshnessExecutionResultArtifact, FreshnessResult, NodeStatus, - PartialNodeResult, RunResult, RunStatus, + PartialNodeResult, RunStatus, SourceFreshnessResult, ) from dbt.exceptions import RuntimeException, InternalException @@ -36,13 +35,6 @@ def on_skip(self): 'Freshness: nodes cannot be skipped!' ) - def get_result_status(self, result) -> Dict[str, str]: - if result.status == NodeStatus.Error: - return {'node_status': 'error', 'node_error': str(result.message)} - else: - # TODO(kw) I think this needs to be updated - return {'node_status': str(result.message)} - def before_execute(self): description = 'freshness of {0.source_name}.{0.name}'.format(self.node) print_start_line(description, self.node_index, self.num_nodes) @@ -58,7 +50,7 @@ def _build_run_result(self, node, start_time, error, status, timing_info, # TODO(kw): uhh not sure what type to return here return PartialNodeResult( node=node, - status="Asdf", + status=RunStatus.Success, # TODO(kw) fix this as well execution_time=execution_time, thread_id=thread_id, timing=timing_info, diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 7f6c5253547..3a043ac4ff0 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -11,8 +11,8 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.results import ( - NodeStatus, RunStatus, TableMetadata, CatalogTable, CatalogResults, Primitive, CatalogKey, - StatsItem, StatsDict, ColumnMetadata, CatalogArtifact + NodeStatus, TableMetadata, CatalogTable, CatalogResults, Primitive, + CatalogKey, StatsItem, StatsDict, ColumnMetadata, CatalogArtifact ) from dbt.exceptions import InternalException from dbt.include.global_project import DOCS_INDEX_FILE_PATH diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 4dc12a6543b..4f3e2678848 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -198,8 +198,7 @@ def print_seed_result_line(result, schema_name: str, index: int, total: int): def print_freshness_result_line(result, index: int, total: int) -> None: - # TODO(kw) uhhhh - if result.status == 
NodeStatus.RuntimeError: + if result.status == NodeStatus.RuntimeErr: info = 'ERROR' color = ui.red logger_fn = logger.error @@ -223,11 +222,7 @@ def print_freshness_result_line(result, index: int, total: int) -> None: source_name = result.source_name table_name = result.table_name - msg = "{info} freshness of {source_name}.{table_name}".format( - info=info, - source_name=source_name, - table_name=table_name - ) + msg = f"{info} freshness of {source_name}.{table_name}" print_fancy_output_line( msg, @@ -277,7 +272,9 @@ def print_run_result_error( with TextOnly(): logger.info("") - if result.status == NodeStatus.Fail or (is_warning and result.status == NodeStatus.Warn): + if result.status == NodeStatus.Fail or ( + is_warning and result.status == NodeStatus.Warn + ): if is_warning: color = ui.yellow info = 'Warning' diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index bf708a8472d..0e6691abf80 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -105,8 +105,7 @@ def track_model_run(index, num_nodes, run_model_result): "index": index, "total": num_nodes, "execution_time": run_model_result.execution_time, - # TODO(kw) might need to update model run schema! - "run_status": run_model_result.message, + "run_status": run_model_result.status, "run_skipped": run_model_result.status == RunStatus.Skipped, "run_error": None, "model_materialization": run_model_result.node.get_materialization(), @@ -196,7 +195,6 @@ def _build_run_model_result(self, model, context): thread_id="asdf", execution_time=0, message=result.status, - agate_table=None ) def _materialization_relations( diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 4ec83193175..26e0101525f 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -287,7 +287,7 @@ def _handle_result(self, result): else: self.manifest.update_node(node) - if result.status is NodeStatus.Error: + if result.status == NodeStatus.Error: if is_ephemeral: cause = result else: diff --git a/test/unit/test_contracts_graph_unparsed.py b/test/unit/test_contracts_graph_unparsed.py index 32ae398bf3a..100b65020f6 100644 --- a/test/unit/test_contracts_graph_unparsed.py +++ b/test/unit/test_contracts_graph_unparsed.py @@ -4,11 +4,12 @@ from dbt.contracts.graph.unparsed import ( UnparsedNode, UnparsedRunHook, UnparsedMacro, Time, TimePeriod, - FreshnessStatus, FreshnessThreshold, Quoting, UnparsedSourceDefinition, + FreshnessThreshold, Quoting, UnparsedSourceDefinition, UnparsedSourceTableDefinition, UnparsedDocumentationFile, UnparsedColumn, UnparsedNodeUpdate, Docs, UnparsedExposure, MaturityType, ExposureOwner, ExposureType ) +from dbt.contracts.results import FreshnessStatus from dbt.node_types import NodeType from .utils import ContractTestCase @@ -192,7 +193,8 @@ def test_both(self): error_seconds = timedelta(days=3).total_seconds() warn_seconds = timedelta(days=1).total_seconds() pass_seconds = timedelta(hours=3).total_seconds() - self.assertEqual(threshold.status(error_seconds), FreshnessStatus.Error) + self.assertEqual(threshold.status( + error_seconds), FreshnessStatus.Error) self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn) self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass) pickle.loads(pickle.dumps(threshold)) @@ -214,7 +216,8 @@ def test_merged(self): error_seconds = timedelta(days=3).total_seconds() warn_seconds = timedelta(days=1).total_seconds() pass_seconds = timedelta(hours=3).total_seconds() - self.assertEqual(threshold.status(error_seconds), FreshnessStatus.Error) + 
self.assertEqual(threshold.status( + error_seconds), FreshnessStatus.Error) self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn) self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass) @@ -632,7 +635,8 @@ def test_ok_maturities(self): for maturity_allowed in (None, 'low', 'medium', 'high'): tst = self.get_ok_dict() tst['maturity'] = maturity_allowed - assert self.ContractType.from_dict(tst).maturity == maturity_allowed + assert self.ContractType.from_dict( + tst).maturity == maturity_allowed tst = self.get_ok_dict() del tst['maturity'] From 73f7fba79352476ac4da2e83f52d87f7b4297f44 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Tue, 1 Dec 2020 12:38:00 -0500 Subject: [PATCH 3/9] fix printing test status --- core/dbt/contracts/results.py | 2 +- core/dbt/rpc/task_handler.py | 4 ++ core/dbt/task/base.py | 2 +- core/dbt/task/printer.py | 22 +++++---- core/dbt/task/test.py | 2 +- test/integration/100_rpc_test/test_rpc.py | 57 ++++++++++++++--------- 6 files changed, 54 insertions(+), 35 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index ad6a0c69fa4..0f35de456b5 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -72,7 +72,7 @@ class RunStatus(StrEnum): class TestStatus(StrEnum): - Success = NodeStatus.Success + Pass = NodeStatus.Pass Error = NodeStatus.Error Fail = NodeStatus.Fail Warn = NodeStatus.Warn diff --git a/core/dbt/rpc/task_handler.py b/core/dbt/rpc/task_handler.py index a195a1d491c..f2ed10a6d49 100644 --- a/core/dbt/rpc/task_handler.py +++ b/core/dbt/rpc/task_handler.py @@ -187,6 +187,7 @@ def get_results_context( class StateHandler: """A helper context manager to manage task handler state.""" + def __init__(self, task_handler: 'RequestTaskHandler') -> None: self.handler = task_handler @@ -248,6 +249,7 @@ class SetArgsStateHandler(StateHandler): """A state handler that does not touch state on success and does not execute the teardown """ + def handle_completed(self): pass @@ -257,6 +259,7 @@ def handle_teardown(self): class RequestTaskHandler(threading.Thread, TaskHandlerProtocol): """Handler for the single task triggered by a given jsonrpc request.""" + def __init__( self, manager: TaskManagerProtocol, @@ -400,6 +403,7 @@ def run(self): try: with StateHandler(self): self.result = self.get_result() + except (dbt.exceptions.Exception, RPCException): # we probably got an error after the RPC call ran (and it was # probably deps...). By now anyone who wanted to see it has seen it diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index c78fea73b53..56013201865 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -249,7 +249,7 @@ def from_run_result(self, result, start_time, timing_info): return self._build_run_result( node=result.node, start_time=start_time, - status=RunStatus.Success, # TODO(kw) fix this! 
+ status=result.status, timing_info=timing_info, message=result.message, agate_table=result.agate_table, diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 4f3e2678848..e1df6537925 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -11,7 +11,9 @@ from dbt import ui from dbt import utils -from dbt.contracts.results import NodeResult, NodeStatus +from dbt.contracts.results import ( + FreshnessStatus, NodeResult, NodeStatus, TestStatus +) def print_fancy_output_line( @@ -117,20 +119,20 @@ def print_test_result_line( ) -> None: model = result.node - if result.status == NodeStatus.Error: + if result.status == TestStatus.Error: info = "ERROR" color = ui.red logger_fn = logger.error - elif result.status == NodeStatus.Success: + elif result.status == TestStatus.Pass: info = 'PASS' color = ui.green logger_fn = logger.info - elif result.status == NodeStatus.Warn: - info = 'WARN {}'.format(result.status) + elif result.status == TestStatus.Warn: + info = 'WARN {}'.format(result.message) color = ui.yellow logger_fn = logger.warning - elif result.status == NodeStatus.Fail: - info = 'FAIL {}'.format(result.status) + elif result.status == TestStatus.Fail: + info = 'FAIL {}'.format(result.message) color = ui.red logger_fn = logger.error else: @@ -198,15 +200,15 @@ def print_seed_result_line(result, schema_name: str, index: int, total: int): def print_freshness_result_line(result, index: int, total: int) -> None: - if result.status == NodeStatus.RuntimeErr: + if result.status == FreshnessStatus.RuntimeErr: info = 'ERROR' color = ui.red logger_fn = logger.error - elif result.status == NodeStatus.Error: + elif result.status == FreshnessStatus.Error: info = 'ERROR STALE' color = ui.red logger_fn = logger.error - elif result.status == NodeStatus.Warn: + elif result.status == FreshnessStatus.Warn: info = 'WARN' color = ui.yellow logger_fn = logger.warning diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index ae612980a7a..74cadcc1879 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -94,7 +94,7 @@ def execute(self, test: CompiledTestNode, manifest: Manifest): if failed_rows == 0: return RunModelResult( node=test, - status=TestStatus.Success, + status=TestStatus.Pass, timing=[], thread_id=thread_id, execution_time=0, diff --git a/test/integration/100_rpc_test/test_rpc.py b/test/integration/100_rpc_test/test_rpc.py index 42349b4b4d3..cccf60fc6d2 100644 --- a/test/integration/100_rpc_test/test_rpc.py +++ b/test/integration/100_rpc_test/test_rpc.py @@ -188,7 +188,8 @@ def poll_for_result(self, request_token, request_id=1, timeout=180, state='succe while True: time.sleep(0.5) - response = self.query('poll', _test_request_id=request_id, **kwargs) + response = self.query( + 'poll', _test_request_id=request_id, **kwargs) response_json = response.json() if 'error' in response_json: return response @@ -205,7 +206,8 @@ def poll_for_result(self, request_token, request_id=1, timeout=180, state='succe ) def async_query(self, _method, _sql=None, _test_request_id=1, _poll_timeout=180, macros=None, **kwargs): - response = self.query(_method, _sql, _test_request_id, macros, **kwargs).json() + response = self.query( + _method, _sql, _test_request_id, macros, **kwargs).json() result = self.assertIsResult(response, _test_request_id) self.assertIn('request_token', result) return self.poll_for_result( @@ -215,7 +217,8 @@ def async_query(self, _method, _sql=None, _test_request_id=1, _poll_timeout=180, ) def query(self, _method, _sql=None, _test_request_id=1, 
macros=None, **kwargs): - built = self.build_query(_method, kwargs, _sql, _test_request_id, macros) + built = self.build_query( + _method, kwargs, _sql, _test_request_id, macros) return query_url(self.url, built) def handle_result(self, bg_query, pipe): @@ -307,7 +310,8 @@ def assertHasErrorData(self, error, expected_error_data): return error_data def assertRunning(self, sleepers): - sleeper_ps_result = self.query('ps', completed=False, active=True).json() + sleeper_ps_result = self.query( + 'ps', completed=False, active=True).json() result = self.assertIsResult(sleeper_ps_result) self.assertEqual(len(result['rows']), len(sleepers)) result_map = {rd['request_id']: rd for rd in result['rows']} @@ -371,7 +375,8 @@ def wait_for_state( return status def run_command_with_id(self, cmd, id_): - self.assertIsResult(self.async_query(cmd, _test_request_id=id_).json(), id_) + self.assertIsResult(self.async_query( + cmd, _test_request_id=id_).json(), id_) def make_many_requests(self, num_requests): stored = [] @@ -424,7 +429,7 @@ def test_compile_sql_postgres(self): compiled_sql='select * from "{}"."{}"."source"'.format( self.default_database, self.unique_schema()) - ) + ) macro = self.async_query( 'compile_sql', @@ -612,11 +617,13 @@ def test_ps_kill_postgres(self): request_token, request_id = self.get_sleep_query() - empty_ps_result = self.query('ps', completed=False, active=False).json() + empty_ps_result = self.query( + 'ps', completed=False, active=False).json() result = self.assertIsResult(empty_ps_result) self.assertEqual(len(result['rows']), 0) - sleeper_ps_result = self.query('ps', completed=False, active=True).json() + sleeper_ps_result = self.query( + 'ps', completed=False, active=True).json() result = self.assertIsResult(sleeper_ps_result) self.assertEqual(len(result['rows']), 1) rowdict = result['rows'] @@ -628,7 +635,8 @@ def test_ps_kill_postgres(self): self.assertGreater(rowdict[0]['elapsed'], 0) self.assertIsNone(rowdict[0]['tags']) - complete_ps_result = self.query('ps', completed=True, active=False).json() + complete_ps_result = self.query( + 'ps', completed=True, active=False).json() result = self.assertIsResult(complete_ps_result) self.assertEqual(len(result['rows']), 1) rowdict = result['rows'] @@ -806,12 +814,9 @@ def assertHasTestResults(self, results, expected, pass_results=None): for result in results: # TODO: should this be included even when it's 'none'? Should # results have all these crazy keys? 
(no) - self.assertIn('fail', result) - if result['status'] == 0.0: - self.assertIsNone(result['fail']) + if result['status'] == "pass": passes += 1 - else: - self.assertTrue(result['fail']) + self.assertEqual(passes, pass_results) @use_profile('postgres') @@ -845,7 +850,8 @@ def test_compile_project_postgres(self): num_expected=11, ) - result = self.async_query('compile', models=['source:test_source+']).json() + result = self.async_query( + 'compile', models=['source:test_source+']).json() self.assertHasResults( result, {'descendant_model', 'multi_source_model'}, @@ -864,7 +870,8 @@ def test_compile_project_cli_postgres(self): num_expected=11, ) - result = self.async_query('cli_args', cli='compile --models=source:test_source+').json() + result = self.async_query( + 'cli_args', cli='compile --models=source:test_source+').json() self.assertHasResults( result, {'descendant_model', 'multi_source_model'}, @@ -876,13 +883,15 @@ def test_compile_project_cli_postgres(self): def test_run_project_postgres(self): result = self.async_query('run').json() assert 'args' in result['result'] - self.assertHasResults(result, {'descendant_model', 'multi_source_model', 'nonsource_descendant'}) + self.assertHasResults( + result, {'descendant_model', 'multi_source_model', 'nonsource_descendant'}) self.assertTablesEqual('multi_source_model', 'expected_multi_source') @use_profile('postgres') def test_run_project_cli_postgres(self): result = self.async_query('cli_args', cli='run').json() - self.assertHasResults(result, {'descendant_model', 'multi_source_model', 'nonsource_descendant'}) + self.assertHasResults( + result, {'descendant_model', 'multi_source_model', 'nonsource_descendant'}) self.assertTablesEqual('multi_source_model', 'expected_multi_source') @use_profile('postgres') @@ -991,7 +1000,8 @@ def test_sighup_postgres(self): self.assertIn('timestamp', status) - done_query = self.async_query('compile_sql', 'select 1 as id', name='done').json() + done_query = self.async_query( + 'compile_sql', 'select 1 as id', name='done').json() self.assertIsResult(done_query) sleepers = [] @@ -1058,7 +1068,8 @@ def test_gc_by_id_postgres(self): resp = self.query('gc', task_ids=stored[:num_requests//2]).json() result = self.assertIsResult(resp) self.assertEqual(len(result['deleted']), num_requests//2) - self.assertEqual(sorted(result['deleted']), sorted(stored[:num_requests//2])) + self.assertEqual(sorted(result['deleted']), + sorted(stored[:num_requests//2])) self.assertEqual(len(result['missing']), 0) self.assertEqual(len(result['running']), 0) # we should have total - what we removed still there @@ -1069,7 +1080,8 @@ def test_gc_by_id_postgres(self): resp = self.query('gc', task_ids=stored[num_requests//2:]).json() result = self.assertIsResult(resp) self.assertEqual(len(result['deleted']), num_requests//2) - self.assertEqual(sorted(result['deleted']), sorted(stored[num_requests//2:])) + self.assertEqual(sorted(result['deleted']), + sorted(stored[num_requests//2:])) self.assertEqual(len(result['missing']), 0) self.assertEqual(len(result['running']), 0) # all gone! 
@@ -1146,6 +1158,7 @@ def test_deps_cli_compilation_postgres(self): status = self._check_start_predeps() # do a dbt deps, wait for the result - self.assertIsResult(self.async_query('cli_args', cli='deps', _poll_timeout=180).json()) + self.assertIsResult(self.async_query( + 'cli_args', cli='deps', _poll_timeout=180).json()) self._check_deps_ok(status) From 8dd69efd48506254703b37cacd56dd847d02daad Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Fri, 4 Dec 2020 09:14:05 -0500 Subject: [PATCH 4/9] address test failures --- core/dbt/contracts/graph/model_config.py | 7 +- core/dbt/contracts/results.py | 6 +- core/dbt/task/freshness.py | 42 ++++++---- core/dbt/task/generate.py | 1 + core/dbt/task/printer.py | 12 ++- core/dbt/task/run.py | 6 +- core/dbt/task/runnable.py | 4 +- core/dbt/task/test.py | 37 ++++----- .../001_simple_copy_test/test_simple_copy.py | 2 +- .../test_schema_v2_tests.py | 56 ++++++------- .../009_data_tests_test/test_data_tests.py | 18 ++--- .../020_ephemeral_test/test_ephemeral.py | 49 ++++++------ .../test_bigquery_adapter_functions.py | 26 ++++--- .../test_bigquery_changing_partitions.py | 42 +++++----- .../test_bigquery_copy_failing_models.py | 2 +- .../test_bigquery_date_partitioning.py | 6 +- .../test_simple_bigquery_view.py | 10 +-- .../test_docs_generate.py | 78 ++++++------------- .../033_event_tracking_test/test_events.py | 22 +++--- .../test_simple_presto_view.py | 11 ++- .../042_sources_test/test_sources.py | 38 +++------ .../045_test_severity_tests/test_severity.py | 64 ++++++++------- 22 files changed, 254 insertions(+), 285 deletions(-) diff --git a/core/dbt/contracts/graph/model_config.py b/core/dbt/contracts/graph/model_config.py index d15440ff257..cca3c9f3ef0 100644 --- a/core/dbt/contracts/graph/model_config.py +++ b/core/dbt/contracts/graph/model_config.py @@ -2,8 +2,8 @@ from enum import Enum from itertools import chain from typing import ( - Any, List, Optional, Dict, MutableMapping, Union, Type, NewType, Tuple, - TypeVar, Callable + Any, Hashable, List, Optional, Dict, MutableMapping, Union, Type, NewType, + Tuple, TypeVar, Callable, cast ) # TODO: patch+upgrade hologram to avoid this jsonschema import @@ -493,7 +493,8 @@ def validate(cls, data: Any): to_validate = config else: - schema = _validate_schema(cls) + h_cls = cast(Hashable, cls) + schema = _validate_schema(h_cls) to_validate = data validator = jsonschema.Draft7Validator(schema) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 0f35de456b5..0cf15ce1a49 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -91,7 +91,7 @@ class BaseResult(JsonSchemaMixin): timing: List[TimingInfo] thread_id: str execution_time: float - message: Optional[str] + message: Optional[Union[str, int]] @dataclass @@ -236,7 +236,7 @@ class FreshnessErrorEnum(StrEnum): @dataclass class SourceFreshnessRuntimeError(JsonSchemaMixin): unique_id: str - error: str + error: Union[str, int] # TODO(kw) this is to fix mypy state: FreshnessErrorEnum @@ -265,7 +265,7 @@ def process_freshness_result( ) -> FreshnessNodeOutput: # TODO(kw) source freshness refactor unique_id = result.node.unique_id - if result.status is FreshnessStatus.RuntimeErr: + if result.status == FreshnessStatus.RuntimeErr: return SourceFreshnessRuntimeError( unique_id=unique_id, error=result.message or "", diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index f3deb051282..394d36733b6 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -12,16 +12,13 @@ from 
dbt.contracts.results import ( FreshnessExecutionResultArtifact, - FreshnessResult, NodeStatus, - PartialNodeResult, RunStatus, - SourceFreshnessResult, + FreshnessResult, NodeStatus, PartialSourceFreshnessResult, + SourceFreshnessResult, FreshnessStatus ) from dbt.exceptions import RuntimeException, InternalException from dbt.logger import print_timestamped_line from dbt.node_types import NodeType -from dbt import utils - from dbt.graph import NodeSelector, SelectionSpec, parse_difference from dbt.contracts.graph.parsed import ParsedSourceDefinition @@ -42,19 +39,32 @@ def before_execute(self): def after_execute(self, result): print_freshness_result_line(result, self.node_index, self.num_nodes) - def _build_run_result(self, node, start_time, error, status, timing_info, - skip=False, failed=None): + def error_result(self, node, message, start_time, timing_info): + return self._build_run_result( + node=node, + start_time=start_time, + status=FreshnessStatus.RuntimeErr, + timing_info=timing_info, + message=message, + ) + + def _build_run_result( + self, + node, + start_time, + status, + timing_info, + message + ): execution_time = time.time() - start_time thread_id = threading.current_thread().name - status = utils.lowercase(status) - # TODO(kw): uhh not sure what type to return here - return PartialNodeResult( - node=node, - status=RunStatus.Success, # TODO(kw) fix this as well - execution_time=execution_time, + return PartialSourceFreshnessResult( + status=status, thread_id=thread_id, + execution_time=execution_time, timing=timing_info, - message="" + message=message, + node=node, ) def from_run_result(self, result, start_time, timing_info): @@ -85,10 +95,14 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness['age']) + # TODO(kw) more cleanup :) return SourceFreshnessResult( node=compiled_node, status=status, thread_id=threading.current_thread().name, + timing=[], + execution_time=0, + message="", **freshness ) diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 3a043ac4ff0..1358a82559a 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -211,6 +211,7 @@ def run(self) -> CatalogArtifact: compile_results = None if self.args.compile: compile_results = CompileTask.run(self) + # TODO(kw) not sure this is the right logic if any(r.status == NodeStatus.Error for r in compile_results): print_timestamped_line( 'compile failed, cannot generate docs' diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index e1df6537925..989e2c83e19 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -346,10 +346,14 @@ def print_end_of_run_summary( def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: - # or r.fail] <- TODO(kw) do we need to handle fail? 
- errors = [r for r in results if r.status in ( - NodeStatus.Error, NodeStatus.Fail)] - warnings = [r for r in results if r.status == NodeStatus.Warn] + errors, warnings = [], [] + for r in results: + if (r.status in (NodeStatus.Error, NodeStatus.Fail) or + (r.status == NodeStatus.Skipped and r.message is not None)): + errors.append(r) + elif r.status == NodeStatus.Warn: + warnings.append(r) + with DbtStatusMessage(), InvocationProcessor(): print_end_of_run_summary(len(errors), len(warnings), diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 0e6691abf80..c90233cd985 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -105,9 +105,9 @@ def track_model_run(index, num_nodes, run_model_result): "index": index, "total": num_nodes, "execution_time": run_model_result.execution_time, - "run_status": run_model_result.status, - "run_skipped": run_model_result.status == RunStatus.Skipped, - "run_error": None, + "run_status": str(run_model_result.status).upper(), + "run_skipped": run_model_result.status == NodeStatus.Skipped, + "run_error": run_model_result.status == NodeStatus.Error, "model_materialization": run_model_result.node.get_materialization(), "model_id": utils.get_hash(run_model_result.node), "hashed_contents": utils.get_hashed_contents( diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 26e0101525f..edd264a7b97 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -438,8 +438,10 @@ def interpret_results(self, results): failures = [ r for r in results if r.status in ( + NodeStatus.RuntimeErr, NodeStatus.Error, - NodeStatus.Fail + NodeStatus.Fail, + NodeStatus.Skipped # propogate error message causing skip ) ] return len(failures) == 0 diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 74cadcc1879..2f93c6fade4 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -91,33 +91,22 @@ def execute(self, test: CompiledTestNode, manifest: Manifest): severity = test.config.severity.upper() thread_id = threading.current_thread().name + status = None if failed_rows == 0: - return RunModelResult( - node=test, - status=TestStatus.Pass, - timing=[], - thread_id=thread_id, - execution_time=0, - message=None, - ) + status = TestStatus.Pass elif severity == 'ERROR' or flags.WARN_ERROR: - return RunModelResult( - node=test, - status=TestStatus.Fail, - timing=[], - thread_id=thread_id, - execution_time=0, - message=failed_rows, - ) + status = TestStatus.Fail else: - return RunModelResult( - node=test, - status=TestStatus.Warn, - timing=[], - thread_id=thread_id, - execution_time=0, - message=failed_rows, - ) + status = TestStatus.Warn + + return RunModelResult( + node=test, + status=status, + timing=[], + thread_id=thread_id, + execution_time=0, + message=failed_rows, + ) def after_execute(self, result): self.print_result_line(result) diff --git a/test/integration/001_simple_copy_test/test_simple_copy.py b/test/integration/001_simple_copy_test/test_simple_copy.py index 45725e0627f..acbe2e8fd92 100644 --- a/test/integration/001_simple_copy_test/test_simple_copy.py +++ b/test/integration/001_simple_copy_test/test_simple_copy.py @@ -101,7 +101,7 @@ def test__presto__simple_copy(self): self.assertEqual(len(results), 7) for result in results: if 'incremental' in result.node.name: - self.assertIn('not implemented for presto', result.error) + self.assertIn('not implemented for presto', result.message) self.assertManyTablesEqual(["seed", "view_model", "materialized"]) diff --git 
a/test/integration/008_schema_tests_test/test_schema_v2_tests.py b/test/integration/008_schema_tests_test/test_schema_v2_tests.py index 726d1a0f748..7a8b69854e2 100644 --- a/test/integration/008_schema_tests_test/test_schema_v2_tests.py +++ b/test/integration/008_schema_tests_test/test_schema_v2_tests.py @@ -27,19 +27,18 @@ def run_schema_validations(self): return test_task.run() def assertTestFailed(self, result): - self.assertIsNone(result.error) + self.assertEqual(result.status, "fail") self.assertFalse(result.skipped) self.assertTrue( - result.status > 0, + int(result.message) > 0, 'test {} did not fail'.format(result.node.name) ) def assertTestPassed(self, result): - self.assertIsNone(result.error) + self.assertEqual(result.status, "pass") self.assertFalse(result.skipped) - # status = # of failing rows self.assertEqual( - result.status, 0, + int(result.message), 0, 'test {} failed'.format(result.node.name) ) @@ -59,26 +58,29 @@ def test_postgres_schema_tests(self): else: self.assertTestPassed(result) - self.assertEqual(sum(x.status for x in test_results), 6) + self.assertEqual(sum(x.message for x in test_results), 6) @use_profile('postgres') def test_postgres_schema_test_selection(self): results = self.run_dbt() self.assertEqual(len(results), 5) - test_results = self.run_dbt(['test', '--models', 'tag:table_favorite_color']) - self.assertEqual(len(test_results), 5) # 1 in table_copy, 4 in table_summary + test_results = self.run_dbt( + ['test', '--models', 'tag:table_favorite_color']) + # 1 in table_copy, 4 in table_summary + self.assertEqual(len(test_results), 5) for result in test_results: self.assertTestPassed(result) - test_results = self.run_dbt(['test', '--models', 'tag:favorite_number_is_pi']) + test_results = self.run_dbt( + ['test', '--models', 'tag:favorite_number_is_pi']) self.assertEqual(len(test_results), 1) self.assertTestPassed(test_results[0]) - test_results = self.run_dbt(['test', '--models', 'tag:table_copy_favorite_color']) + test_results = self.run_dbt( + ['test', '--models', 'tag:table_copy_favorite_color']) self.assertEqual(len(test_results), 1) self.assertTestPassed(test_results[0]) - @use_profile('postgres') def test_postgres_schema_test_exclude_failures(self): results = self.run_dbt() @@ -88,7 +90,8 @@ def test_postgres_schema_test_exclude_failures(self): self.assertEqual(len(test_results), 13) for result in test_results: self.assertTestPassed(result) - test_results = self.run_dbt(['test', '--models', 'tag:xfail'], expect_pass=False) + test_results = self.run_dbt( + ['test', '--models', 'tag:xfail'], expect_pass=False) self.assertEqual(len(test_results), 6) for result in test_results: self.assertTestFailed(result) @@ -160,12 +163,12 @@ def test_postgres_malformed_macro_reports_error(self): self.assertEqual(len(test_results), 2) for result in test_results: - self.assertTrue(result.error is not None or result.fail) + self.assertIn(result.status, ('error', 'fail')) # Assert that error is thrown for empty schema test - if result.error is not None: - self.assertIn("Returned 0 rows", result.error) + if result.status == "error": + self.assertIn("Returned 0 rows", result.message) # Assert that failure occurs for normal schema test - elif result.fail: + elif result.status == "fail": self.assertIn(expected_failure, result.node.name) @@ -194,11 +197,10 @@ def test_postgres_hooks_dont_run_for_tests(self): results = self.run_dbt(['test', '--model', 'ephemeral']) self.assertEqual(len(results), 1) for result in results: - self.assertIsNone(result.error) + 
self.assertEqual(result.status, "pass") self.assertFalse(result.skipped) - # status = # of failing rows self.assertEqual( - result.status, 0, + int(result.message), 0, 'test {} failed'.format(result.node.name) ) @@ -259,9 +261,9 @@ def test_postgres_schema_tests(self): expected_failures = ['unique', 'every_value_is_blue'] for result in test_results: - if result.error is not None: + if result.status == 'error': self.assertTrue(result.node['name'] in expected_failures) - self.assertEqual(sum(x.status for x in test_results), 52) + self.assertEqual(sum(x.message for x in test_results), 52) class TestBQSchemaTests(DBTIntegrationTest): @@ -296,24 +298,22 @@ def test_schema_tests_bigquery(self): for result in test_results: # assert that all deliberately failing tests actually fail if 'failure' in result.node.name: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'fail') self.assertFalse(result.skipped) self.assertTrue( - result.status > 0, + int(result.message) > 0, 'test {} did not fail'.format(result.node.name) ) - # assert that actual tests pass else: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows self.assertEqual( - result.status, 0, + int(result.message), 0, 'test {} failed'.format(result.node.name) ) - self.assertEqual(sum(x.status for x in test_results), 0) + self.assertEqual(sum(x.message for x in test_results), 0) class TestQuotedSchemaTestColumns(DBTIntegrationTest): diff --git a/test/integration/009_data_tests_test/test_data_tests.py b/test/integration/009_data_tests_test/test_data_tests.py index c255c11932d..0e924a046d0 100644 --- a/test/integration/009_data_tests_test/test_data_tests.py +++ b/test/integration/009_data_tests_test/test_data_tests.py @@ -43,16 +43,15 @@ def test_postgres_data_tests(self): for result in test_results: # assert that all deliberately failing tests actually fail if 'fail' in result.node.name: - self.assertIsNone(result.error) + self.assertEqual(result.status, "fail") self.assertFalse(result.skipped) - self.assertTrue(result.status > 0) + self.assertTrue(int(result.message) > 0) # assert that actual tests pass else: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + self.assertEqual(int(result.message), 0) # check that all tests were run defined_tests = os.listdir(self.test_path) @@ -72,13 +71,12 @@ def test_snowflake_data_tests(self): for result in test_results: # assert that all deliberately failing tests actually fail if 'fail' in result.node.name: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'fail') self.assertFalse(result.skipped) - self.assertTrue(result.status > 0) + self.assertTrue(int(result.message) > 0) # assert that actual tests pass else: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + self.assertEqual(int(result.message), 0) diff --git a/test/integration/020_ephemeral_test/test_ephemeral.py b/test/integration/020_ephemeral_test/test_ephemeral.py index 6bf5277a48d..1419d6c8f13 100644 --- a/test/integration/020_ephemeral_test/test_ephemeral.py +++ b/test/integration/020_ephemeral_test/test_ephemeral.py @@ -22,26 +22,26 @@ def test__postgres(self): self.assertTablesEqual("seed", "dependent") self.assertTablesEqual("seed", "double_dependent") 
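The schema- and data-test assertions above all settle on the same shape after this change: a string status plus a message that carries the number of failing rows, where the old API exposed error and an integer status. A condensed sketch of that pattern, not part of the patch (the helper name is hypothetical):

def assert_test_outcome(case, result, expect_pass):
    # `case` is the unittest.TestCase running the assertions; after this
    # change a test result carries a string status and the failing-row
    # count in message, instead of error/fail flags and an integer status.
    if expect_pass:
        case.assertEqual(result.status, 'pass')
        case.assertEqual(int(result.message), 0)
    else:
        case.assertEqual(result.status, 'fail')
        case.assertTrue(int(result.message) > 0)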
self.assertTablesEqual("seed", "super_dependent") - self.assertTrue(os.path.exists('./target/run/test/models/double_dependent.sql')) + self.assertTrue(os.path.exists( + './target/run/test/models/double_dependent.sql')) with open('./target/run/test/models/double_dependent.sql', 'r') as fp: sql_file = fp.read() sql_file = re.sub(r'\d+', '', sql_file) expected_sql = ('create view "dbt"."test_ephemeral_"."double_dependent__dbt_tmp" as (' - 'with __dbt__CTE__base as (' - 'select * from test_ephemeral_.seed' - '), __dbt__CTE__base_copy as (' - 'select * from __dbt__CTE__base' - ')-- base_copy just pulls from base. Make sure the listed' - '-- graph of CTEs all share the same dbt_cte__base cte' - "select * from __dbt__CTE__base where gender = 'Male'" - 'union all' - "select * from __dbt__CTE__base_copy where gender = 'Female'" - ');') + 'with __dbt__CTE__base as (' + 'select * from test_ephemeral_.seed' + '), __dbt__CTE__base_copy as (' + 'select * from __dbt__CTE__base' + ')-- base_copy just pulls from base. Make sure the listed' + '-- graph of CTEs all share the same dbt_cte__base cte' + "select * from __dbt__CTE__base where gender = 'Male'" + 'union all' + "select * from __dbt__CTE__base_copy where gender = 'Female'" + ');') sql_file = "".join(sql_file.split()) expected_sql = "".join(expected_sql.split()) - self.assertEqual ( sql_file, expected_sql ) - + self.assertEqual(sql_file, expected_sql) @use_profile('snowflake') def test__snowflake(self): @@ -70,24 +70,26 @@ def test__postgres(self): results = self.run_dbt() self.assertEqual(len(results), 2) - self.assertTrue(os.path.exists('./target/run/test/models-n/root_view.sql')) + self.assertTrue(os.path.exists( + './target/run/test/models-n/root_view.sql')) with open('./target/run/test/models-n/root_view.sql', 'r') as fp: sql_file = fp.read() sql_file = re.sub(r'\d+', '', sql_file) expected_sql = ( - 'create view "dbt"."test_ephemeral_"."root_view__dbt_tmp" as (' - 'with __dbt__CTE__ephemeral_level_two as (' - 'select * from "dbt"."test_ephemeral_"."source_table"' - '), __dbt__CTE__ephemeral as (' - 'select * from __dbt__CTE__ephemeral_level_two' - ')select * from __dbt__CTE__ephemeral' - ');') + 'create view "dbt"."test_ephemeral_"."root_view__dbt_tmp" as (' + 'with __dbt__CTE__ephemeral_level_two as (' + 'select * from "dbt"."test_ephemeral_"."source_table"' + '), __dbt__CTE__ephemeral as (' + 'select * from __dbt__CTE__ephemeral_level_two' + ')select * from __dbt__CTE__ephemeral' + ');') sql_file = "".join(sql_file.split()) expected_sql = "".join(expected_sql.split()) - self.assertEqual ( sql_file, expected_sql ) + self.assertEqual(sql_file, expected_sql) + class TestEphemeralErrorHandling(DBTIntegrationTest): @property @@ -104,4 +106,5 @@ def test__postgres_upstream_error(self): results = self.run_dbt(expect_pass=False) self.assertEqual(len(results), 1) - self.assertTrue(results[0].error is not None) + self.assertEqual(results[0].status, 'skipped') + self.assertIn('Compilation Error', results[0].message) diff --git a/test/integration/022_bigquery_test/test_bigquery_adapter_functions.py b/test/integration/022_bigquery_test/test_bigquery_adapter_functions.py index 35dddf44755..d5a60da6935 100644 --- a/test/integration/022_bigquery_test/test_bigquery_adapter_functions.py +++ b/test/integration/022_bigquery_test/test_bigquery_adapter_functions.py @@ -25,10 +25,9 @@ def test__bigquery_adapter_functions(self): self.assertTrue(len(test_results) > 0) for result in test_results: - self.assertIsNone(result.error) + self.assertEqual(result.status, 
'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + self.assertEqual(int(result.message), 0) class TestBigqueryAdapterMacros(DBTIntegrationTest): @@ -50,22 +49,29 @@ def test__bigquery_run_create_drop_schema(self): 'db_name': self.default_database, 'schema_name': self.unique_schema(), }) - self.run_dbt(['run-operation', 'my_create_schema', '--args', schema_args]) + self.run_dbt( + ['run-operation', 'my_create_schema', '--args', schema_args]) relation_args = yaml.safe_dump({ 'db_name': self.default_database, 'schema_name': self.unique_schema(), 'table_name': 'some_table', }) - self.run_dbt(['run-operation', 'my_create_table_as', '--args', relation_args]) + self.run_dbt(['run-operation', 'my_create_table_as', + '--args', relation_args]) # exercise list_relations_without_caching and get_columns_in_relation - self.run_dbt(['run-operation', 'ensure_one_relation_in', '--args', schema_args]) + self.run_dbt( + ['run-operation', 'ensure_one_relation_in', '--args', schema_args]) # now to drop the schema - schema_relation = self.adapter.Relation.create(database=self.default_database, schema=self.unique_schema()).without_identifier() + schema_relation = self.adapter.Relation.create( + database=self.default_database, schema=self.unique_schema()).without_identifier() with self.adapter.connection_named('test'): - results = self.adapter.list_relations_without_caching(schema_relation) + results = self.adapter.list_relations_without_caching( + schema_relation) assert len(results) == 1 - self.run_dbt(['run-operation', 'my_drop_schema', '--args', schema_args]) + self.run_dbt( + ['run-operation', 'my_drop_schema', '--args', schema_args]) with self.adapter.connection_named('test'): - results = self.adapter.list_relations_without_caching(schema_relation) + results = self.adapter.list_relations_without_caching( + schema_relation) assert len(results) == 0 diff --git a/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py index c1a1767ee42..a3b35a69034 100644 --- a/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py +++ b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py @@ -23,15 +23,15 @@ def test_partitions(self, expected): test_results = self.run_dbt(['test', '--vars', json.dumps(expected)]) for result in test_results: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + self.assertEqual(int(result.message), 0) @use_profile('bigquery') def test_bigquery_add_partition(self): before = {"partition_by": None, "cluster_by": None} - after = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": None} + after = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": None} self.run_changes(before, after) self.test_partitions({"expected": 1}) @@ -56,22 +56,17 @@ def test_bigquery_add_partition_hour(self): self.run_changes(before, after) self.test_partitions({"expected": 1}) - @use_profile('bigquery') - def test_bigquery_add_partition_hour(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp', 'granularity': 'day'}, "cluster_by": None} - after = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp', 'granularity': 'hour'}, "cluster_by": None} - self.run_changes(before, after) - self.test_partitions({"expected": 1}) - 
@use_profile('bigquery') def test_bigquery_remove_partition(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": None} + before = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": None} after = {"partition_by": None, "cluster_by": None} self.run_changes(before, after) @use_profile('bigquery') def test_bigquery_change_partitions(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": None} + before = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": None} after = {"partition_by": {'field': "cur_date"}, "cluster_by": None} self.run_changes(before, after) self.test_partitions({"expected": 1}) @@ -80,8 +75,10 @@ def test_bigquery_change_partitions(self): @use_profile('bigquery') def test_bigquery_change_partitions_from_int(self): - before = {"partition_by": {"field": "id", "data_type": "int64", "range": {"start": 0, "end": 10, "interval": 1}}, "cluster_by": None} - after = {"partition_by": {"field": "cur_date", "data_type": "date"}, "cluster_by": None} + before = {"partition_by": {"field": "id", "data_type": "int64", "range": { + "start": 0, "end": 10, "interval": 1}}, "cluster_by": None} + after = {"partition_by": {"field": "cur_date", + "data_type": "date"}, "cluster_by": None} self.run_changes(before, after) self.test_partitions({"expected": 1}) self.run_changes(after, before) @@ -89,24 +86,29 @@ def test_bigquery_change_partitions_from_int(self): @use_profile('bigquery') def test_bigquery_add_clustering(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": None} + before = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": None} after = {"partition_by": {'field': "cur_date"}, "cluster_by": "id"} self.run_changes(before, after) @use_profile('bigquery') def test_bigquery_remove_clustering(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": "id"} + before = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": "id"} after = {"partition_by": {'field': "cur_date"}, "cluster_by": None} self.run_changes(before, after) @use_profile('bigquery') def test_bigquery_change_clustering(self): - before = {"partition_by": {'field': 'cur_time', 'data_type': 'timestamp'}, "cluster_by": "id"} + before = {"partition_by": {'field': 'cur_time', + 'data_type': 'timestamp'}, "cluster_by": "id"} after = {"partition_by": {'field': "cur_date"}, "cluster_by": "name"} self.run_changes(before, after) @use_profile('bigquery') def test_bigquery_change_clustering_strict(self): - before = {'partition_by': {'field': 'cur_time', 'data_type': 'timestamp'}, 'cluster_by': 'id'} - after = {'partition_by': {'field': 'cur_date', 'data_type': 'date'}, 'cluster_by': 'name'} + before = {'partition_by': {'field': 'cur_time', + 'data_type': 'timestamp'}, 'cluster_by': 'id'} + after = {'partition_by': {'field': 'cur_date', + 'data_type': 'date'}, 'cluster_by': 'name'} self.run_changes(before, after) diff --git a/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py b/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py index a3cd8877200..da5316da8cc 100644 --- a/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py +++ b/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py @@ -33,4 +33,4 @@ def project_config(self): def test__bigquery_copy_table_fails(self): results = 
self.run_dbt(expect_pass=False) self.assertEqual(len(results), 2) - self.assertTrue(results[1].error) + self.assertEqual(results[1].status, 'error') diff --git a/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py b/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py index ae7781ef1a8..e005a1cfd95 100644 --- a/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py +++ b/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py @@ -41,7 +41,7 @@ def test__bigquery_date_partitioning(self): self.assertTrue(len(test_results) > 0) for result in test_results: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + # message = # of failing rows + self.assertEqual(int(result.message), 0) diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py index 690955862a3..d290df061ef 100644 --- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py +++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py @@ -34,16 +34,16 @@ def assert_nondupes_pass(self): for result in test_results: if 'dupe' in result.node.name: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'fail') self.assertFalse(result.skipped) - self.assertTrue(result.status > 0) + self.assertTrue(int(result.message) > 0) # assert that actual tests pass else: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + # message = # of failing rows + self.assertEqual(int(result.message), 0) class TestSimpleBigQueryRun(TestBaseBigQueryRun): diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index c597fd097d7..a8281871fe3 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -2974,10 +2974,9 @@ def expected_run_results(self, quote_schema=True, quote_model=False, return [ { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'model', 'build_path': Normalized( @@ -3061,14 +3060,11 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'second_model', 'build_path': Normalized( @@ -3153,14 +3149,11 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'seed', 'build_path': None, @@ -3239,14 +3232,11 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, - 'warn': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, 'node': { 'alias': 'snapshot_seed', 'build_path': None, @@ -3294,14 +3284,11 @@ def expected_run_results(self, 
quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'not_null_model_id', 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/not_null_model_id.sql'), @@ -3349,14 +3336,11 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'test_nothing_model_', 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/test_nothing_model_.sql'), @@ -3403,14 +3387,11 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'unique_model_id', 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/unique_model_id.sql'), @@ -3458,8 +3439,6 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, ] @@ -3492,10 +3471,9 @@ def expected_postgres_references_run_results(self): return [ { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'ephemeral_summary', 'build_path': Normalized( @@ -3564,14 +3542,11 @@ def expected_postgres_references_run_results(self): }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'view_summary', 'build_path': Normalized( @@ -3639,14 +3614,11 @@ def expected_postgres_references_run_results(self): }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, - 'warn': None, 'node': { 'alias': 'seed', 'build_path': None, @@ -3725,14 +3697,11 @@ def expected_postgres_references_run_results(self): }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, { - 'error': None, - 'warn': None, + 'status': 'success', + 'message': None, 'execution_time': AnyFloat(), - 'fail': None, 'node': { 'alias': 'snapshot_seed', 'build_path': None, @@ -3777,14 +3746,11 @@ def expected_postgres_references_run_results(self): }, 'thread_id': ANY, 'timing': [ANY, ANY], - 'skip': False, - 'status': None, }, ] def verify_run_results(self, expected_run_results): run_results = _read_json('./target/run_results.json') - assert 'metadata' in run_results self.verify_metadata( run_results['metadata'], 'https://schemas.getdbt.com/dbt/run-results/v1.json') diff --git a/test/integration/033_event_tracking_test/test_events.py b/test/integration/033_event_tracking_test/test_events.py index 95883ce6f2c..3376fe7ad5d 100644 --- a/test/integration/033_event_tracking_test/test_events.py +++ b/test/integration/033_event_tracking_test/test_events.py @@ -126,16 +126,16 @@ def populate( 'project_id': project_id, 'user_id': user_id, 'invocation_id': invocation_id, + 'command': command, + 'options': 
None, # TODO : Add options to compile cmd! 'version': version, - 'command': command, - 'progress': progress, 'run_type': 'regular', + 'adapter_type': adapter_type, + 'progress': progress, - 'options': None, # TODO : Add options to compile cmd! 'result_type': result_type, 'result': None, - 'adapter_type': adapter_type } }, { @@ -158,12 +158,14 @@ def run_context( index, total, status, - error=None ): timing = [] + error = False if status != 'ERROR': timing = [ANY, ANY] + else: + error = True def populate(project_id, user_id, invocation_id, version): return [{ @@ -322,8 +324,8 @@ def seed_context(project_id, user_id, invocation_id, version): 'index': 1, 'total': 1, - 'run_status': 'INSERT 1', - 'run_error': None, + 'run_status': 'SUCCESS', + 'run_error': False, 'run_skipped': False, 'timing': [ANY, ANY], @@ -415,7 +417,7 @@ def test__postgres_event_tracking_models(self): model_id='4fbacae0e1b69924b22964b457148fb8', index=1, total=2, - status='CREATE VIEW', + status='SUCCESS', materialization='view' ), self.run_context( @@ -423,7 +425,7 @@ def test__postgres_event_tracking_models(self): model_id='57994a805249953b31b738b1af7a1eeb', index=2, total=2, - status='CREATE VIEW', + status='SUCCESS', materialization='view' ), self.build_context('run', 'end', result_type='ok') @@ -687,7 +689,7 @@ def test__postgres_event_tracking_snapshot(self): model_id='820793a4def8d8a38d109a9709374849', index=1, total=1, - status='SELECT 1', + status='SUCCESS', materialization='snapshot' ), self.build_context('snapshot', 'end', result_type='ok') diff --git a/test/integration/041_presto_test/test_simple_presto_view.py b/test/integration/041_presto_test/test_simple_presto_view.py index 44190e6b122..b1b7849b7b8 100644 --- a/test/integration/041_presto_test/test_simple_presto_view.py +++ b/test/integration/041_presto_test/test_simple_presto_view.py @@ -34,16 +34,16 @@ def assert_nondupes_pass(self): for result in test_results: if 'dupe' in result.node.name: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'fail') self.assertFalse(result.skipped) - self.assertTrue(result.status > 0) + self.assertTrue(int(result.message) > 0) # assert that actual tests pass else: - self.assertIsNone(result.error) + self.assertEqual(result.status, 'pass') self.assertFalse(result.skipped) - # status = # of failing rows - self.assertEqual(result.status, 0) + # message = # of failing rows + self.assertEqual(int(result.message), 0) class TestSimplePrestoRun(TestBasePrestoRun): @@ -74,4 +74,3 @@ def test_presto_run_twice(self): results = self.run_dbt() self.assertEqual(len(results), 2) self.assert_nondupes_pass() - diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index 250079f87ae..3d7012f5fc1 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -76,7 +76,8 @@ def _set_updated_at_to(self, delta): quoted_columns = ','.join( self.adapter.quote(c) if self.adapter_type != 'bigquery' else c for c in - ('favorite_color', 'id', 'first_name', 'email', 'ip_address', 'updated_at') + ('favorite_color', 'id', 'first_name', + 'email', 'ip_address', 'updated_at') ) self.run_sql( raw_sql, @@ -88,7 +89,8 @@ def _set_updated_at_to(self, delta): 'quoted_columns': quoted_columns, } ) - self.last_inserted_time = insert_time.strftime("%Y-%m-%dT%H:%M:%S+00:00") + self.last_inserted_time = insert_time.strftime( + "%Y-%m-%dT%H:%M:%S+00:00") class TestSources(SuccessfulSourcesTest): @@ -254,7 +256,6 @@ def 
_assert_freshness_results(self, path, state): key = key.upper() assert data['metadata']['env'] == {key: 'value'} - last_inserted_time = self.last_inserted_time self.assertEqual(len(data['results']), 1) @@ -285,8 +286,6 @@ def _run_source_freshness(self): ) self.assertEqual(len(results), 1) self.assertEqual(results[0].status, 'error') - self.assertTrue(results[0].fail) - self.assertIsNone(results[0].error) self._assert_freshness_results('target/error_source.json', 'error') self._set_updated_at_to(timedelta(hours=-12)) @@ -296,8 +295,6 @@ def _run_source_freshness(self): ) self.assertEqual(len(results), 1) self.assertEqual(results[0].status, 'warn') - self.assertFalse(results[0].fail) - self.assertIsNone(results[0].error) self._assert_freshness_results('target/warn_source.json', 'warn') self._set_updated_at_to(timedelta(hours=-2)) @@ -307,8 +304,6 @@ def _run_source_freshness(self): ) self.assertEqual(len(results), 1) self.assertEqual(results[0].status, 'pass') - self.assertFalse(results[0].fail) - self.assertIsNone(results[0].error) self._assert_freshness_results('target/pass_source.json', 'pass') @use_profile('postgres') @@ -340,9 +335,7 @@ def test_postgres_error(self): expect_pass=False ) self.assertEqual(len(results), 1) - self.assertEqual(results[0].status, 'error') - self.assertFalse(results[0].fail) - self.assertIsNotNone(results[0].error) + self.assertEqual(results[0].status, 'runtime error') class TestSourceFreshnessFilter(SuccessfulSourcesTest): @@ -350,31 +343,22 @@ class TestSourceFreshnessFilter(SuccessfulSourcesTest): def models(self): return 'filtered_models' - def assert_source_freshness_passed(self, results): - self.assertEqual(len(results), 1) - self.assertEqual(results[0].status, 'pass') - self.assertFalse(results[0].fail) - self.assertIsNone(results[0].error) - - def assert_source_freshness_failed(self, results): - self.assertEqual(len(results), 1) - self.assertEqual(results[0].status, 'error') - self.assertTrue(results[0].fail) - self.assertIsNone(results[0].error) - @use_profile('postgres') def test_postgres_all_records(self): # all records are filtered out - self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=False) + self.run_dbt_with_vars( + ['source', 'snapshot-freshness'], expect_pass=False) # we should insert a record with #101 that's fresh, but will still fail # because the filter excludes it self._set_updated_at_to(timedelta(hours=-2)) - self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=False) + self.run_dbt_with_vars( + ['source', 'snapshot-freshness'], expect_pass=False) # we should now insert a record with #102 that's fresh, and the filter # includes it self._set_updated_at_to(timedelta(hours=-2)) - results = self.run_dbt_with_vars(['source', 'snapshot-freshness'], expect_pass=True) + results = self.run_dbt_with_vars( + ['source', 'snapshot-freshness'], expect_pass=True) class TestMalformedSources(BaseSourcesTest): diff --git a/test/integration/045_test_severity_tests/test_severity.py b/test/integration/045_test_severity_tests/test_severity.py index 66f2a2dbe9e..8dadf35cf91 100644 --- a/test/integration/045_test_severity_tests/test_severity.py +++ b/test/integration/045_test_severity_tests/test_severity.py @@ -1,5 +1,6 @@ from test.integration.base import DBTIntegrationTest, use_profile + class TestSeverity(DBTIntegrationTest): @property def schema(self): @@ -29,67 +30,64 @@ def run_dbt_with_vars(self, cmd, strict_var, *args, **kwargs): def test_postgres_severity_warnings(self): self.run_dbt_with_vars(['seed'], 'false', 
strict=False) self.run_dbt_with_vars(['run'], 'false', strict=False) - results = self.run_dbt_with_vars(['test', '--schema'], 'false', strict=False) + results = self.run_dbt_with_vars( + ['test', '--schema'], 'false', strict=False) self.assertEqual(len(results), 2) - self.assertFalse(results[0].fail) - self.assertTrue(results[0].warn) - self.assertEqual(results[0].status, 2) - self.assertFalse(results[1].fail) - self.assertTrue(results[1].warn) - self.assertEqual(results[1].status, 2) + self.assertEqual(results[0].status, 'warn') + self.assertEqual(results[0].message, 2) + self.assertEqual(results[1].status, 'warn') + self.assertEqual(results[1].message, 2) @use_profile('postgres') def test_postgres_severity_rendered_errors(self): self.run_dbt_with_vars(['seed'], 'false', strict=False) self.run_dbt_with_vars(['run'], 'false', strict=False) - results = self.run_dbt_with_vars(['test', '--schema'], 'true', strict=False, expect_pass=False) + results = self.run_dbt_with_vars( + ['test', '--schema'], 'true', strict=False, expect_pass=False) self.assertEqual(len(results), 2) - self.assertTrue(results[0].fail) - self.assertFalse(results[0].warn) - self.assertEqual(results[0].status, 2) - self.assertTrue(results[1].fail) - self.assertFalse(results[1].warn) - self.assertEqual(results[1].status, 2) + self.assertEqual(results[0].status, 'fail') + self.assertEqual(results[0].message, 2) + self.assertEqual(results[1].status, 'fail') + self.assertEqual(results[1].message, 2) @use_profile('postgres') def test_postgres_severity_warnings_strict(self): self.run_dbt_with_vars(['seed'], 'false', strict=False) self.run_dbt_with_vars(['run'], 'false', strict=False) - results = self.run_dbt_with_vars(['test', '--schema'], 'false', expect_pass=False) + results = self.run_dbt_with_vars( + ['test', '--schema'], 'false', expect_pass=False) self.assertEqual(len(results), 2) - self.assertTrue(results[0].fail) - self.assertFalse(results[0].warn) - self.assertEqual(results[0].status, 2) - self.assertTrue(results[1].fail) - self.assertFalse(results[1].warn) - self.assertEqual(results[1].status, 2) + self.assertEqual(results[0].status, 'fail') + self.assertEqual(results[0].message, 2) + self.assertEqual(results[1].status, 'fail') + self.assertEqual(results[1].message, 2) @use_profile('postgres') def test_postgres_data_severity_warnings(self): self.run_dbt_with_vars(['seed'], 'false', strict=False) self.run_dbt_with_vars(['run'], 'false', strict=False) - results = self.run_dbt_with_vars(['test', '--data'], 'false', strict=False) + results = self.run_dbt_with_vars( + ['test', '--data'], 'false', strict=False) self.assertEqual(len(results), 1) - self.assertFalse(results[0].fail) - self.assertTrue(results[0].warn) - self.assertEqual(results[0].status, 2) + self.assertEqual(results[0].status, 'warn') + self.assertEqual(results[0].message, 2) @use_profile('postgres') def test_postgres_data_severity_rendered_errors(self): self.run_dbt_with_vars(['seed'], 'false', strict=False) self.run_dbt_with_vars(['run'], 'false', strict=False) - results = self.run_dbt_with_vars(['test', '--data'], 'true', strict=False, expect_pass=False) + results = self.run_dbt_with_vars( + ['test', '--data'], 'true', strict=False, expect_pass=False) self.assertEqual(len(results), 1) - self.assertTrue(results[0].fail) - self.assertFalse(results[0].warn) - self.assertEqual(results[0].status, 2) + self.assertEqual(results[0].status, 'fail') + self.assertEqual(results[0].message, 2) @use_profile('postgres') def test_postgres_data_severity_warnings_strict(self): 
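The statuses expected throughout these severity tests follow from the reworked test task shown earlier in the series (core/dbt/task/test.py): zero failing rows passes, and otherwise the configured severity or the --warn-error flag decides between a failure and a warning. A condensed sketch of that decision, not part of the patch; warn_error stands in for flags.WARN_ERROR:

def pick_test_status(failed_rows, severity, warn_error=False):
    # Mirrors the branch order in the refactored execute(): pass on zero
    # failing rows, otherwise ERROR severity (or --warn-error) fails and
    # anything else warns.
    if failed_rows == 0:
        return 'pass'
    if severity.upper() == 'ERROR' or warn_error:
        return 'fail'
    return 'warn'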
        self.run_dbt_with_vars(['seed'], 'false', strict=False)
        self.run_dbt_with_vars(['run'], 'false', strict=False)
-        results = self.run_dbt_with_vars(['test', '--data'], 'false', expect_pass=False)
+        results = self.run_dbt_with_vars(
+            ['test', '--data'], 'false', expect_pass=False)
         self.assertEqual(len(results), 1)
-        self.assertTrue(results[0].fail)
-        self.assertFalse(results[0].warn)
-        self.assertEqual(results[0].status, 2)
+        self.assertEqual(results[0].status, 'fail')
+        self.assertEqual(results[0].message, 2)

From 608db5b982b34163fc5991b9e59c463869310f1c Mon Sep 17 00:00:00 2001
From: Kyle Wigley
Date: Thu, 10 Dec 2020 10:25:19 -0500
Subject: [PATCH 5/9] code cleanup + swap `node` with `unique_id`

---
 core/dbt/contracts/graph/model_config.py | 4 +-
 core/dbt/contracts/results.py | 45 +-
 core/dbt/contracts/rpc.py | 5 +-
 core/dbt/task/compile.py | 4 +-
 core/dbt/task/freshness.py | 3 +-
 core/dbt/task/generate.py | 1 -
 core/dbt/task/printer.py | 7 +-
 core/dbt/task/run.py | 4 +-
 .../test_docs_generate.py | 779 +-----------------
 9 files changed, 71 insertions(+), 781 deletions(-)

diff --git a/core/dbt/contracts/graph/model_config.py b/core/dbt/contracts/graph/model_config.py
index cca3c9f3ef0..f8b5352049a 100644
--- a/core/dbt/contracts/graph/model_config.py
+++ b/core/dbt/contracts/graph/model_config.py
@@ -2,8 +2,8 @@
 from enum import Enum
 from itertools import chain
 from typing import (
-    Any, Hashable, List, Optional, Dict, MutableMapping, Union, Type, NewType,
-    Tuple, TypeVar, Callable, cast
+    Any, List, Optional, Dict, MutableMapping, Union, Type, NewType, Tuple,
+    TypeVar, Callable, cast, Hashable
 )

 # TODO: patch+upgrade hologram to avoid this jsonschema import
diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py
index 0cf15ce1a49..a7b922a012e 100644
--- a/core/dbt/contracts/results.py
+++ b/core/dbt/contracts/results.py
@@ -78,7 +78,7 @@ class TestStatus(StrEnum):
     Warn = NodeStatus.Warn


-class FreshnessStatus(StrEnum):  # maybe this should be the same as test status
+class FreshnessStatus(StrEnum):
     Pass = NodeStatus.Pass
     Warn = NodeStatus.Warn
     Error = NodeStatus.Error
@@ -127,7 +127,7 @@ def to_dict(self, *args, **kwargs):

 @dataclass
 class ExecutionResult(JsonSchemaMixin):
-    results: Sequence[NodeResult]
+    results: Sequence[BaseResult]
     elapsed_time: float

     def __len__(self):
@@ -150,19 +150,38 @@ class RunResultsMetadata(BaseArtifactMetadata):
     )


+@dataclass
+class RunResultOutput(BaseResult):
+    unique_id: str
+
+
+def process_run_result(result: Union[RunResult, RunResultOutput]) -> RunResultOutput:  # noqa
+    if isinstance(result, RunResultOutput):
+        return result
+
+    return RunResultOutput(
+        unique_id=result.node.unique_id,
+        status=result.status,
+        timing=result.timing,
+        thread_id=result.thread_id,
+        execution_time=result.execution_time,
+        message=result.message
+    )
+
+
 @dataclass
 @schema_version('run-results', 1)
 class RunResultsArtifact(
     ExecutionResult,
-    ArtifactMixin,
+    ArtifactMixin
 ):
-    results: Sequence[RunResult]
+    results: Sequence[RunResultOutput]
     args: Dict[str, Any] = field(default_factory=dict)

     @classmethod
     def from_node_results(
         cls,
-        results: Sequence[RunResult],
+        results: Union[Sequence[RunResult], Sequence[RunResultOutput]],
         elapsed_time: float,
         generated_at: datetime,
         args: Dict,
@@ -171,9 +190,10 @@ def from_node_results(
             dbt_schema_version=str(cls.dbt_schema_version),
             generated_at=generated_at,
         )
+        processed_results = [process_run_result(result) for result in results]
         return cls(
             metadata=meta,
-            results=results,
+            results=processed_results,
elapsed_time=elapsed_time, args=args ) @@ -236,8 +256,8 @@ class FreshnessErrorEnum(StrEnum): @dataclass class SourceFreshnessRuntimeError(JsonSchemaMixin): unique_id: str - error: Union[str, int] # TODO(kw) this is to fix mypy - state: FreshnessErrorEnum + error: Optional[Union[str, int]] + status: FreshnessErrorEnum @dataclass @@ -246,7 +266,7 @@ class SourceFreshnessOutput(JsonSchemaMixin): max_loaded_at: datetime snapshotted_at: datetime max_loaded_at_time_ago_in_s: float - state: FreshnessStatus + status: FreshnessStatus criteria: FreshnessThreshold @@ -263,13 +283,12 @@ class PartialSourceFreshnessResult(PartialNodeResult): def process_freshness_result( result: FreshnessNodeResult ) -> FreshnessNodeOutput: - # TODO(kw) source freshness refactor unique_id = result.node.unique_id if result.status == FreshnessStatus.RuntimeErr: return SourceFreshnessRuntimeError( unique_id=unique_id, - error=result.message or "", - state=FreshnessErrorEnum.runtime_error, + error=result.message, + status=FreshnessErrorEnum.runtime_error, ) # we know that this must be a SourceFreshnessResult @@ -291,7 +310,7 @@ def process_freshness_result( max_loaded_at=result.max_loaded_at, snapshotted_at=result.snapshotted_at, max_loaded_at_time_ago_in_s=result.age, - state=result.status, + status=result.status, criteria=criteria, ) diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index 66282d1684a..2601bf2da28 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -11,7 +11,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.results import ( - TimingInfo, + RunResultOutput, TimingInfo, CatalogArtifact, CatalogResults, ExecutionResult, @@ -21,6 +21,7 @@ RunOperationResultsArtifact, RunResult, RunResultsArtifact, + process_run_result, ) from dbt.contracts.util import VersionedSchema, schema_version from dbt.exceptions import InternalException @@ -225,8 +226,8 @@ def error(self): @dataclass @schema_version('remote-execution-result', 1) class RemoteExecutionResult(ExecutionResult, RemoteResult): + results: Sequence[RunResultOutput] args: Dict[str, Any] = field(default_factory=dict) - results: Sequence[RunResult] generated_at: datetime = field(default_factory=datetime.utcnow) def write(self, path: str): diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index cd4ec3d726a..a5ec30bd77f 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -1,3 +1,4 @@ +import threading from .runnable import GraphRunnableTask from .base import BaseRunner @@ -16,12 +17,11 @@ def after_execute(self, result): pass def execute(self, compiled_node, manifest): - # TODO(kw) need to think about what to return here return RunModelResult( node=compiled_node, status=RunStatus.Success, timing=[], - thread_id="asdf", + thread_id=threading.current_thread().name, execution_time=0, message=None, ) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 394d36733b6..0a6c40c40b3 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -95,14 +95,13 @@ def execute(self, compiled_node, manifest): status = compiled_node.freshness.status(freshness['age']) - # TODO(kw) more cleanup :) return SourceFreshnessResult( node=compiled_node, status=status, thread_id=threading.current_thread().name, timing=[], execution_time=0, - message="", + message=None, **freshness ) diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 1358a82559a..3a043ac4ff0 100644 --- 
a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -211,7 +211,6 @@ def run(self) -> CatalogArtifact: compile_results = None if self.args.compile: compile_results = CompileTask.run(self) - # TODO(kw) not sure this is the right logic if any(r.status == NodeStatus.Error for r in compile_results): print_timestamped_line( 'compile failed, cannot generate docs' diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 989e2c83e19..f1d05852da6 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -348,8 +348,11 @@ def print_end_of_run_summary( def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: errors, warnings = [], [] for r in results: - if (r.status in (NodeStatus.Error, NodeStatus.Fail) or - (r.status == NodeStatus.Skipped and r.message is not None)): + if r.status in (NodeStatus.Error, NodeStatus.Fail): + errors.append(r) + elif r.status == NodeStatus.Skipped and r.message is not None: + # this means we skipped a node because of an issue upstream, + # so include it as an error errors.append(r) elif r.status == NodeStatus.Warn: warnings.append(r) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index c90233cd985..93c5c9f74d9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,4 +1,5 @@ import functools +import threading import time from typing import List, Dict, Any, Iterable, Set, Tuple, Optional, AbstractSet @@ -187,12 +188,11 @@ def after_execute(self, result): def _build_run_model_result(self, model, context): result = context['load_result']('main') - # TODO(kw) clean this up return RunModelResult( node=model, status=RunStatus.Success, timing=[], - thread_id="asdf", + thread_id=threading.current_thread().name, execution_time=0, message=result.status, ) diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index a8281871fe3..626fe171e4d 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -2928,136 +2928,17 @@ def _quote(self, value): quote_char = '`' if self.adapter_type == 'bigquery' else '"' return '{0}{1}{0}'.format(quote_char, value) - def expected_run_results(self, quote_schema=True, quote_model=False, - model_database=None): + def expected_run_results(self): """ The expected results of this run. 
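With nodes swapped out for their unique_id, each entry in run_results.json now carries only status, message, timing, thread_id, execution_time, and unique_id, so consumers join back to the manifest for node details. A minimal reader sketch, not part of the patch, assuming the default ./target path and the keys shown in these expected results:

import json
from collections import Counter

def summarize_run_results(path='./target/run_results.json'):
    # Tally statuses and collect failing nodes, keyed by unique_id, from
    # the slimmed-down artifact written after this change.
    with open(path) as fp:
        artifact = json.load(fp)
    statuses = Counter(r['status'] for r in artifact['results'])
    failures = [
        (r['unique_id'], r['message'])
        for r in artifact['results']
        if r['status'] in ('error', 'fail')
    ]
    return statuses, failures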
""" - models_path = self.dir('models') - model_sql_path = os.path.join(models_path, 'model.sql') - second_model_sql_path = os.path.join(models_path, 'second_model.sql') - model_schema_yml_path = os.path.join(models_path, 'schema.yml') - seed_schema_yml_path = os.path.join(self.dir('seed'), 'schema.yml') - - if model_database is None: - model_database = self.alternative_database - - model_config = self.rendered_model_config(database=model_database) - second_model_config = self.rendered_model_config( - schema=self.alternate_schema[-4:]) - unrendered_model_config = self.unrendered_model_config( - database=model_database, materialized='view') - unrendered_second_model_config = self.unrendered_model_config( - schema=self.alternate_schema[-4:], materialized='view') - schema = self.unique_schema() - - # we are selecting from the seed, which is always in the default db - quote_database = self.adapter_type != 'snowflake' - compiled_database = (self._quote(self.default_database) - if quote_database else self.default_database) - compiled_schema = self._quote(schema) if quote_schema else schema - compiled_seed = self._quote('seed') if quote_model else 'seed' - relation_name_format = self._relation_name_format( - quote_database, quote_schema, quote_model - ) - - if self.adapter_type == 'bigquery': - compiled_sql = '\n\nselect * from `{}`.`{}`.seed'.format( - self.default_database, schema - ) - else: - compiled_sql = '\n\nselect * from {}.{}.{}'.format( - compiled_database, compiled_schema, compiled_seed - ) - seed_path = self.dir('seed/seed.csv') - snapshot_path = self.dir('snapshot/snapshot_seed.sql') return [ { 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'model', - 'build_path': Normalized( - 'target/compiled/test/models/model.sql' - ), - 'checksum': self._checksum_file(model_sql_path), - 'columns': { - 'id': { - 'description': 'The user ID number', - 'name': 'id', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'first_name': { - 'description': "The user's first name", - 'name': 'first_name', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'email': { - 'description': "The user's email", - 'name': 'email', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ip_address': { - 'description': "The user's IP address", - 'name': 'ip_address', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'updated_at': { - 'description': "The last time this user's email was updated", - 'name': 'updated_at', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - } - }, - 'compiled': True, - 'compiled_sql': compiled_sql, - 'config': model_config, - 'sources': [], - 'depends_on': { - 'macros': [], - 'nodes': ['seed.test.seed'] - }, - 'deferred': False, - 'description': 'The test model', - 'docs': {'show': False}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'model'], - 'meta': {}, - 'name': 'model', - 'original_file_path': model_sql_path, - 'package_name': 'test', - 'patch_path': model_schema_yml_path, - 'path': 'model.sql', - 'raw_sql': LineIndifferent(_read_file(model_sql_path).rstrip('\r\n')), - 'refs': [['seed']], - 'relation_name': relation_name_format.format( - model_database, schema, 'model' - ), - 'resource_type': 'model', - 'root_path': self.test_root_realpath, - 'schema': schema, - 'database': model_database, - 'tags': [], - 'unique_id': 'model.test.model', - 'unrendered_config': unrendered_model_config, - }, + 'unique_id': 
'model.test.model', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3065,88 +2946,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'second_model', - 'build_path': Normalized( - 'target/compiled/test/models/second_model.sql' - ), - 'checksum': self._checksum_file(second_model_sql_path), - 'columns': { - 'id': { - 'description': 'The user ID number', - 'name': 'id', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'first_name': { - 'description': "The user's first name", - 'name': 'first_name', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'email': { - 'description': "The user's email", - 'name': 'email', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ip_address': { - 'description': "The user's IP address", - 'name': 'ip_address', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'updated_at': { - 'description': "The last time this user's email was updated", - 'name': 'updated_at', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - } - }, - 'compiled': True, - 'compiled_sql': compiled_sql, - 'config': second_model_config, - 'sources': [], - 'depends_on': { - 'macros': [], - 'nodes': ['seed.test.seed'] - }, - 'deferred': False, - 'description': 'The second test model', - 'docs': {'show': False}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'second_model'], - 'meta': {}, - 'name': 'second_model', - 'original_file_path': second_model_sql_path, - 'package_name': 'test', - 'patch_path': model_schema_yml_path, - 'path': 'second_model.sql', - 'raw_sql': LineIndifferent(_read_file(second_model_sql_path).rstrip('\r\n')), - 'refs': [['seed']], - 'relation_name': relation_name_format.format( - self.default_database, self.alternate_schema, - 'second_model' - ), - 'resource_type': 'model', - 'root_path': self.test_root_realpath, - 'schema': self.alternate_schema, - 'database': self.default_database, - 'tags': [], - 'unique_id': 'model.test.second_model', - 'unrendered_config': unrendered_second_model_config, - }, + 'unique_id': 'model.test.second_model', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3154,82 +2954,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'seed', - 'build_path': None, - 'checksum': self._checksum_file(seed_path), - 'columns': { - 'id': { - 'description': 'The user ID number', - 'name': 'id', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'first_name': { - 'description': "The user's first name", - 'name': 'first_name', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'email': { - 'description': "The user's email", - 'name': 'email', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ip_address': { - 'description': "The user's IP address", - 'name': 'ip_address', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'updated_at': { - 'description': "The last time this user's email was updated", - 'name': 'updated_at', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - } - }, - 'compiled': True, - 'compiled_sql': '', - 'config': self.rendered_seed_config(), - 'sources': [], - 'depends_on': {'macros': [], 'nodes': []}, - 'deferred': False, - 'description': 'The test seed', - 'docs': {'show': True}, - 
'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'seed'], - 'meta': {}, - 'name': 'seed', - 'original_file_path': seed_path, - 'package_name': 'test', - 'patch_path': seed_schema_yml_path, - 'path': 'seed.csv', - 'raw_sql': '', - 'refs': [], - 'relation_name': relation_name_format.format( - self.default_database, schema, 'seed' - ), - 'resource_type': 'seed', - 'root_path': self.test_root_realpath, - 'schema': schema, - 'database': self.default_database, - 'tags': [], - 'unique_id': 'seed.test.seed', - 'unrendered_config': self.unrendered_seed_config(), - }, + 'unique_id': 'seed.test.seed', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3237,51 +2962,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'snapshot_seed', - 'build_path': None, - 'checksum': self._checksum_file(snapshot_path), - 'columns': {}, - 'compiled': True, - 'compiled_sql': compiled_sql, - 'config': self.rendered_snapshot_config( - target_schema=self.alternate_schema - ), - 'database': self.default_database, - 'deferred': False, - 'depends_on': { - 'macros': [], - 'nodes': ['seed.test.seed'], - }, - 'description': '', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'snapshot_seed', 'snapshot_seed'], - 'meta': {}, - 'name': 'snapshot_seed', - 'original_file_path': snapshot_path, - 'package_name': 'test', - 'patch_path': None, - 'path': Normalized('snapshot_seed.sql'), - 'raw_sql': LineIndifferent( - _read_file(snapshot_path) - .replace('{% snapshot snapshot_seed %}', '') - .replace('{% endsnapshot %}', '')), - 'refs': [['seed']], - 'relation_name': relation_name_format.format( - self.default_database, self.alternate_schema, - 'snapshot_seed'), - 'resource_type': 'snapshot', - 'root_path': self.test_root_realpath, - 'schema': self.alternate_schema, - 'sources': [], - 'tags': [], - 'unique_id': 'snapshot.test.snapshot_seed', - 'unrendered_config': self.unrendered_snapshot_config( - target_schema=self.alternate_schema - ), - }, + 'unique_id': 'snapshot.test.snapshot_seed', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3289,51 +2970,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'not_null_model_id', - 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/not_null_model_id.sql'), - 'checksum': {'name': 'none', 'checksum': ''}, - 'column_name': 'id', - 'columns': {}, - 'compiled': True, - 'compiled_sql': AnyStringWith('id is null'), - 'config': self.rendered_tst_config(), - 'sources': [], - 'depends_on': { - 'macros': ['macro.dbt.test_not_null'], - 'nodes': ['model.test.model'], - }, - 'deferred': False, - 'description': '', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'schema_test', 'not_null_model_id'], - 'meta': {}, - 'name': 'not_null_model_id', - 'original_file_path': model_schema_yml_path, - 'package_name': 'test', - 'patch_path': None, - 'path': Normalized('schema_test/not_null_model_id.sql'), - 'raw_sql': "{{ config(severity='ERROR') }}{{ test_not_null(**_dbt_schema_test_kwargs) }}", - 'refs': [['model']], - 'relation_name': None, - 'resource_type': 'test', - 'root_path': self.test_root_realpath, - 'schema': schema, - 'database': self.default_database, - 'tags': ['schema'], - 'unique_id': 'test.test.not_null_model_id', - 'test_metadata': { - 'namespace': 
None, - 'name': 'not_null', - 'kwargs': { - 'column_name': 'id', - 'model': "{{ ref('model') }}", - }, - }, - 'unrendered_config': self.unrendered_tst_config(), - }, + 'unique_id': 'test.test.not_null_model_id', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3341,205 +2978,27 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'test_nothing_model_', - 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/test_nothing_model_.sql'), - 'checksum': {'name': 'none', 'checksum': ''}, - 'column_name': None, - 'columns': {}, - 'compiled': True, - 'compiled_sql': AnyStringWith('select 0'), - 'config': self.rendered_tst_config(), - 'database': self.default_database, - 'depends_on': { - 'macros': ['macro.test.test_nothing'], - 'nodes': ['model.test.model'], - }, - 'deferred': False, - 'description': '', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'schema_test', 'test_nothing_model_'], - 'meta': {}, - 'name': 'test_nothing_model_', - 'original_file_path': model_schema_yml_path, - 'package_name': 'test', - 'patch_path': None, - 'path': Normalized('schema_test/test_nothing_model_.sql'), - 'raw_sql': "{{ config(severity='ERROR') }}{{ test.test_nothing(**_dbt_schema_test_kwargs) }}", - 'refs': [['model']], - 'relation_name': None, - 'resource_type': 'test', - 'root_path': self.test_root_realpath, - 'schema': schema, - 'sources': [], - 'tags': ['schema'], - 'unique_id': 'test.test.test_nothing_model_', - 'test_metadata': { - 'namespace': 'test', - 'name': 'nothing', - 'kwargs': { - 'model': "{{ ref('model') }}", - }, - }, - 'unrendered_config': self.unrendered_tst_config(), - }, + 'unique_id': 'test.test.test_nothing_model_', 'thread_id': ANY, - 'timing': [ANY, ANY], - }, - { - 'status': 'success', - 'message': None, - 'execution_time': AnyFloat(), - 'node': { - 'alias': 'unique_model_id', - 'build_path': Normalized('target/compiled/test/models/schema.yml/schema_test/unique_model_id.sql'), - 'checksum': {'name': 'none', 'checksum': ''}, - 'column_name': 'id', - 'columns': {}, - 'compiled': True, - 'compiled_sql': AnyStringWith('count(*)'), - 'config': self.rendered_tst_config(), - 'database': self.default_database, - 'depends_on': { - 'macros': ['macro.dbt.test_unique'], - 'nodes': ['model.test.model'], - }, - 'deferred': False, - 'description': '', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'schema_test', 'unique_model_id'], - 'meta': {}, - 'name': 'unique_model_id', - 'original_file_path': model_schema_yml_path, - 'package_name': 'test', - 'patch_path': None, - 'path': Normalized('schema_test/unique_model_id.sql'), - 'raw_sql': "{{ config(severity='ERROR') }}{{ test_unique(**_dbt_schema_test_kwargs) }}", - 'refs': [['model']], - 'relation_name': None, - 'resource_type': 'test', - 'root_path': self.test_root_realpath, - 'schema': schema, - 'sources': [], - 'tags': ['schema'], - 'unique_id': 'test.test.unique_model_id', - 'test_metadata': { - 'namespace': None, - 'name': 'unique', - 'kwargs': { - 'column_name': 'id', - 'model': "{{ ref('model') }}", - }, - }, - 'unrendered_config': self.unrendered_tst_config(), - }, + 'timing': [ANY, ANY], + }, + { + 'status': 'success', + 'message': None, + 'execution_time': AnyFloat(), + 'unique_id': 'test.test.unique_model_id', 'thread_id': ANY, 'timing': [ANY, ANY], }, ] def expected_postgres_references_run_results(self): - 
my_schema_name = self.unique_schema() - - cte_sql = ( - ' __dbt__CTE__ephemeral_copy as (\n\n\nselect * from {}."{}"."seed"\n)' - ).format(self.default_database, my_schema_name) - - ephemeral_injected_sql = ( - '\n\nwith{}select first_name, count(*) as ct from ' - '__dbt__CTE__ephemeral_copy\ngroup by first_name\n' - 'order by first_name asc' - ).format(cte_sql) - - view_compiled_sql = ( - '\n\nselect first_name, ct from "{}"."{}".ephemeral_summary\n' - 'order by ct asc' - ).format(self.default_database, my_schema_name) - - snapshot_compiled_sql = ( - '\n\nselect * from "{}"."{}".seed' - ).format(self.default_database, my_schema_name) - - ephemeral_summary_path = self.dir('ref_models/ephemeral_summary.sql') - view_summary_path = self.dir('ref_models/view_summary.sql') - seed_path = self.dir('seed/seed.csv') - snapshot_path = self.dir('snapshot/snapshot_seed.sql') - return [ { 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'ephemeral_summary', - 'build_path': Normalized( - 'target/compiled/test/ref_models/ephemeral_summary.sql' - ), - 'checksum': self._checksum_file(ephemeral_summary_path), - 'columns': { - 'first_name': { - 'description': 'The first name being summarized', - 'name': 'first_name', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ct': { - 'description': 'The number of instances of the first name', - 'name': 'ct', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - }, - 'compiled': True, - 'compiled_sql': ephemeral_injected_sql, - 'config': self.rendered_model_config(materialized='table'), - 'sources': [], - 'depends_on': { - 'nodes': ['model.test.ephemeral_copy'], - 'macros': [] - }, - 'deferred': False, - 'description': ( - 'A summmary table of the ephemeral copy of the seed data' - ), - 'docs': {'show': True}, - 'extra_ctes': [ - {'id': 'model.test.ephemeral_copy', 'sql': cte_sql}, - ], - 'extra_ctes_injected': True, - 'fqn': ['test', 'ephemeral_summary'], - 'meta': {}, - 'name': 'ephemeral_summary', - 'original_file_path': ephemeral_summary_path, - 'package_name': 'test', - 'patch_path': self.dir('ref_models/schema.yml'), - 'path': 'ephemeral_summary.sql', - 'raw_sql': LineIndifferent( - '{{\n config(\n materialized = "table"\n )\n}}\n' - '\nselect first_name, count(*) as ct from ' - "{{ref('ephemeral_copy')}}\ngroup by first_name\n" - 'order by first_name asc' - ), - 'refs': [['ephemeral_copy']], - 'relation_name': '"{0}"."{1}".ephemeral_summary'.format( - self.default_database, my_schema_name - ), - 'resource_type': 'model', - 'root_path': self.test_root_realpath, - 'schema': my_schema_name, - 'database': self.default_database, - 'tags': [], - 'unique_id': 'model.test.ephemeral_summary', - 'unrendered_config': self.unrendered_model_config(materialized='table'), - }, + 'unique_id': 'model.test.ephemeral_summary', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3547,71 +3006,7 @@ def expected_postgres_references_run_results(self): 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'view_summary', - 'build_path': Normalized( - 'target/compiled/test/ref_models/view_summary.sql' - ), - 'alias': 'view_summary', - 'checksum': self._checksum_file(view_summary_path), - 'columns': { - 'first_name': { - 'description': 'The first name being summarized', - 'name': 'first_name', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ct': { - 'description': 'The number of instances of the first name', - 'name': 'ct', - 'data_type': None, - 
'meta': {}, - 'quote': None, - 'tags': [], - }, - }, - 'compiled': True, - 'compiled_sql': view_compiled_sql, - 'config': self.rendered_model_config(), - 'sources': [], - 'depends_on': { - 'nodes': ['model.test.ephemeral_summary'], - 'macros': [] - }, - 'deferred': False, - 'description': ( - 'A view of the summary of the ephemeral copy of the ' - 'seed data' - ), - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'view_summary'], - 'meta': {}, - 'name': 'view_summary', - 'original_file_path': view_summary_path, - 'package_name': 'test', - 'patch_path': self.dir('ref_models/schema.yml'), - 'path': 'view_summary.sql', - 'raw_sql': LineIndifferent( - '{{\n config(\n materialized = "view"\n )\n}}\n\n' - 'select first_name, ct from ' - "{{ref('ephemeral_summary')}}\norder by ct asc" - ), - 'refs': [['ephemeral_summary']], - 'relation_name': '"{0}"."{1}".view_summary'.format( - self.default_database, my_schema_name - ), - 'resource_type': 'model', - 'root_path': self.test_root_realpath, - 'schema': my_schema_name, - 'database': self.default_database, - 'tags': [], - 'unique_id': 'model.test.view_summary', - 'unrendered_config': self.unrendered_model_config(materialized='view'), - }, + 'unique_id': 'model.test.view_summary', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3619,82 +3014,7 @@ def expected_postgres_references_run_results(self): 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'seed', - 'build_path': None, - 'checksum': self._checksum_file(seed_path), - 'columns': { - 'id': { - 'name': 'id', - 'description': 'The user ID number', - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'first_name': { - 'name': 'first_name', - 'description': "The user's first name", - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'email': { - 'name': 'email', - 'description': "The user's email", - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'ip_address': { - 'name': 'ip_address', - 'description': "The user's IP address", - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - 'updated_at': { - 'name': 'updated_at', - 'description': "The last time this user's email was updated", - 'data_type': None, - 'meta': {}, - 'quote': None, - 'tags': [], - }, - }, - 'compiled': True, - 'compiled_sql': '', - 'config': self.rendered_seed_config(), - 'sources': [], - 'depends_on': {'macros': [], 'nodes': []}, - 'deferred': False, - 'description': 'The test seed', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'seed'], - 'meta': {}, - 'name': 'seed', - 'original_file_path': seed_path, - 'package_name': 'test', - 'patch_path': self.dir('seed/schema.yml'), - 'path': 'seed.csv', - 'raw_sql': '', - 'refs': [], - 'relation_name': '"{0}"."{1}".seed'.format( - self.default_database, my_schema_name - ), - 'resource_type': 'seed', - 'root_path': self.test_root_realpath, - 'schema': my_schema_name, - 'database': self.default_database, - 'tags': [], - 'unique_id': 'seed.test.seed', - 'unrendered_config': self.unrendered_seed_config(), - }, + 'unique_id': 'seed.test.seed', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3702,48 +3022,7 @@ def expected_postgres_references_run_results(self): 'status': 'success', 'message': None, 'execution_time': AnyFloat(), - 'node': { - 'alias': 'snapshot_seed', - 'build_path': None, - 'checksum': self._checksum_file(snapshot_path), - 'columns': {}, - 'compiled': True, - 
'compiled_sql': snapshot_compiled_sql, - 'config': self.rendered_snapshot_config( - target_schema=self.alternate_schema - ), - 'database': self.default_database, - 'deferred': False, - 'depends_on': { - 'macros': [], - 'nodes': ['seed.test.seed'], - }, - 'description': '', - 'docs': {'show': True}, - 'extra_ctes': [], - 'extra_ctes_injected': True, - 'fqn': ['test', 'snapshot_seed', 'snapshot_seed'], - 'meta': {}, - 'name': 'snapshot_seed', - 'original_file_path': snapshot_path, - 'package_name': 'test', - 'patch_path': None, - 'path': Normalized('snapshot_seed.sql'), - 'raw_sql': ANY, - 'refs': [['seed']], - 'relation_name': '"{0}"."{1}".snapshot_seed'.format( - self.default_database, self.alternate_schema - ), - 'resource_type': 'snapshot', - 'root_path': self.test_root_realpath, - 'schema': self.alternate_schema, - 'sources': [], - 'tags': [], - 'unique_id': 'snapshot.test.snapshot_seed', - 'unrendered_config': self.unrendered_snapshot_config( - target_schema=self.alternate_schema - ), - }, + 'unique_id': 'snapshot.test.snapshot_seed', 'thread_id': ANY, 'timing': [ANY, ANY], }, @@ -3764,7 +3043,7 @@ def verify_run_results(self, expected_run_results): assert 'args' in run_results # sort the results so we can make reasonable assertions - run_results['results'].sort(key=lambda r: r['node']['unique_id']) + run_results['results'].sort(key=lambda r: r['unique_id']) assert run_results['results'] == expected_run_results set(run_results) == {'elapsed_time', 'results', 'metadata'} @@ -3782,9 +3061,7 @@ def test__postgres__run_and_generate(self): self.verify_manifest(self.expected_seeded_manifest( model_database=self.default_database )) - self.verify_run_results(self.expected_run_results( - model_database=self.default_database - )) + self.verify_run_results(self.expected_run_results()) @use_profile('postgres') def test__postgres_references(self): @@ -3817,10 +3094,7 @@ def test__snowflake__run_and_generate(self): self.verify_catalog(self.expected_snowflake_catalog()) self.verify_manifest(self.expected_seeded_manifest()) - self.verify_run_results(self.expected_run_results( - quote_schema=False, - quote_model=False - )) + self.verify_run_results(self.expected_run_results()) @use_profile('snowflake') def test__snowflake__run_and_generate_ignore_quoting_parameter(self): @@ -3844,8 +3118,7 @@ def connect(*args, **kwargs): self.verify_catalog(self.expected_snowflake_catalog(case_columns=True)) self.verify_manifest(self.expected_seeded_manifest(quote_model=True)) - self.verify_run_results(self.expected_run_results( - quote_schema=False, quote_model=True)) + self.verify_run_results(self.expected_run_results()) @use_profile('bigquery') def test__bigquery__run_and_generate(self): @@ -3872,9 +3145,7 @@ def test__redshift__run_and_generate(self): self.verify_manifest(self.expected_seeded_manifest( model_database=self.default_database )) - self.verify_run_results(self.expected_run_results( - model_database=self.default_database - )) + self.verify_run_results(self.expected_run_results()) @use_profile('redshift') def test__redshift__incremental_view(self): @@ -3894,9 +3165,7 @@ def test__presto__run_and_generate(self): self.verify_manifest(self.expected_seeded_manifest( model_database=self.default_database )) - self.verify_run_results(self.expected_run_results( - model_database=self.default_database - )) + self.verify_run_results(self.expected_run_results()) class TestDocsGenerateMissingSchema(DBTIntegrationTest): From ef7ff55e079f8098809704d0f9eea7a5fd521502 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 
10 Dec 2020 11:36:29 -0500 Subject: [PATCH 6/9] flake8 --- core/dbt/contracts/rpc.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index 2601bf2da28..f6b027974c0 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -19,9 +19,7 @@ FreshnessResult, RunOperationResult, RunOperationResultsArtifact, - RunResult, RunResultsArtifact, - process_run_result, ) from dbt.contracts.util import VersionedSchema, schema_version from dbt.exceptions import InternalException From ac1de5bce9dab4fc3d4a4599438d0c92017b8a53 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Fri, 11 Dec 2020 10:35:51 -0500 Subject: [PATCH 7/9] more updates --- core/dbt/contracts/results.py | 56 ++++++++++++------- core/dbt/contracts/rpc.py | 12 ++-- core/dbt/task/printer.py | 2 +- core/dbt/task/runnable.py | 5 +- .../042_sources_test/test_sources.py | 2 +- 5 files changed, 45 insertions(+), 32 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index a7b922a012e..8b5a2b02274 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -27,6 +27,8 @@ from datetime import datetime from typing import Union, Dict, List, Optional, Any, NamedTuple, Sequence +from dbt.clients.system import write_json + @dataclass class TimingInfo(JsonSchemaMixin): @@ -100,7 +102,7 @@ class NodeResult(BaseResult): @dataclass -class PartialNodeResult(NodeResult, Writable): +class PartialNodeResult(NodeResult): # if the result got to the point where it could be skipped/failed, we would # be returning a real result, not a partial. @property @@ -109,14 +111,7 @@ def skipped(self): @dataclass -class WritableRunModelResult(NodeResult, Writable): - @property - def skipped(self): - return self.status == RunStatus.Skipped - - -@dataclass -class RunModelResult(WritableRunModelResult): +class RunModelResult(NodeResult): agate_table: Optional[agate.Table] = None def to_dict(self, *args, **kwargs): @@ -124,6 +119,10 @@ def to_dict(self, *args, **kwargs): dct.pop('agate_table', None) return dct + @property + def skipped(self): + return self.status == RunStatus.Skipped + @dataclass class ExecutionResult(JsonSchemaMixin): @@ -140,7 +139,7 @@ def __getitem__(self, idx): return self.results[idx] -RunResult = Union[PartialNodeResult, WritableRunModelResult] +RunResult = Union[PartialNodeResult, RunModelResult] @dataclass @@ -155,42 +154,54 @@ class RunResultOutput(BaseResult): unique_id: str -def process_run_result(result: Union[RunResult, RunResultOutput]) -> RunResultOutput: # noqa - if isinstance(result, RunResultOutput): - return result - +def process_run_result(result: RunResult) -> RunResultOutput: return RunResultOutput( unique_id=result.node.unique_id, status=result.status, timing=result.timing, thread_id=result.thread_id, execution_time=result.execution_time, - message=result.message + message=result.message, ) @dataclass -@schema_version('run-results', 1) -class RunResultsArtifact( +class RunExecutionResult( ExecutionResult, - ArtifactMixin ): + results: Sequence[RunResult] + args: Dict[str, Any] = field(default_factory=dict) + generated_at: datetime = field(default_factory=datetime.utcnow) + + def write(self, path: str): + writable = RunResultsArtifact.from_execution_results( + results=self.results, + elapsed_time=self.elapsed_time, + generated_at=self.generated_at, + args=self.args, + ) + writable.write(path) + + +@dataclass +@schema_version('run-results', 1) +class RunResultsArtifact(ExecutionResult, ArtifactMixin): results: 
Sequence[RunResultOutput] args: Dict[str, Any] = field(default_factory=dict) @classmethod - def from_node_results( + def from_execution_results( cls, - results: Union[Sequence[RunResult], Sequence[RunResultOutput]], + results: Sequence[RunResult], elapsed_time: float, generated_at: datetime, args: Dict, ): + processed_results = [process_run_result(result) for result in results] meta = RunResultsMetadata( dbt_schema_version=str(cls.dbt_schema_version), generated_at=generated_at, ) - processed_results = [process_run_result(result) for result in results] return cls( metadata=meta, results=processed_results, @@ -198,6 +209,9 @@ def from_node_results( args=args ) + def write(self, path: str, omit_none=False): + write_json(path, self.to_dict(omit_none=omit_none)) + @dataclass class RunOperationResult(ExecutionResult): diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index f6b027974c0..2bf7046c382 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -11,7 +11,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.results import ( - RunResultOutput, TimingInfo, + RunResult, RunResultsArtifact, TimingInfo, CatalogArtifact, CatalogResults, ExecutionResult, @@ -19,7 +19,7 @@ FreshnessResult, RunOperationResult, RunOperationResultsArtifact, - RunResultsArtifact, + RunExecutionResult, ) from dbt.contracts.util import VersionedSchema, schema_version from dbt.exceptions import InternalException @@ -224,12 +224,12 @@ def error(self): @dataclass @schema_version('remote-execution-result', 1) class RemoteExecutionResult(ExecutionResult, RemoteResult): - results: Sequence[RunResultOutput] + results: Sequence[RunResult] args: Dict[str, Any] = field(default_factory=dict) generated_at: datetime = field(default_factory=datetime.utcnow) def write(self, path: str): - writable = RunResultsArtifact.from_node_results( + writable = RunResultsArtifact.from_execution_results( generated_at=self.generated_at, results=self.results, elapsed_time=self.elapsed_time, @@ -240,11 +240,11 @@ def write(self, path: str): @classmethod def from_local_result( cls, - base: RunResultsArtifact, + base: RunExecutionResult, logs: List[LogMessage], ) -> 'RemoteExecutionResult': return cls( - generated_at=base.metadata.generated_at, + generated_at=base.generated_at, results=base.results, elapsed_time=base.elapsed_time, args=base.args, diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index f1d05852da6..27540101cd4 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -305,7 +305,7 @@ def print_run_result_error( logger.info(" compiled SQL at {}".format( result.node.build_path)) - else: + elif result.message is not None: first = True for line in result.message.split("\n"): if first: diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index edd264a7b97..f191aecf7ce 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -31,7 +31,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.parsed import ParsedSourceDefinition -from dbt.contracts.results import NodeStatus, RunResultsArtifact +from dbt.contracts.results import NodeStatus, RunExecutionResult from dbt.contracts.state import PreviousState from dbt.exceptions import ( InternalException, @@ -538,8 +538,7 @@ def create_schema(relation: BaseRelation) -> None: create_future.result() def get_result(self, results, 
elapsed_time, generated_at): - - return RunResultsArtifact.from_node_results( + return RunExecutionResult( results=results, elapsed_time=elapsed_time, generated_at=generated_at, diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index 3d7012f5fc1..7ccebe81a14 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -266,7 +266,7 @@ def _assert_freshness_results(self, path, state): 'max_loaded_at': last_inserted_time, 'snapshotted_at': AnyStringWith(), 'max_loaded_at_time_ago_in_s': AnyFloat(), - 'state': state, + 'status': state, 'criteria': { 'filter': None, 'warn_after': {'count': 10, 'period': 'hour'}, From 37af0e0d598110d0c5663098880e660b7779dafd Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 14 Dec 2020 10:31:23 -0500 Subject: [PATCH 8/9] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3172616d4b0..23294161725 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Normalize cli-style-strings in manifest selectors dictionary ([#2879](https://github.com/fishtown-anaytics/dbt/issues/2879), [#2895](https://github.com/fishtown-analytics/dbt/pull/2895)) - Hourly, monthly and yearly partitions available in BigQuery ([#2476](https://github.com/fishtown-analytics/dbt/issues/2476), [#2903](https://github.com/fishtown-analytics/dbt/pull/2903)) - Allow BigQuery to default to the environment's default project ([#2828](https://github.com/fishtown-analytics/dbt/pull/2828), [#2908](https://github.com/fishtown-analytics/dbt/pull/2908)) +- Rationalize run result status reporting and clean up artifact schema ([#2493](https://github.com/fishtown-analytics/dbt/issues/2493), [#2943](https://github.com/fishtown-analytics/dbt/pull/2943)) ### Fixes - Respect --project-dir in dbt clean command ([#2840](https://github.com/fishtown-analytics/dbt/issues/2840), [#2841](https://github.com/fishtown-analytics/dbt/pull/2841)) From b60e533b9db3e7a28b1bbade5fc2ea7db7161f77 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 14 Dec 2020 16:28:08 -0500 Subject: [PATCH 9/9] fix printer output --- core/dbt/task/freshness.py | 7 +++++-- core/dbt/task/printer.py | 15 ++++++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 0a6c40c40b3..061f40460d1 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -12,7 +12,7 @@ from dbt.contracts.results import ( FreshnessExecutionResultArtifact, - FreshnessResult, NodeStatus, PartialSourceFreshnessResult, + FreshnessResult, PartialSourceFreshnessResult, SourceFreshnessResult, FreshnessStatus ) from dbt.exceptions import RuntimeException, InternalException @@ -167,7 +167,10 @@ def get_result(self, results, elapsed_time, generated_at): def task_end_messages(self, results): for result in results: - if result.status == NodeStatus.Error: + if result.status in ( + FreshnessStatus.Error, + FreshnessStatus.RuntimeErr + ): print_run_result_error(result) print_timestamped_line('Done.') diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 27540101cd4..7313e4c9b0b 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -104,7 +104,7 @@ def get_printable_result( result, success: str, error: str) -> Tuple[str, str, Callable]: if result.status == NodeStatus.Error: info = 'ERROR {}'.format(error) - status = ui.red(result.message) + status = 
ui.red(result.status.upper()) logger_fn = logger.error else: info = 'OK {}'.format(success) @@ -292,12 +292,13 @@ def print_run_result_error( result.node.original_file_path)) try: - int(result.status) + # if message is int, must be rows returned for a test + int(result.message) except ValueError: logger.error(" Status: {}".format(result.status)) else: - status = utils.pluralize(result.status, 'result') - logger.error(" Got {}, expected 0.".format(status)) + num_rows = utils.pluralize(result.message, 'result') + logger.error(" Got {}, expected 0.".format(num_rows)) if result.node.build_path is not None: with TextOnly(): @@ -348,7 +349,11 @@ def print_end_of_run_summary( def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None: errors, warnings = [], [] for r in results: - if r.status in (NodeStatus.Error, NodeStatus.Fail): + if r.status in ( + NodeStatus.RuntimeErr, + NodeStatus.Error, + NodeStatus.Fail + ): errors.append(r) elif r.status == NodeStatus.Skipped and r.message is not None: # this means we skipped a node because of an issue upstream,
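
The run-results changes in this series (process_run_result, RunResultsArtifact.from_execution_results, and the write_json-based write method) all serve one idea: the in-memory result keeps the whole compiled node, while each serialized run_results.json entry keeps only the node's unique_id plus its status metadata. Below is a minimal standalone sketch of that flattening step; the Fake* dataclasses, flatten(), and write_artifact() are simplified illustrations, not dbt's actual classes or API.

    import json
    from dataclasses import dataclass, field, asdict
    from typing import List, Optional


    @dataclass
    class FakeNode:
        unique_id: str
        compiled_sql: str = ""  # heavy fields like this stay out of the artifact


    @dataclass
    class FakeRunResult:
        # in-memory shape: carries the full node
        node: FakeNode
        status: str
        thread_id: str
        execution_time: float
        message: Optional[str] = None
        timing: List[dict] = field(default_factory=list)


    @dataclass
    class FakeRunResultOutput:
        # serialized shape: only the node's unique_id survives
        unique_id: str
        status: str
        thread_id: str
        execution_time: float
        message: Optional[str]
        timing: List[dict]


    def flatten(result: FakeRunResult) -> FakeRunResultOutput:
        # plays the role of process_run_result(): drop the embedded node,
        # keep only its unique_id
        return FakeRunResultOutput(
            unique_id=result.node.unique_id,
            status=result.status,
            thread_id=result.thread_id,
            execution_time=result.execution_time,
            message=result.message,
            timing=result.timing,
        )


    def write_artifact(path: str, results: List[FakeRunResult]) -> None:
        # plays the role of the artifact's write(): flatten, then dump JSON
        payload = {"results": [asdict(flatten(r)) for r in results]}
        with open(path, "w") as fh:
            json.dump(payload, fh, indent=2)


    if __name__ == "__main__":
        res = FakeRunResult(
            node=FakeNode("model.test.view_summary", "select ..."),
            status="success",
            thread_id="Thread-1",
            execution_time=0.12,
        )
        write_artifact("run_results.json", [res])

Keeping the heavy node payload out of the artifact is also what lets the expected-results fixtures earlier in the series shrink each entry to a bare unique_id.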
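
The print_run_result_error hunk at the end of the series switches from parsing result.status to parsing result.message, since a test failure now reports its failing row count in the message while the status itself is an enum member. A rough standalone sketch of that branching follows, under the assumption that message is either None, free-form error text, or a stringified row count; describe_failure() is an illustrative helper, not dbt's printer function.

    from typing import Optional


    def describe_failure(status: str, message: Optional[str]) -> str:
        if message is None:
            return "Status: {}".format(status)
        try:
            rows = int(message)
        except ValueError:
            # not a row count, so treat it as an ordinary error message
            return "Status: {}".format(status)
        plural = "result" if rows == 1 else "results"
        return "Got {} {}, expected 0.".format(rows, plural)


    print(describe_failure("fail", "2"))                # Got 2 results, expected 0.
    print(describe_failure("error", "Database Error"))  # Status: error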
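
The final hunk widens the end-of-run error bucket to include runtime errors alongside errors and fails, and keeps treating a skipped node that carries a message as a warning. A compact sketch of that classification; the Result tuple, the literal status strings, and summarize() are simplified assumptions rather than dbt's types.

    from typing import List, NamedTuple, Optional, Tuple


    class Result(NamedTuple):
        unique_id: str
        status: str
        message: Optional[str] = None


    ERROR_STATUSES = ("runtime error", "error", "fail")


    def summarize(results: List[Result]) -> Tuple[List[Result], List[Result]]:
        errors, warnings = [], []
        for r in results:
            if r.status in ERROR_STATUSES:
                errors.append(r)
            elif r.status == "skipped" and r.message is not None:
                # skipped because of a problem upstream, so surface a warning
                warnings.append(r)
        return errors, warnings


    errs, warns = summarize([
        Result("model.test.a", "success"),
        Result("model.test.b", "fail", "2"),
        Result("model.test.c", "skipped", "depends on model.test.b"),
    ])
    assert len(errs) == 1 and len(warns) == 1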