Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Macro resolver remove lazy loading #9232

77 changes: 27 additions & 50 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Expand All @@ -28,6 +27,7 @@
ConstraintType,
ModelLevelConstraint,
)
from dbt.adapters.contracts.macros import MacroResolver

import agate
import pytz
Expand Down Expand Up @@ -62,7 +62,7 @@
Integer,
)
from dbt.common.clients.jinja import CallableMacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.manifest import Manifest
from dbt.common.events.functions import fire_event, warn_or_error
from dbt.adapters.events.types import (
CacheMiss,
Expand Down Expand Up @@ -254,7 +254,20 @@
self.config = config
self.cache = RelationsCache(log_cache_events=config.log_cache_events)
self.connections = self.ConnectionManager(config, mp_context)
self._macro_manifest_lazy: Optional[MacroManifest] = None
self._macro_resolver: Optional[MacroResolver] = None

###
# Methods to set / access a macro resolver
###
def set_macro_resolver(self, macro_resolver: MacroResolver) -> None:
self._macro_resolver = macro_resolver

def get_macro_resolver(self) -> Optional[MacroResolver]:
return self._macro_resolver

def clear_macro_resolver(self) -> None:
if self._macro_resolver is not None:
self._macro_resolver = None

Check warning on line 270 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L269-L270

Added lines #L269 - L270 were not covered by tests

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -367,39 +380,6 @@
"""
return cls.ConnectionManager.TYPE

@property
def _macro_manifest(self) -> MacroManifest:
if self._macro_manifest_lazy is None:
return self.load_macro_manifest()
return self._macro_manifest_lazy

def check_macro_manifest(self) -> Optional[MacroManifest]:
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
return self._macro_manifest_lazy

def load_macro_manifest(self, base_macros_only=False) -> MacroManifest:
# base_macros_only is for the test framework
if self._macro_manifest_lazy is None:
# avoid a circular import
from dbt.parser.manifest import ManifestLoader

manifest = ManifestLoader.load_macros(
self.config,
self.connections.set_query_header,
base_macros_only=base_macros_only,
)
# TODO CT-211
self._macro_manifest_lazy = manifest # type: ignore[assignment]
# TODO CT-211
return self._macro_manifest_lazy # type: ignore[return-value]

def clear_macro_manifest(self):
if self._macro_manifest_lazy is not None:
self._macro_manifest_lazy = None

###
# Caching methods
###
def _schema_is_cached(self, database: Optional[str], schema: str) -> bool:
Expand Down Expand Up @@ -975,7 +955,7 @@
:param col_idx: The index into the agate table for the column.
:return: The name of the type in the database
"""
return "integer"

Check warning on line 958 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L958

Added line #L958 was not covered by tests

@classmethod
@abc.abstractmethod
Expand Down Expand Up @@ -1054,11 +1034,10 @@
def execute_macro(
self,
macro_name: str,
manifest: Optional[Manifest] = None,
macro_resolver: Optional[MacroResolver] = None,
project: Optional[str] = None,
context_override: Optional[Dict[str, Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
text_only_columns: Optional[Iterable[str]] = None,
) -> AttrDict:
"""Look macro_name up in the manifest and execute its results.

Expand All @@ -1078,13 +1057,11 @@
if context_override is None:
context_override = {}

if manifest is None:
# TODO CT-211
manifest = self._macro_manifest # type: ignore[assignment]
# TODO CT-211
macro = manifest.find_macro_by_name( # type: ignore[union-attr]
macro_name, self.config.project_name, project
)
resolver = macro_resolver or self._macro_resolver
if resolver is None:
raise DbtInternalError("macro resolver was None when calling execute_macro!")

Check warning on line 1062 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1062

Added line #L1062 was not covered by tests

macro = resolver.find_macro_by_name(macro_name, self.config.project_name, project)
if macro is None:
if project is None:
package_name = "any package"
Expand All @@ -1104,7 +1081,7 @@
# TODO CT-211
macro=macro,
config=self.config,
manifest=manifest, # type: ignore[arg-type]
manifest=resolver, # type: ignore[arg-type]
package_name=project,
)
macro_context.update(context_override)
Expand Down Expand Up @@ -1140,7 +1117,7 @@
kwargs=kwargs,
# pass in the full manifest so we get any local project
# overrides
manifest=manifest,
macro_resolver=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
Expand All @@ -1162,7 +1139,7 @@
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
manifest=manifest,
macro_resolver=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
Expand Down Expand Up @@ -1208,7 +1185,7 @@
return catalogs, exceptions

def row_matches_relation(self, row: agate.Row, relations: Set[BaseRelation]):
pass

Check warning on line 1188 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1188

Added line #L1188 was not covered by tests

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
with executor(self.config) as tpe:
Expand All @@ -1216,7 +1193,7 @@
schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue

Check warning on line 1196 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1196

Added line #L1196 was not covered by tests
name = ".".join([str(info.database), "information_schema"])
fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
Expand Down Expand Up @@ -1273,7 +1250,7 @@
AttrDict, # current: contains AdapterResponse + agate.Table
agate.Table, # previous: just table
]
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=manifest)
if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
Expand Down Expand Up @@ -1310,7 +1287,7 @@
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, manifest=manifest
GET_RELATION_LAST_MODIFIED_MACRO_NAME, kwargs=kwargs, macro_resolver=manifest
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

Expand Down
11 changes: 11 additions & 0 deletions core/dbt/adapters/contracts/macros.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Optional
from typing_extensions import Protocol

from dbt.common.clients.jinja import MacroProtocol


class MacroResolver(Protocol):
def find_macro_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[MacroProtocol]:
raise NotImplementedError("find_macro_by_name not implemented")

Check warning on line 11 in core/dbt/adapters/contracts/macros.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/contracts/macros.py#L11

Added line #L11 was not covered by tests
10 changes: 10 additions & 0 deletions core/dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import agate

from dbt.adapters.contracts.connection import Connection, AdapterRequiredConfig, AdapterResponse
from dbt.adapters.contracts.macros import MacroResolver
from dbt.adapters.contracts.relation import Policy, HasQuoting, RelationConfig
from dbt.contracts.graph.model_config import BaseConfig
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -66,6 +67,15 @@
def __init__(self, config: AdapterRequiredConfig) -> None:
...

def set_macro_resolver(self, macro_resolver: MacroResolver) -> None:
...

Check warning on line 71 in core/dbt/adapters/protocol.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/protocol.py#L71

Added line #L71 was not covered by tests

def get_macro_resolver(self) -> Optional[MacroResolver]:
...

Check warning on line 74 in core/dbt/adapters/protocol.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/protocol.py#L74

Added line #L74 was not covered by tests

def clear_macro_resolver(self) -> None:
...

Check warning on line 77 in core/dbt/adapters/protocol.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/protocol.py#L77

Added line #L77 was not covered by tests

@classmethod
def type(cls) -> str:
pass
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing_extensions import Protocol

from dbt.adapters.base.column import Column
from dbt.common.clients.jinja import MacroProtocol
from dbt.adapters.factory import get_adapter, get_adapter_package_names, get_adapter_type_names
from dbt.common.clients import agate_helper
from dbt.clients.jinja import get_rendered, MacroGenerator, MacroStack
Expand Down Expand Up @@ -1355,7 +1356,7 @@ class MacroContext(ProviderContext):

def __init__(
self,
model: Macro,
model: MacroProtocol,
config: RuntimeConfig,
manifest: Manifest,
provider: Provider,
Expand Down Expand Up @@ -1512,7 +1513,7 @@ def generate_runtime_model_context(


def generate_runtime_macro_context(
macro: Macro,
macro: MacroProtocol,
config: RuntimeConfig,
manifest: Manifest,
package_name: Optional[str],
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@
# the config and adapter may be persistent.
if reset:
config.clear_dependencies()
adapter.clear_macro_manifest()
adapter.clear_macro_resolver()

Check warning on line 289 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L289

Added line #L289 was not covered by tests
macro_hook = adapter.connections.set_query_header

flags = get_flags()
Expand Down Expand Up @@ -1000,7 +1000,7 @@

def save_macros_to_adapter(self, adapter):
macro_manifest = MacroManifest(self.manifest.macros)
adapter._macro_manifest_lazy = macro_manifest
adapter.set_macro_resolver(macro_manifest)
# This executes the callable macro_hook and sets the
# query headers
self.macro_hook(macro_manifest)
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _run_unsafe(self, package_name, macro_name) -> agate.Table:
with adapter.connection_named("macro_{}".format(macro_name)):
adapter.clear_transaction()
res = adapter.execute_macro(
macro_name, project=package_name, kwargs=macro_kwargs, manifest=self.manifest
macro_name, project=package_name, kwargs=macro_kwargs, macro_resolver=self.manifest
)

return res
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def execute(self, compiled_node, manifest):
model_context = generate_runtime_model_context(compiled_node, self.config, manifest)
compiled_node.compiled_code = self.adapter.execute_macro(
macro_name="get_show_sql",
manifest=manifest,
macro_resolver=manifest,
context_override=model_context,
kwargs={
"compiled_code": model_context["compiled_code"],
Expand Down
17 changes: 16 additions & 1 deletion core/dbt/tests/fixtures/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import warnings
import yaml

from dbt.parser.manifest import ManifestLoader
from dbt.common.exceptions import CompilationError, DbtDatabaseError
import dbt.flags as flags
from dbt.config.runtime import RuntimeConfig
Expand Down Expand Up @@ -289,7 +290,13 @@ def adapter(
adapter = get_adapter(runtime_config)
# We only need the base macros, not macros from dependencies, and don't want
# to run 'dbt deps' here.
adapter.load_macro_manifest(base_macros_only=True)
manifest = ManifestLoader.load_macros(
runtime_config,
adapter.connections.set_query_header,
base_macros_only=True,
)

adapter.set_macro_resolver(manifest)
yield adapter
adapter.cleanup_connections()
reset_adapters()
Expand Down Expand Up @@ -450,6 +457,14 @@ def create_test_schema(self, schema_name=None):

# Drop the unique test schema, usually called in test cleanup
def drop_test_schema(self):
if self.adapter.get_macro_resolver() is None:
manifest = ManifestLoader.load_macros(
self.adapter.config,
self.adapter.connections.set_query_header,
base_macros_only=True,
)
self.adapter.set_macro_resolver(manifest)

with get_connection(self.adapter):
for schema_name in self.created_schemas:
relation = self.adapter.Relation.create(database=self.database, schema=schema_name)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,9 @@ def _mock_state_check(self):

self.psycopg2.connect.return_value = self.handle
self.adapter = PostgresAdapter(self.config, self.mp_context)
self.adapter._macro_manifest_lazy = load_internal_manifest_macros(self.config)
self.adapter.set_macro_resolver(load_internal_manifest_macros(self.config))
self.adapter.connections.query_header = MacroQueryStringSetter(
self.config, self.adapter._macro_manifest_lazy
self.config, self.adapter.get_macro_resolver()
)

self.qh_patch = mock.patch.object(self.adapter.connections.query_header, "add")
Expand Down
Loading