Skip to content

Commit

Permalink
fix #7502: write run_results.json for run operation
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed May 17, 2023
1 parent f6e5582 commit a500e37
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 46 deletions.
4 changes: 1 addition & 3 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dbt.contracts.results import (
CatalogArtifact,
RunExecutionResult,
RunOperationResultsArtifact,
)
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
Expand Down Expand Up @@ -53,8 +52,7 @@ class dbtRunnerResult:
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test
RunOperationResultsArtifact, # run-operation
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None


Expand Down
34 changes: 0 additions & 34 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,40 +247,6 @@ def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))


@dataclass
class RunOperationResult(ExecutionResult):
success: bool


@dataclass
class RunOperationResultMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
default_factory=lambda: str(RunOperationResultsArtifact.dbt_schema_version)
)


@dataclass
@schema_version("run-operation-result", 1)
class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin):
@classmethod
def from_success(
cls,
success: bool,
elapsed_time: float,
generated_at: datetime,
):
meta = RunOperationResultMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
return cls(
metadata=meta,
results=[],
elapsed_time=elapsed_time,
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult

Expand Down
62 changes: 53 additions & 9 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from datetime import datetime
import os
import threading
import traceback
from datetime import datetime

import agate

from .base import ConfiguredTask

import dbt.exceptions
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunOperationResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo
from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
LogDebugStackTrace,
)
from dbt.node_types import NodeType
from .base import ConfiguredTask

RESULT_FILE_NAME = "run_results.json"


class RunOperationTask(ConfiguredTask):
Expand All @@ -22,10 +28,13 @@ def _get_macro_parts(self):
if "." in macro_name:
package_name, macro_name = macro_name.split(".", 1)
else:
package_name = None
package_name = self.config.project_name

return package_name, macro_name

def result_path(self):
return os.path.join(self.config.target_path, RESULT_FILE_NAME)

def _run_unsafe(self) -> agate.Table:
adapter = get_adapter(self.config)

Expand All @@ -40,7 +49,7 @@ def _run_unsafe(self) -> agate.Table:

return res

def run(self) -> RunOperationResultsArtifact:
def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
try:
Expand All @@ -56,11 +65,46 @@ def run(self) -> RunOperationResultsArtifact:
else:
success = True
end = datetime.utcnow()
return RunOperationResultsArtifact.from_success(

package_name, macro_name = self._get_macro_parts()
fqn = [NodeType.Operation, package_name, macro_name]
unique_id = ".".join(fqn)

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
failures=0 if success else 1,
message=None,
node=HookNode(
alias=macro_name,
checksum=FileHash.from_contents(unique_id),
database=self.config.credentials.database,
schema=self.config.credentials.schema,
resource_type=NodeType.Operation,
fqn=fqn,
name=macro_name,
unique_id=unique_id,
package_name=package_name,
path="",
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
success=success,
args={
k: v
for k, v in self.args.__dict__.items()
if k.islower() and type(v) in (str, int, float, bool, list, dict)
},
results=[run_result],
)
results.write(self.result_path())
return results

def interpret_results(self, results):
return results.success
return results.results[0].status == RunStatus.Success

0 comments on commit a500e37

Please sign in to comment.