Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

feat(fal): offer run_result for models in context #588

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
extra += f"target profile: {context.target.profile_name}\n"
extra += f"target database: {context.target.database}\n"

extra += f"run result: {context.current_model.run_result}\n"

response = context.current_model.adapter_response
assert response

Expand Down
2 changes: 2 additions & 0 deletions src/fal/fal_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class CurrentModel:
columns: Dict[str, ColumnInfo]
tests: List[Any]
meta: Dict[Any, Any]
run_result: Optional[Any]
adapter_response: Optional[CurrentAdapterResponse]


Expand Down Expand Up @@ -272,6 +273,7 @@ def _build_script_context(self) -> Context:
columns=model.columns,
tests=tests,
meta=meta,
run_result=model.run_result,
adapter_response=current_adapter_response,
)

Expand Down
2 changes: 1 addition & 1 deletion src/fal/packages/isolated_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _get_shell_bootstrap() -> str:
)


def _fal_main() -> None:
def _fal_main():
from faldbt.logger import LOGGER
from fal.packages import bridge

Expand Down
15 changes: 7 additions & 8 deletions src/fal/planner/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def _mark_dbt_nodes_status_and_response(
fal_dbt: FalDbt,
status: NodeStatus,
dbt_node: Optional[str] = None,
adapter_response: Optional[dict] = None,
run_result: Optional[dict] = None,
):
for model in fal_dbt.models:
if dbt_node is not None:
if model.unique_id == dbt_node:
model.status = status
model.run_result = run_result

if adapter_response is not None:
model.adapter_response = adapter_response
if run_result is not None:
model.adapter_response = run_result.get('adapter_response')
else:
model.status = status

Expand All @@ -90,9 +91,7 @@ def _map_cli_output_model_results(
if not result.get("unique_id") or not result.get("status"):
continue

yield result["unique_id"], NodeStatus(result["status"]), result.get(
"adapter_response"
)
yield result["unique_id"], NodeStatus(result["status"]), result


def _run_script(script: FalScript) -> Dict[str, Any]:
Expand Down Expand Up @@ -169,10 +168,10 @@ def execute(self, args: argparse.Namespace, fal_dbt: FalDbt) -> int:
args, model_names, fal_dbt.target_path, self.run_index
)

for node, status, adapter_response in _map_cli_output_model_results(
for node, status, result in _map_cli_output_model_results(
output.run_results
):
_mark_dbt_nodes_status_and_response(fal_dbt, status, node, adapter_response)
_mark_dbt_nodes_status_and_response(fal_dbt, status, node, result)

return output.return_code

Expand Down
3 changes: 3 additions & 0 deletions src/faldbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ class DbtModel(_DbtTestableNode):

_adapter_response: Optional[AdapterResponse] = field(default=None)

run_result = Optional[Any]

def __repr__(self):
attrs = ["name", "alias", "unique_id", "columns", "tests", "status"]
props = ", ".join([f"{item}={repr(getattr(self, item))}" for item in attrs])
Expand Down Expand Up @@ -382,6 +384,7 @@ def _map_nodes(
if result:
model.status = result.status.value
model.adapter_response = result.adapter_response
model.run_result = result

models.append(model)

Expand Down