Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up run results #2943

Merged
merged 9 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Normalize cli-style-strings in manifest selectors dictionary ([#2879](https://github.com/fishtown-analytics/dbt/issues/2879), [#2895](https://github.com/fishtown-analytics/dbt/pull/2895))
- Hourly, monthly and yearly partitions available in BigQuery ([#2476](https://github.com/fishtown-analytics/dbt/issues/2476), [#2903](https://github.com/fishtown-analytics/dbt/pull/2903))
- Allow BigQuery to default to the environment's default project ([#2828](https://github.com/fishtown-analytics/dbt/pull/2828), [#2908](https://github.com/fishtown-analytics/dbt/pull/2908))
- Rationalize run result status reporting and clean up artifact schema ([#2493](https://github.com/fishtown-analytics/dbt/issues/2493), [#2943](https://github.com/fishtown-analytics/dbt/pull/2943))

### Fixes
- Respect --project-dir in dbt clean command ([#2840](https://github.com/fishtown-analytics/dbt/issues/2840), [#2841](https://github.com/fishtown-analytics/dbt/pull/2841))
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ install:

test: .env
@echo "Full test run starting..."
@time docker-compose run test tox
@time docker-compose run --rm test tox

test-unit: .env
@echo "Unit test run starting..."
@time docker-compose run test tox -e unit-py36,flake8
@time docker-compose run --rm test tox -e unit-py36,flake8

test-integration: .env
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36
@time docker-compose run --rm test tox -e integration-postgres-py36,integration-redshift-py36,integration-snowflake-py36,integration-bigquery-py36

test-quick: .env
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py36 -- -x
@time docker-compose run --rm test tox -e integration-postgres-py36 -- -x

# This rule creates a file named .env that is used by docker-compose for passing
# the USER_ID and GROUP_ID arguments to the Docker image.
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from itertools import chain
from typing import (
Any, List, Optional, Dict, MutableMapping, Union, Type, NewType, Tuple,
TypeVar, Callable
TypeVar, Callable, cast, Hashable
)

# TODO: patch+upgrade hologram to avoid this jsonschema import
Expand Down Expand Up @@ -493,7 +493,8 @@ def validate(cls, data: Any):
to_validate = config

else:
schema = _validate_schema(cls)
h_cls = cast(Hashable, cls)
schema = _validate_schema(h_cls)
to_validate = data

validator = jsonschema.Draft7Validator(schema)
Expand Down
9 changes: 2 additions & 7 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,14 @@ def exceeded(self, actual_age: float) -> bool:
return actual_age > difference


class FreshnessStatus(StrEnum):
Pass = 'pass'
Warn = 'warn'
Error = 'error'


@dataclass
class FreshnessThreshold(JsonSchemaMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
filter: Optional[str] = None

def status(self, age: float) -> FreshnessStatus:
def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
from dbt.contracts.results import FreshnessStatus
if self.error_after and self.error_after.exceeded(age):
return FreshnessStatus.Error
elif self.warn_after and self.warn_after.exceeded(age):
Expand Down
165 changes: 109 additions & 56 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.contracts.graph.manifest import CompileResultNode
from dbt.contracts.graph.unparsed import (
FreshnessStatus, FreshnessThreshold
FreshnessThreshold
)
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import (
Expand All @@ -27,6 +27,8 @@
from datetime import datetime
from typing import Union, Dict, List, Optional, Any, NamedTuple, Sequence

from dbt.clients.system import write_json


@dataclass
class TimingInfo(JsonSchemaMixin):
Expand Down Expand Up @@ -55,22 +57,52 @@ def __exit__(self, exc_type, exc_value, traceback):
logger.debug('finished collecting timing info')


class NodeStatus(StrEnum):
Success = "success"
Error = "error"
Fail = "fail"
Warn = "warn"
Skipped = "skipped"
Pass = "pass"
RuntimeErr = "runtime error"


class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped


class TestStatus(StrEnum):
Pass = NodeStatus.Pass
Error = NodeStatus.Error
Fail = NodeStatus.Fail
Warn = NodeStatus.Warn


class FreshnessStatus(StrEnum):
Pass = NodeStatus.Pass
Warn = NodeStatus.Warn
Error = NodeStatus.Error
RuntimeErr = NodeStatus.RuntimeErr


@dataclass
class BaseResult(JsonSchemaMixin):
node: CompileResultNode
error: Optional[str] = None
status: Union[None, str, int, bool] = None
execution_time: Union[str, int] = 0
thread_id: Optional[str] = None
timing: List[TimingInfo] = field(default_factory=list)
fail: Optional[bool] = None
warn: Optional[bool] = None
status: Union[RunStatus, TestStatus, FreshnessStatus]
timing: List[TimingInfo]
thread_id: str
execution_time: float
message: Optional[Union[str, int]]


@dataclass
class PartialResult(BaseResult, Writable):
pass
class NodeResult(BaseResult):
node: CompileResultNode


@dataclass
class PartialNodeResult(NodeResult):
# if the result got to the point where it could be skipped/failed, we would
# be returning a real result, not a partial.
@property
Expand All @@ -79,23 +111,18 @@ def skipped(self):


@dataclass
class WritableRunModelResult(BaseResult, Writable):
skip: bool = False

@property
def skipped(self):
return self.skip


@dataclass
class RunModelResult(WritableRunModelResult):
class RunModelResult(NodeResult):
agate_table: Optional[agate.Table] = None

def to_dict(self, *args, **kwargs):
dct = super().to_dict(*args, **kwargs)
dct.pop('agate_table', None)
return dct

@property
def skipped(self):
return self.status == RunStatus.Skipped


@dataclass
class ExecutionResult(JsonSchemaMixin):
Expand All @@ -112,7 +139,7 @@ def __getitem__(self, idx):
return self.results[idx]


RunResult = Union[PartialResult, WritableRunModelResult]
RunResult = Union[PartialNodeResult, RunModelResult]


@dataclass
Expand All @@ -123,33 +150,68 @@ class RunResultsMetadata(BaseArtifactMetadata):


@dataclass
@schema_version('run-results', 1)
class RunResultsArtifact(
class RunResultOutput(BaseResult):
unique_id: str


def process_run_result(result: RunResult) -> RunResultOutput:
return RunResultOutput(
unique_id=result.node.unique_id,
status=result.status,
timing=result.timing,
thread_id=result.thread_id,
execution_time=result.execution_time,
message=result.message,
)


@dataclass
class RunExecutionResult(
ExecutionResult,
ArtifactMixin,
):
results: Sequence[RunResult]
args: Dict[str, Any] = field(default_factory=dict)
generated_at: datetime = field(default_factory=datetime.utcnow)

def write(self, path: str):
writable = RunResultsArtifact.from_execution_results(
results=self.results,
elapsed_time=self.elapsed_time,
generated_at=self.generated_at,
args=self.args,
)
writable.write(path)


@dataclass
@schema_version('run-results', 1)
class RunResultsArtifact(ExecutionResult, ArtifactMixin):
results: Sequence[RunResultOutput]
args: Dict[str, Any] = field(default_factory=dict)

@classmethod
def from_node_results(
def from_execution_results(
cls,
results: Sequence[RunResult],
elapsed_time: float,
generated_at: datetime,
args: Dict,
):
processed_results = [process_run_result(result) for result in results]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
return cls(
metadata=meta,
results=results,
results=processed_results,
elapsed_time=elapsed_time,
args=args
)

def write(self, path: str, omit_none=False):
write_json(path, self.to_dict(omit_none=omit_none))


@dataclass
class RunOperationResult(ExecutionResult):
Expand All @@ -174,7 +236,7 @@ def from_success(
elapsed_time: float,
generated_at: datetime,
):
meta = RunResultsMetadata(
meta = RunOperationResultMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
Expand All @@ -186,45 +248,30 @@ def from_success(
)


@dataclass
class SourceFreshnessResultMixin(JsonSchemaMixin):
max_loaded_at: datetime
snapshotted_at: datetime
age: float


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult
@dataclass
class SourceFreshnessResult(BaseResult, Writable, SourceFreshnessResultMixin):
class SourceFreshnessResult(NodeResult, Writable):
node: ParsedSourceDefinition
status: FreshnessStatus = FreshnessStatus.Pass

def __post_init__(self):
self.fail = self.status == 'error'

@property
def warned(self):
return self.status == 'warn'
status: FreshnessStatus
max_loaded_at: datetime
snapshotted_at: datetime
age: float

@property
def skipped(self):
return False


def _copykeys(src, keys, **updates):
return {k: getattr(src, k) for k in keys}


class FreshnessErrorEnum(StrEnum):
runtime_error = 'runtime error'


@dataclass
class SourceFreshnessRuntimeError(JsonSchemaMixin):
unique_id: str
error: str
state: FreshnessErrorEnum
error: Optional[Union[str, int]]
status: FreshnessErrorEnum


@dataclass
Expand All @@ -233,23 +280,29 @@ class SourceFreshnessOutput(JsonSchemaMixin):
max_loaded_at: datetime
snapshotted_at: datetime
max_loaded_at_time_ago_in_s: float
state: FreshnessStatus
status: FreshnessStatus
criteria: FreshnessThreshold


FreshnessNodeResult = Union[PartialResult, SourceFreshnessResult]
@dataclass
class PartialSourceFreshnessResult(PartialNodeResult):
status: FreshnessStatus


FreshnessNodeResult = Union[PartialSourceFreshnessResult,
SourceFreshnessResult]
FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput]


def process_freshness_result(
result: FreshnessNodeResult
) -> FreshnessNodeOutput:
unique_id = result.node.unique_id
if result.error is not None:
if result.status == FreshnessStatus.RuntimeErr:
return SourceFreshnessRuntimeError(
unique_id=unique_id,
error=result.error,
state=FreshnessErrorEnum.runtime_error,
error=result.message,
status=FreshnessErrorEnum.runtime_error,
)

# we know that this must be a SourceFreshnessResult
Expand All @@ -271,7 +324,7 @@ def process_freshness_result(
max_loaded_at=result.max_loaded_at,
snapshotted_at=result.snapshotted_at,
max_loaded_at_time_ago_in_s=result.age,
state=result.status,
status=result.status,
criteria=criteria,
)

Expand Down
Loading