Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn if concurrent_batches config is set to True, but the available adapter doesn't support it #11145

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241212-113611.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Warn about invalid usages of `concurrent_batches` config
time: 2024-12-12T11:36:11.451962-06:00
custom:
Author: QMalcolm
Issue: "11122"
12 changes: 12 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,18 @@ message MicrobatchModelNoEventTimeInputsMsg {
}


// I075
message InvalidConcurrentBatchesConfig {
int32 num_models = 1;
string adapter_type = 2;
}

message InvalidConcurrentBatchesConfigMsg {
CoreEventInfo info = 1;
InvalidConcurrentBatchesConfig data = 2;
}


// M - Deps generation


Expand Down
898 changes: 451 additions & 447 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,16 @@ def message(self) -> str:
return warning_tag(msg)


class InvalidConcurrentBatchesConfig(WarnLevel):
def code(self) -> str:
return "I075"

def message(self) -> str:
maybe_plural_count_of_models = pluralize(self.num_models, "microbatch model")
description = f"Found {maybe_plural_count_of_models} with the `concurrent_batches` config set to true, but the {self.adapter_type} adapter does not support running batches concurrently. Batches will be run sequentially."
return line_wrap_message(warning_tag(description))


# =======================================================
# M - Deps generation
# =======================================================
Expand Down
24 changes: 24 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dbt.utils
import dbt_common.utils
from dbt import plugins
from dbt.adapters.capability import Capability
from dbt.adapters.factory import (
get_adapter,
get_adapter_package_names,
Expand Down Expand Up @@ -66,6 +67,7 @@
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidConcurrentBatchesConfig,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
Expand Down Expand Up @@ -510,6 +512,7 @@
self.check_for_model_deprecations()
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()

Check warning on line 515 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L515

Added line #L515 was not covered by tests

return self.manifest

Expand Down Expand Up @@ -1484,6 +1487,27 @@
if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)

if not adapter.supports(Capability.MicrobatchConcurrency):
models_forcing_concurrent_batches = 0
for node in self.manifest.nodes.values():
if (
hasattr(node.config, "concurrent_batches")
and node.config.concurrent_batches is True
):
models_forcing_concurrent_batches += 1

if models_forcing_concurrent_batches > 0:
warn_or_error(
InvalidConcurrentBatchesConfig(
num_models=models_forcing_concurrent_batches,
adapter_type=adapter.type(),
)
)

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
Expand Down
45 changes: 45 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ArtifactWritten,
EndOfRunSummary,
GenericExceptionOnRun,
InvalidConcurrentBatchesConfig,
JinjaLogDebug,
LogBatchResult,
LogModelResult,
Expand Down Expand Up @@ -71,6 +72,11 @@
select * from {{ ref('input_model') }}
"""

microbatch_model_force_concurrent_batches_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), concurrent_batches=true) }}
select * from {{ ref('input_model') }}
"""

microbatch_yearly_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
Expand Down Expand Up @@ -1083,3 +1089,42 @@ def test_microbatch(

# we had a bug where having only one batch caused a generic exception
assert len(generic_exception_catcher.caught_events) == 0


class TestCanSilenceInvalidConcurrentBatchesConfigWarning(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_force_concurrent_batches_sql,
}

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(event_to_catch=InvalidConcurrentBatchesConfig) # type: ignore

def test_microbatch(
self,
project,
event_catcher: EventCatcher,
) -> None:
# This test works because postgres doesn't support concurrent batch execution
# If the postgres adapter starts supporting concurrent batch execution we'll
# need to start mocking the return value of `adapter.supports()`

with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(["run"], callbacks=[event_catcher.catch])
# We didn't silence the warning, so we get it
assert len(event_catcher.caught_events) == 1

# Clear caught events
event_catcher.caught_events = []

# Run again with silencing
with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(
["run", "--warn-error-options", "{'silence': ['InvalidConcurrentBatchesConfig']}"],
callbacks=[event_catcher.catch],
)
# Because we silenced the warning, it shouldn't get fired
assert len(event_catcher.caught_events) == 0
60 changes: 59 additions & 1 deletion tests/unit/parser/test_manifest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from argparse import Namespace
from typing import Optional
from unittest.mock import MagicMock, patch

import pytest
from pytest_mock import MockerFixture

from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.config import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest, ManifestStateCheck
from dbt.events.types import UnusedResourceConfigPath
from dbt.events.types import InvalidConcurrentBatchesConfig, UnusedResourceConfigPath
from dbt.flags import set_from_args
from dbt.parser.manifest import ManifestLoader, _warn_for_unused_resource_config_paths
from dbt.parser.read_files import FileDiff
from dbt.tracking import User
from dbt_common.events.event_manager_client import add_callback_to_manager
from tests.unit.fixtures import model_node
from tests.utils import EventCatcher


Expand Down Expand Up @@ -238,3 +241,58 @@ def test_warn_for_unused_resource_config_paths(
else:
assert len(catcher.caught_events) == 1
assert f"{resource_type}.{path}" in str(catcher.caught_events[0].data)


class TestCheckForcingConcurrentBatches:
@pytest.fixture
@patch("dbt.parser.manifest.ManifestLoader.build_manifest_state_check")
@patch("dbt.parser.manifest.os.path.exists")
@patch("dbt.parser.manifest.open")
def manifest_loader(
self, patched_open, patched_os_exist, patched_state_check
) -> ManifestLoader:
mock_project = MagicMock(RuntimeConfig)
mock_project.project_target_path = "mock_target_path"
mock_project.project_name = "mock_project_name"
return ManifestLoader(mock_project, {})

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(InvalidConcurrentBatchesConfig) # type: ignore

@pytest.mark.parametrize(
"adapter_support,concurrent_batches_config,expect_warning",
[
(False, True, True),
(False, False, False),
(False, None, False),
(True, True, False),
(True, False, False),
(True, None, False),
],
)
def test_check_forcing_concurrent_batches(
self,
mocker: MockerFixture,
manifest_loader: ManifestLoader,
postgres_adapter: PostgresAdapter,
event_catcher: EventCatcher,
adapter_support: bool,
concurrent_batches_config: Optional[bool],
expect_warning: bool,
):
add_callback_to_manager(event_catcher.catch)
model = model_node()
model.config.concurrent_batches = concurrent_batches_config
mocker.patch.object(postgres_adapter, "supports").return_value = adapter_support
mocker.patch("dbt.parser.manifest.get_adapter").return_value = postgres_adapter
mocker.patch.object(manifest_loader.manifest, "use_microbatch_batches").return_value = True

manifest_loader.manifest.add_node_nofile(model)
manifest_loader.check_forcing_batch_concurrency()

if expect_warning:
assert len(event_catcher.caught_events) == 1
assert "Batches will be run sequentially" in event_catcher.caught_events[0].info.msg # type: ignore
else:
assert len(event_catcher.caught_events) == 0
1 change: 1 addition & 0 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def test_event_codes(self):
core_types.FreshnessConfigProblem(msg=""),
core_types.SemanticValidationFailure(msg=""),
core_types.MicrobatchModelNoEventTimeInputs(model_name=""),
core_types.InvalidConcurrentBatchesConfig(num_models=1, adapter_type=""),
# M - Deps generation ======================
core_types.GitSparseCheckoutSubdirectory(subdir=""),
core_types.GitProgressCheckoutRevision(revision=""),
Expand Down
Loading