Skip to content

Commit

Permalink
Split out model vs microbatch execution (#10737)
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm authored and peterallenwebb committed Sep 20, 2024
1 parent 10464c1 commit a854272
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,48 @@ def _materialization_relations(self, result: Any, model) -> List[BaseRelation]:
)
raise CompilationError(msg, node=model)

def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))

return self._build_run_model_result(model, context)

def _execute_microbatch_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
batch_results = None
try:
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results is not None:
return self._build_run_microbatch_model_result(model, batch_results)
else:
return self._build_run_model_result(model, context)

def execute(self, model, manifest):
context = generate_runtime_model_context(model, self.config, manifest)

Expand Down Expand Up @@ -378,29 +420,19 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)
batch_results = None
try:
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
else:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results:
return self._build_run_microbatch_model_result(model, batch_results)

return self._build_run_model_result(model, context)
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
else:
return self._execute_model(
hook_ctx, context_config, model, context, materialization_macro
)

def _execute_microbatch_materialization(
self,
Expand Down

0 comments on commit a854272

Please sign in to comment.