Skip to content

Commit

Permalink
fix MicrobatchExecutionDebug message (#11071) (#11120)
Browse files Browse the repository at this point in the history
* fix MicrobatchExecutionDebug message

* Fix typing in `describe_batch` to convince mypy `batch_start` exists when needed

---------

Co-authored-by: Quigley Malcolm <quigley.malcolm@dbtlabs.com>
(cherry picked from commit fc6167a)

Co-authored-by: Michelle Ark <MichelleArk@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and MichelleArk authored Dec 10, 2024
1 parent f7ab487 commit ff50d1d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241209-113806.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix debug log messages for microbatch batch execution information
time: 2024-12-09T11:38:06.972743-06:00
custom:
Author: MichelleArk QMalcolm
Issue: "11111"
22 changes: 16 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,21 @@ def set_relation_exists(self, relation_exists: bool) -> None:
def set_batches(self, batches: Dict[int, BatchType]) -> None:
self.batches = batches

@property
def batch_start(self) -> Optional[datetime]:
if self.batch_idx is None:
return None
else:
return self.batches[self.batch_idx][0]

def describe_node(self) -> str:
return f"{self.node.language} microbatch model {self.get_node_representation()}"

def describe_batch(self, batch_start: datetime) -> str:
def describe_batch(self) -> str:
batch_start = self.batch_start
if batch_start is None:
return ""

# Only visualize date if batch_start year/month/day
formatted_batch_start = MicrobatchBuilder.format_batch_start(
batch_start, self.node.config.batch_size
Expand All @@ -393,8 +404,7 @@ def print_batch_result_line(
if self.batch_idx is None:
return

batch_start = self.batches[self.batch_idx][0]
description = self.describe_batch(batch_start)
description = self.describe_batch()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
Expand Down Expand Up @@ -426,7 +436,7 @@ def print_batch_start_line(self) -> None:
if batch_start is None:
return

batch_description = self.describe_batch(batch_start)
batch_description = self.describe_batch()
fire_event(
LogStartBatch(
description=batch_description,
Expand Down Expand Up @@ -828,14 +838,14 @@ def _submit_batch(
if not force_sequential_run and batch_runner.should_run_in_parallel():
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run concurrently"
msg=f"{batch_runner.describe_batch()} is being run concurrently"
)
)
self._submit(pool, [batch_runner], batch_results.append)
else:
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run sequentially"
msg=f"{batch_runner.describe_batch()} is being run sequentially"
)
)
batch_results.append(self.call_runner(batch_runner))
Expand Down

0 comments on commit ff50d1d

Please sign in to comment.