Skip to content

Commit

Permalink
first pass at adding query stats, naming tbd
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Wigley committed Dec 16, 2020
1 parent 454ddc6 commit 0370f26
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 92 deletions.
9 changes: 5 additions & 4 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from multiprocessing.synchronize import RLock
from threading import get_ident
from typing import (
Dict, Tuple, Hashable, Optional, ContextManager, List
Dict, Tuple, Hashable, Optional, ContextManager, List, Union
)

import agate

import dbt.exceptions
from dbt.contracts.connection import (
Connection, Identifier, ConnectionState, AdapterRequiredConfig, LazyHandle
Connection, Identifier, ConnectionState,
AdapterRequiredConfig, LazyHandle, ExecutionStatus
)
from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.base.query_headers import (
Expand Down Expand Up @@ -290,15 +291,15 @@ def _add_query_comment(self, sql: str) -> str:
@abc.abstractmethod
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
"""Execute the given SQL.
:param str sql: The sql to execute.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[str, agate.Table]
:rtype: Tuple[Union[str, ExecutionStatus], agate.Table]
"""
raise dbt.exceptions.NotImplementedException(
'`execute` is not implemented for this adapter!'
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import filter_null_values, executor

from dbt.adapters.base.connections import Connection
from dbt.adapters.base.connections import Connection, ExecutionStatus
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
Expand Down Expand Up @@ -213,7 +213,7 @@ def connection_for(
@available.parse(lambda *a, **k: ('', empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
"""Execute the given SQL. This is a thin wrapper around
ConnectionManager.execute.
Expand All @@ -222,7 +222,7 @@ def execute(
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
:return: A tuple of the status and the results (empty if fetch=False).
:rtype: Tuple[str, agate.Table]
:rtype: Tuple[Union[str, ExecutionStatus], agate.Table]
"""
return self.connections.execute(
sql=sql,
Expand Down
6 changes: 4 additions & 2 deletions core/dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import agate

from dbt.contracts.connection import Connection, AdapterRequiredConfig
from dbt.contracts.connection import (
Connection, AdapterRequiredConfig, ExecutionStatus
)
from dbt.contracts.graph.compiled import (
CompiledNode, ManifestNode, NonSourceCompiledNode
)
Expand Down Expand Up @@ -154,7 +156,7 @@ def commit_if_has_connection(self) -> None:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
...

def get_compiler(self) -> Compiler_T:
Expand Down
11 changes: 6 additions & 5 deletions core/dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import abc
import time
from typing import List, Optional, Tuple, Any, Iterable, Dict
from typing import List, Optional, Tuple, Any, Iterable, Dict, Union

import agate

import dbt.clients.agate_helper
import dbt.exceptions
from dbt.adapters.base import BaseConnectionManager
from dbt.contracts.connection import Connection, ConnectionState
from dbt.contracts.connection import (
Connection, ConnectionState, ExecutionStatus
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt import flags

Expand Down Expand Up @@ -76,7 +78,6 @@ def add_query(

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)

logger.debug(
"SQL status: {status} in {elapsed:0.2f} seconds",
status=self.get_status(cursor),
Expand All @@ -86,7 +87,7 @@ def add_query(
return connection, cursor

@abc.abstractclassmethod
def get_status(cls, cursor: Any) -> str:
def get_status(cls, cursor: Any) -> Union[str, ExecutionStatus]:
"""Get the status of the cursor."""
raise dbt.exceptions.NotImplementedException(
'`get_status` is not implemented for this adapter!'
Expand Down Expand Up @@ -118,7 +119,7 @@ def get_result_from_cursor(cls, cursor: Any) -> agate.Table:

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, agate.Table]:
) -> Tuple[Union[str, ExecutionStatus], agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
status = self.get_status(cursor)
Expand Down
17 changes: 17 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .macros import MacroNamespaceBuilder, MacroNamespace
from .manifest import ManifestContext
from dbt.contracts.graph.manifest import Manifest, Disabled
from dbt.contracts.connection import ExecutionStatus
from dbt.contracts.graph.compiled import (
CompiledResource,
CompiledSeedNode,
Expand Down Expand Up @@ -83,6 +84,7 @@ class BaseDatabaseWrapper:
Wrapper for runtime database interaction. Applies the runtime quote policy
via a relation proxy.
"""

def __init__(self, adapter, namespace: MacroNamespace):
self._adapter = adapter
self.Relation = RelationProxy(adapter)
Expand Down Expand Up @@ -379,6 +381,7 @@ class ParseDatabaseWrapper(BaseDatabaseWrapper):
"""The parser subclass of the database wrapper applies any explicit
parse-time overrides.
"""

def __getattr__(self, name):
override = (name in self._adapter._available_ and
name in self._adapter._parse_replacements_)
Expand All @@ -399,6 +402,7 @@ class RuntimeDatabaseWrapper(BaseDatabaseWrapper):
"""The runtime database wrapper exposes everything the adapter marks
available.
"""

def __getattr__(self, name):
if name in self._adapter._available_:
return getattr(self._adapter, name)
Expand Down Expand Up @@ -672,6 +676,18 @@ def store_result(
})
return ''

@contextmember
def store_raw_result(
self,
name: str,
message=Optional[str],
state=Optional[str],
rows=Optional[str],
agate_table: Optional[agate.Table] = None
) -> str:
status = ExecutionStatus(message=message, state=state, rows=rows)
return self.store_result(name, status, agate_table)

@contextproperty
def validation(self):
def validate_any(*args) -> Callable[[T], None]:
Expand Down Expand Up @@ -1179,6 +1195,7 @@ class MacroContext(ProviderContext):
- 'schema' does not use any 'model' information
- they can't be configured with config() directives
"""

def __init__(
self,
model: ParsedMacro,
Expand Down
11 changes: 11 additions & 0 deletions core/dbt/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
register_pattern(Identifier, r'^[A-Za-z_][A-Za-z0-9_]+$')


@dataclass
class ExecutionStatus(JsonSchemaMixin):
message: str
state: Optional[str] = None
rows: Optional[str] = None

def __str__(self):
return self.message


class ConnectionState(StrEnum):
INIT = 'init'
OPEN = 'open'
Expand Down Expand Up @@ -85,6 +95,7 @@ class LazyHandle:
"""Opener must be a callable that takes a Connection object and opens the
connection, updating the handle on the Connection.
"""

def __init__(self, opener: Callable[[Connection], Connection]):
self.opener = opener

Expand Down
3 changes: 2 additions & 1 deletion core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Union, Any, NewType

PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa
PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa
DEFAULT_SEND_ANONYMOUS_USAGE_STATS = True


Expand Down Expand Up @@ -142,6 +142,7 @@ class RegistryPackageMetadata(
'sql',
'sql_now',
'store_result',
'store_raw_result',
'target',
'this',
'tojson',
Expand Down
33 changes: 12 additions & 21 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from dbt.utils import lowercase
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin
from hologram import JsonDict, JsonSchemaMixin

import agate

Expand Down Expand Up @@ -102,22 +102,9 @@ class NodeResult(BaseResult):


@dataclass
class PartialNodeResult(NodeResult):
# if the result got to the point where it could be skipped/failed, we would
# be returning a real result, not a partial.
@property
def skipped(self):
return False


@dataclass
class RunModelResult(NodeResult):
class RunResult(NodeResult):
agate_table: Optional[agate.Table] = None

def to_dict(self, *args, **kwargs):
dct = super().to_dict(*args, **kwargs)
dct.pop('agate_table', None)
return dct
adapter_query_status: JsonDict = field(default_factory=dict)

@property
def skipped(self):
Expand All @@ -139,9 +126,6 @@ def __getitem__(self, idx):
return self.results[idx]


RunResult = Union[PartialNodeResult, RunModelResult]


@dataclass
class RunResultsMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
Expand All @@ -152,6 +136,7 @@ class RunResultsMetadata(BaseArtifactMetadata):
@dataclass
class RunResultOutput(BaseResult):
unique_id: str
adapter_query_status: JsonDict = field(default_factory=dict)


def process_run_result(result: RunResult) -> RunResultOutput:
Expand All @@ -162,6 +147,7 @@ def process_run_result(result: RunResult) -> RunResultOutput:
thread_id=result.thread_id,
execution_time=result.execution_time,
message=result.message,
adapter_query_status=result.adapter_query_status
)


Expand Down Expand Up @@ -247,9 +233,10 @@ def from_success(
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult


@dataclass
class SourceFreshnessResult(NodeResult, Writable):
node: ParsedSourceDefinition
Expand Down Expand Up @@ -285,9 +272,13 @@ class SourceFreshnessOutput(JsonSchemaMixin):


@dataclass
class PartialSourceFreshnessResult(PartialNodeResult):
class PartialSourceFreshnessResult(NodeResult):
status: FreshnessStatus

@property
def skipped(self):
return False


FreshnessNodeResult = Union[PartialSourceFreshnessResult,
SourceFreshnessResult]
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/include/global_project/macros/core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{%- endif -%}
{%- endmacro %}

{% macro noop_statement(name=None, status=None, res=None) -%}
{% macro noop_statement(name=None, message=None, state=None, rows=None, res=None) -%}
{%- set sql = caller() -%}

{%- if name == 'main' -%}
Expand All @@ -24,7 +24,7 @@
{%- endif -%}

{%- if name is not none -%}
{{ store_result(name, status=status, agate_table=res) }}
{{ store_raw_result(name, message=message, state=state, rows=rows, agate_table=res) }}
{%- endif -%}

{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@
{% set create_table_sql = create_csv_table(model, agate_table) %}
{% endif %}

{% set status = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set state = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set num_rows = (agate_table.rows | length) %}
{% set sql = load_csv_rows(model, agate_table) %}

{% call noop_statement('main', status ~ ' ' ~ num_rows) %}
{% call noop_statement('main', state ~ ' ' ~ num_rows, state, num_rows) %}
{{ create_table_sql }};
-- dbt seed --
{{ sql }}
Expand Down
Loading

0 comments on commit 0370f26

Please sign in to comment.