Skip to content

Commit

Permalink
Merge branch 'main' into dbeatty/fix-10656
Browse files Browse the repository at this point in the history
  • Loading branch information
dbeatty10 committed Sep 19, 2024
2 parents 98249eb + 3308a43 commit 8186042
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 29 deletions.
76 changes: 54 additions & 22 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,48 @@ def _materialization_relations(self, result: Any, model) -> List[BaseRelation]:
)
raise CompilationError(msg, node=model)

def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))

return self._build_run_model_result(model, context)

def _execute_microbatch_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
batch_results = None
try:
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results is not None:
return self._build_run_microbatch_model_result(model, batch_results)
else:
return self._build_run_model_result(model, context)

def execute(self, model, manifest):
context = generate_runtime_model_context(model, self.config, manifest)

Expand Down Expand Up @@ -378,29 +420,19 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)
batch_results = None
try:
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
batch_results = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
)
else:
result = MacroGenerator(
materialization_macro, context, stack=context["context_macro_stack"]
)()
for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

if batch_results:
return self._build_run_microbatch_model_result(model, batch_results)

return self._build_run_model_result(model, context)
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
else:
return self._execute_model(
hook_ctx, context_config, model, context, materialization_macro
)

def _execute_microbatch_materialization(
self,
Expand Down
13 changes: 7 additions & 6 deletions core/dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from snowplow_tracker import Emitter, SelfDescribingJson, Subject, Tracker
from snowplow_tracker import __version__ as snowplow_version # type: ignore
from snowplow_tracker import logger as sp_logger
from snowplow_tracker.events import StructuredEvent

from dbt import version as dbt_version
from dbt.adapters.exceptions import FailedToConnectError
Expand Down Expand Up @@ -217,12 +218,12 @@ def get_dbt_env_context():
def track(user, *args, **kwargs):
if user.do_not_track:
return
else:
fire_event(SendingEvent(kwargs=str(kwargs)))
try:
tracker.track_struct_event(*args, **kwargs)
except Exception:
fire_event(SendEventFailure())

fire_event(SendingEvent(kwargs=str(kwargs)))
try:
tracker.track(StructuredEvent(*args, **kwargs))
except Exception:
fire_event(SendEventFailure())


def track_project_id(options):
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_behavior_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def snowplow_tracker(mocker):
add_callback_to_manager(track_behavior_change_warn)

# don't make a call, catch the request
snowplow_tracker = mocker.patch("dbt.tracking.tracker.track_struct_event")
# to avoid confusion, this is snowplow_tracker's track, not our wrapper that's also named track
snowplow_tracker = mocker.patch("dbt.tracking.tracker.track")

yield snowplow_tracker

Expand Down

0 comments on commit 8186042

Please sign in to comment.