Merge pull request #2943 from fishtown-analytics/feature/refactor-run-results

Clean up run results
Kyle Wigley authored Dec 15, 2020
2 parents cd149b6 + b60e533 · commit 454ddc6
Showing 32 changed files with 568 additions and 1,196 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
 - Normalize cli-style-strings in manifest selectors dictionary ([#2879](https://github.com/fishtown-anaytics/dbt/issues/2879), [#2895](https://github.com/fishtown-analytics/dbt/pull/2895))
 - Hourly, monthly and yearly partitions available in BigQuery ([#2476](https://github.com/fishtown-analytics/dbt/issues/2476), [#2903](https://github.com/fishtown-analytics/dbt/pull/2903))
 - Allow BigQuery to default to the environment's default project ([#2828](https://github.com/fishtown-analytics/dbt/pull/2828), [#2908](https://github.com/fishtown-analytics/dbt/pull/2908))
+- Rationalize run result status reporting and clean up artifact schema ([#2493](https://github.com/fishtown-analytics/dbt/issues/2493), [#2943](https://github.com/fishtown-analytics/dbt/pull/2943))
 
 ### Fixes
 - Respect --project-dir in dbt clean command ([#2840](https://github.com/fishtown-analytics/dbt/issues/2840), [#2841](https://github.com/fishtown-analytics/dbt/pull/2841))
8 changes: 4 additions & 4 deletions Makefile
@@ -7,19 +7,19 @@ install:

 test: .env
 	@echo "Full test run starting..."
-	@time docker-compose run test tox
+	@time docker-compose run --rm test tox
 
 test-unit: .env
 	@echo "Unit test run starting..."
-	@time docker-compose run test tox -e unit-py36,flake8
+	@time docker-compose run --rm test tox -e unit-py36,flake8
 
 test-integration: .env
 	@echo "Integration test run starting..."
-	@time docker-compose run test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36
+	@time docker-compose run --rm test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36
 
 test-quick: .env
 	@echo "Integration test run starting..."
-	@time docker-compose run test tox -e integration-postgres-py36 -- -x
+	@time docker-compose run --rm test tox -e integration-postgres-py36 -- -x
 
 # This rule creates a file named .env that is used by docker-compose for passing
 # the USER_ID and GROUP_ID arguments to the Docker image.
5 changes: 3 additions & 2 deletions core/dbt/contracts/graph/model_config.py
@@ -3,7 +3,7 @@
 from itertools import chain
 from typing import (
     Any, List, Optional, Dict, MutableMapping, Union, Type, NewType, Tuple,
-    TypeVar, Callable
+    TypeVar, Callable, cast, Hashable
 )
 
 # TODO: patch+upgrade hologram to avoid this jsonschema import

@@ -493,7 +493,8 @@ def validate(cls, data: Any):
             to_validate = config
 
         else:
-            schema = _validate_schema(cls)
+            h_cls = cast(Hashable, cls)
+            schema = _validate_schema(h_cls)
             to_validate = data
 
         validator = jsonschema.Draft7Validator(schema)
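Worth unpacking: typing.cast here is purely a static-typing maneuver. Classes are hashable at runtime, but mypy will not accept a Type[...] where a Hashable is declared, for example when a schema helper is memoized with functools.lru_cache, whose arguments must be hashable. A minimal sketch of the pattern, assuming a memoized helper; the schema body below is a placeholder, not dbt's real implementation:

    from functools import lru_cache
    from typing import Any, Dict, Hashable, cast

    @lru_cache(maxsize=None)
    def _validate_schema(cls: Hashable) -> Dict[str, Any]:
        # lru_cache keys its cache on the (hashable) argument; this stub
        # stands in for hologram's real schema construction.
        return {'type': 'object', 'title': getattr(cls, '__name__', 'config')}

    class SnapshotConfig:  # stand-in for a dbt config class
        pass

    h_cls = cast(Hashable, SnapshotConfig)  # no-op at runtime, satisfies mypy
    schema = _validate_schema(h_cls)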
9 changes: 2 additions & 7 deletions core/dbt/contracts/graph/unparsed.py
@@ -158,19 +158,14 @@ def exceeded(self, actual_age: float) -> bool:
         return actual_age > difference
 
 
-class FreshnessStatus(StrEnum):
-    Pass = 'pass'
-    Warn = 'warn'
-    Error = 'error'
-
-
 @dataclass
 class FreshnessThreshold(JsonSchemaMixin, Mergeable):
     warn_after: Optional[Time] = None
     error_after: Optional[Time] = None
     filter: Optional[str] = None
 
-    def status(self, age: float) -> FreshnessStatus:
+    def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
+        from dbt.contracts.results import FreshnessStatus
         if self.error_after and self.error_after.exceeded(age):
             return FreshnessStatus.Error
         elif self.warn_after and self.warn_after.exceeded(age):
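The string return annotation plus function-local import is the standard way to break an import cycle: unparsed.py can name a type that now lives in results.py even though results.py imports from unparsed.py at module scope. A stripped-down sketch of the pattern, with hypothetical module names (thresholds plays the role of unparsed, statuses the role of results):

    # thresholds.py
    class Threshold:
        def status(self, age: float) -> "statuses.Status":
            # Deferred import: runs at call time, after both modules are
            # loaded, so the module-scope cycle never materializes.
            from statuses import Status
            return Status.Error if age > 10.0 else Status.Pass

    # statuses.py
    from enum import Enum
    import thresholds  # safe: thresholds never imports statuses at module scope

    class Status(Enum):
        Pass = 'pass'
        Error = 'error'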
165 changes: 109 additions & 56 deletions core/dbt/contracts/results.py
@@ -1,6 +1,6 @@
 from dbt.contracts.graph.manifest import CompileResultNode
 from dbt.contracts.graph.unparsed import (
-    FreshnessStatus, FreshnessThreshold
+    FreshnessThreshold
 )
 from dbt.contracts.graph.parsed import ParsedSourceDefinition
 from dbt.contracts.util import (

@@ -27,6 +27,8 @@
 from datetime import datetime
 from typing import Union, Dict, List, Optional, Any, NamedTuple, Sequence
 
+from dbt.clients.system import write_json
+
 
 @dataclass
 class TimingInfo(JsonSchemaMixin):

@@ -55,22 +57,52 @@ def __exit__(self, exc_type, exc_value, traceback):
     logger.debug('finished collecting timing info')
 
 
+class NodeStatus(StrEnum):
+    Success = "success"
+    Error = "error"
+    Fail = "fail"
+    Warn = "warn"
+    Skipped = "skipped"
+    Pass = "pass"
+    RuntimeErr = "runtime error"
+
+
+class RunStatus(StrEnum):
+    Success = NodeStatus.Success
+    Error = NodeStatus.Error
+    Skipped = NodeStatus.Skipped
+
+
+class TestStatus(StrEnum):
+    Pass = NodeStatus.Pass
+    Error = NodeStatus.Error
+    Fail = NodeStatus.Fail
+    Warn = NodeStatus.Warn
+
+
+class FreshnessStatus(StrEnum):
+    Pass = NodeStatus.Pass
+    Warn = NodeStatus.Warn
+    Error = NodeStatus.Error
+    RuntimeErr = NodeStatus.RuntimeErr
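The intent of this hierarchy: NodeStatus enumerates every terminal state a node can reach, while RunStatus, TestStatus, and FreshnessStatus each expose only the subset their task can legitimately report, sharing the same underlying strings. Because StrEnum mixes in str, members compare by value across enums and against plain strings. A small illustration (imports reflect the code as of this PR; the session is illustrative):

    from dbt.contracts.results import NodeStatus, RunStatus, TestStatus

    # str-backed enums compare by value, even across enum classes:
    assert RunStatus.Error == NodeStatus.Error == 'error'
    assert TestStatus.Fail == 'fail'

    # but the per-task enums constrain what each task may report:
    # a run can be skipped, while only a test can warn or fail
    assert not hasattr(RunStatus, 'Fail')
    assert not hasattr(TestStatus, 'Skipped')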


 @dataclass
 class BaseResult(JsonSchemaMixin):
-    node: CompileResultNode
-    error: Optional[str] = None
-    status: Union[None, str, int, bool] = None
-    execution_time: Union[str, int] = 0
-    thread_id: Optional[str] = None
-    timing: List[TimingInfo] = field(default_factory=list)
-    fail: Optional[bool] = None
-    warn: Optional[bool] = None
+    status: Union[RunStatus, TestStatus, FreshnessStatus]
+    timing: List[TimingInfo]
+    thread_id: str
+    execution_time: float
+    message: Optional[Union[str, int]]
 
 
 @dataclass
-class PartialResult(BaseResult, Writable):
-    pass
+class NodeResult(BaseResult):
+    node: CompileResultNode
+
+
+@dataclass
+class PartialNodeResult(NodeResult):
     # if the result got to the point where it could be skipped/failed, we would
     # be returning a real result, not a partial.
     @property

@@ -79,23 +111,18 @@ def skipped(self):


 @dataclass
-class WritableRunModelResult(BaseResult, Writable):
-    skip: bool = False
-
-    @property
-    def skipped(self):
-        return self.skip
-
-
-@dataclass
-class RunModelResult(WritableRunModelResult):
+class RunModelResult(NodeResult):
     agate_table: Optional[agate.Table] = None
 
     def to_dict(self, *args, **kwargs):
         dct = super().to_dict(*args, **kwargs)
         dct.pop('agate_table', None)
         return dct
 
+    @property
+    def skipped(self):
+        return self.status == RunStatus.Skipped
+
 
 @dataclass
 class ExecutionResult(JsonSchemaMixin):

@@ -112,7 +139,7 @@ def __getitem__(self, idx):
         return self.results[idx]
 
 
-RunResult = Union[PartialResult, WritableRunModelResult]
+RunResult = Union[PartialNodeResult, RunModelResult]
 
 
 @dataclass

@@ -123,33 +150,68 @@ class RunResultsMetadata(BaseArtifactMetadata):


 @dataclass
-@schema_version('run-results', 1)
-class RunResultsArtifact(
+class RunResultOutput(BaseResult):
+    unique_id: str
+
+
+def process_run_result(result: RunResult) -> RunResultOutput:
+    return RunResultOutput(
+        unique_id=result.node.unique_id,
+        status=result.status,
+        timing=result.timing,
+        thread_id=result.thread_id,
+        execution_time=result.execution_time,
+        message=result.message,
+    )
+
+
+@dataclass
+class RunExecutionResult(
     ExecutionResult,
-    ArtifactMixin,
 ):
     results: Sequence[RunResult]
     args: Dict[str, Any] = field(default_factory=dict)
+    generated_at: datetime = field(default_factory=datetime.utcnow)
+
+    def write(self, path: str):
+        writable = RunResultsArtifact.from_execution_results(
+            results=self.results,
+            elapsed_time=self.elapsed_time,
+            generated_at=self.generated_at,
+            args=self.args,
+        )
+        writable.write(path)
+
+
+@dataclass
+@schema_version('run-results', 1)
+class RunResultsArtifact(ExecutionResult, ArtifactMixin):
+    results: Sequence[RunResultOutput]
+    args: Dict[str, Any] = field(default_factory=dict)
 
     @classmethod
-    def from_node_results(
+    def from_execution_results(
         cls,
         results: Sequence[RunResult],
         elapsed_time: float,
         generated_at: datetime,
         args: Dict,
     ):
+        processed_results = [process_run_result(result) for result in results]
         meta = RunResultsMetadata(
             dbt_schema_version=str(cls.dbt_schema_version),
             generated_at=generated_at,
         )
         return cls(
             metadata=meta,
-            results=results,
+            results=processed_results,
             elapsed_time=elapsed_time,
             args=args
         )
+
+    def write(self, path: str, omit_none=False):
+        write_json(path, self.to_dict(omit_none=omit_none))
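The net effect of this block: the in-memory result (RunExecutionResult) and the on-disk artifact (RunResultsArtifact) are now distinct types, with process_run_result flattening each node result into a serializable RunResultOutput row. A hedged sketch of the write path; `results` stands in for the sequence a real dbt invocation would collect, and the args dict is hypothetical:

    from datetime import datetime
    from dbt.contracts.results import RunResultsArtifact

    # `results` would be the Sequence[RunResult] gathered while executing
    # nodes; each carries status, timing, thread_id, execution_time, message.
    artifact = RunResultsArtifact.from_execution_results(
        results=results,
        elapsed_time=42.0,                 # placeholder wall-clock seconds
        generated_at=datetime.utcnow(),
        args={'models': ['my_model']},     # hypothetical CLI args
    )
    artifact.write('target/run_results.json')  # delegates to write_json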


 @dataclass
 class RunOperationResult(ExecutionResult):

@@ -174,7 +236,7 @@ def from_success(
         elapsed_time: float,
         generated_at: datetime,
     ):
-        meta = RunResultsMetadata(
+        meta = RunOperationResultMetadata(
             dbt_schema_version=str(cls.dbt_schema_version),
             generated_at=generated_at,
         )

@@ -186,45 +248,30 @@ def from_success(
         )
 
 
-@dataclass
-class SourceFreshnessResultMixin(JsonSchemaMixin):
-    max_loaded_at: datetime
-    snapshotted_at: datetime
-    age: float
-
-
-# due to issues with typing.Union collapsing subclasses, this can't subclass
-# PartialResult
 @dataclass
-class SourceFreshnessResult(BaseResult, Writable, SourceFreshnessResultMixin):
+class SourceFreshnessResult(NodeResult, Writable):
     node: ParsedSourceDefinition
-    status: FreshnessStatus = FreshnessStatus.Pass
-
-    def __post_init__(self):
-        self.fail = self.status == 'error'
-
-    @property
-    def warned(self):
-        return self.status == 'warn'
+    status: FreshnessStatus
+    max_loaded_at: datetime
+    snapshotted_at: datetime
+    age: float
 
     @property
     def skipped(self):
         return False
 
 
-def _copykeys(src, keys, **updates):
-    return {k: getattr(src, k) for k in keys}
-
-
 class FreshnessErrorEnum(StrEnum):
     runtime_error = 'runtime error'
 
 
 @dataclass
 class SourceFreshnessRuntimeError(JsonSchemaMixin):
     unique_id: str
-    error: str
-    state: FreshnessErrorEnum
+    error: Optional[Union[str, int]]
+    status: FreshnessErrorEnum

@@ -233,23 +280,29 @@ class SourceFreshnessOutput(JsonSchemaMixin):
Expand All @@ -233,23 +280,29 @@ class SourceFreshnessOutput(JsonSchemaMixin):
max_loaded_at: datetime
snapshotted_at: datetime
max_loaded_at_time_ago_in_s: float
state: FreshnessStatus
status: FreshnessStatus
criteria: FreshnessThreshold


FreshnessNodeResult = Union[PartialResult, SourceFreshnessResult]
@dataclass
class PartialSourceFreshnessResult(PartialNodeResult):
status: FreshnessStatus


FreshnessNodeResult = Union[PartialSourceFreshnessResult,
SourceFreshnessResult]
FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput]


def process_freshness_result(
result: FreshnessNodeResult
) -> FreshnessNodeOutput:
unique_id = result.node.unique_id
if result.error is not None:
if result.status == FreshnessStatus.RuntimeErr:
return SourceFreshnessRuntimeError(
unique_id=unique_id,
error=result.error,
state=FreshnessErrorEnum.runtime_error,
error=result.message,
status=FreshnessErrorEnum.runtime_error,
)

# we know that this must be a SourceFreshnessResult
Expand All @@ -271,7 +324,7 @@ def process_freshness_result(
max_loaded_at=result.max_loaded_at,
snapshotted_at=result.snapshotted_at,
max_loaded_at_time_ago_in_s=result.age,
state=result.status,
status=result.status,
criteria=criteria,
)

Expand Down
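Tying the renames together: a freshness runtime error is now detected via result.status == FreshnessStatus.RuntimeErr instead of checking whether an error string is set, the human-readable text rides in message, and the serialized key is status (formerly state). A hedged usage sketch; `result` stands in for what `dbt source snapshot-freshness` would produce:

    from dbt.contracts.results import FreshnessStatus, process_freshness_result

    output = process_freshness_result(result)

    if result.status == FreshnessStatus.RuntimeErr:
        # SourceFreshnessRuntimeError: carries the message text as `error`
        print(output.unique_id, output.error, output.status)
    else:
        # SourceFreshnessOutput: note the renamed `status` key (formerly
        # `state`) in the sources.json artifact
        print(output.status, output.max_loaded_at_time_ago_in_s)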