Skip to content

Commit

Permalink
Merge pull request #1242 from fishtown-analytics/add-node-level-timin…
Browse files Browse the repository at this point in the history
…g-info

add node level timing info to run_results.json and to tracking
  • Loading branch information
cmcarthur authored Feb 1, 2019
2 parents da40954 + 843d342 commit 314b453
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 14 deletions.
85 changes: 81 additions & 4 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
from dbt.api.object import APIObject
from dbt.utils import deep_merge
from dbt.utils import deep_merge, timestring
from dbt.contracts.common import named_property
from dbt.contracts.graph.manifest import COMPILE_RESULT_NODE_CONTRACT
from dbt.contracts.graph.parsed import PARSED_NODE_CONTRACT
from dbt.contracts.graph.compiled import COMPILED_NODE_CONTRACT
from dbt.contracts.graph.manifest import PARSED_MANIFEST_CONTRACT


TIMING_INFO_CONTRACT = {
'type': 'object',
'properties': {
'name': {
'type': 'string',
},
'started_at': {
'type': 'string',
'format': 'date-time',
},
'completed_at': {
'type': 'string',
'format': 'date-time',
},
}
}


class TimingInfo(APIObject):

SCHEMA = TIMING_INFO_CONTRACT

@classmethod
def create(cls, name):
return cls(name=name)

def begin(self):
self.set('started_at', timestring())

def end(self):
self.set('completed_at', timestring())


class collect_timing_info:
def __init__(self, name):
self.timing_info = TimingInfo.create(name)

def __enter__(self):
self.timing_info.begin()
return self.timing_info

def __exit__(self, exc_type, exc_value, traceback):
self.timing_info.end()


RUN_MODEL_RESULT_CONTRACT = {
'type': 'object',
'additionalProperties': False,
Expand Down Expand Up @@ -33,6 +79,14 @@
'type': 'number',
'description': 'The execution time, in seconds',
},
'timing': {
'type': 'array',
'items': TIMING_INFO_CONTRACT,
},
'thread_id': {
'type': ['string', 'null'],
'description': 'ID of the executing thread, e.g. Thread-3',
},
'node': COMPILE_RESULT_NODE_CONTRACT,
},
'required': ['node'],
Expand All @@ -44,9 +98,16 @@ class RunModelResult(APIObject):

def __init__(self, node, error=None, skip=False, status=None, failed=None,
execution_time=0):
super(RunModelResult, self).__init__(node=node, error=error, skip=skip,
status=status, fail=failed,
execution_time=execution_time)
super(RunModelResult, self).__init__(
node=node,
error=error,
skip=skip,
status=status,
fail=failed,
execution_time=execution_time,
thread_id=None,
timing=[],
)

# these all get set after the fact, generally
error = named_property('error',
Expand All @@ -56,6 +117,14 @@ def __init__(self, node, error=None, skip=False, status=None, failed=None,
status = named_property('status', 'The status of the model execution')
execution_time = named_property('execution_time',
'The time in seconds to execute the model')
thread_id = named_property(
'thread_id',
'ID of the executing thread, e.g. Thread-3'
)
timing = named_property(
'timing',
'List of TimingInfo objects'
)

@property
def errored(self):
Expand All @@ -74,6 +143,14 @@ def serialize(self):
result['node'] = self.node.serialize()
return result

def add_timing_info(self, timing_info):
self.set(
'timing',
self.get('timing', []) + [timing_info.serialize()],
)

return self


EXECUTION_RESULT_CONTRACT = {
'type': 'object',
Expand Down
28 changes: 21 additions & 7 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.exceptions import NotImplementedException
from dbt.utils import get_nodes_by_tags
from dbt.node_types import NodeType, RunHookType
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunModelResult
from dbt.contracts.results import RunModelResult, collect_timing_info

import dbt.clients.jinja
import dbt.context.runtime
Expand All @@ -17,6 +16,7 @@

import six
import sys
import threading
import time
import traceback

Expand All @@ -39,6 +39,7 @@ def track_model_run(index, num_nodes, run_model_result):
"model_materialization": dbt.utils.get_materialization(run_model_result.node), # noqa
"model_id": dbt.utils.get_hash(run_model_result.node),
"hashed_contents": dbt.utils.get_hashed_contents(run_model_result.node), # noqa
"timing": run_model_result.timing,
})


Expand Down Expand Up @@ -78,14 +79,26 @@ def safe_run(self, manifest):
started = time.time()

try:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant model
compiled_node = self.compile(manifest)
result.node = compiled_node
timing = []

with collect_timing_info('compile') as timing_info:
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
compiled_node = self.compile(manifest)
result.node = compiled_node

timing.append(timing_info)

# for ephemeral nodes, we only want to compile, not run
if not self.is_ephemeral_model(self.node):
result = self.run(compiled_node, manifest)
with collect_timing_info('execute') as timing_info:
result = self.run(compiled_node, manifest)

timing.append(timing_info)

for item in timing:
result = result.add_timing_info(item)

except catchable_errors as e:
if e.node is None:
Expand Down Expand Up @@ -130,6 +143,7 @@ def safe_run(self, manifest):
result.status = 'ERROR'

result.execution_time = time.time() - started
result.thread_id = threading.current_thread().name
return result

def _safe_release_connection(self):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

INVOCATION_SPEC = 'iglu:com.dbt/invocation/jsonschema/1-0-0'
PLATFORM_SPEC = 'iglu:com.dbt/platform/jsonschema/1-0-0'
RUN_MODEL_SPEC = 'iglu:com.dbt/run_model/jsonschema/1-0-0'
RUN_MODEL_SPEC = 'iglu:com.dbt/run_model/jsonschema/1-0-1'
INVOCATION_ENV_SPEC = 'iglu:com.dbt/invocation_env/jsonschema/1-0-0'
PACKAGE_INSTALL_SPEC = 'iglu:com.dbt/package_install/jsonschema/1-0-0'

Expand Down
16 changes: 16 additions & 0 deletions test/integration/029_docs_generate_tests/test_docs_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,8 @@ def expected_run_results(self, quote_schema=True, quote_model=False,
'unique_id': 'model.test.model',
'wrapped_sql': 'None'
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -1904,6 +1906,8 @@ def expected_run_results(self, quote_schema=True, quote_model=False,
'unique_id': 'seed.test.seed',
'wrapped_sql': 'None'
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -1950,6 +1954,8 @@ def expected_run_results(self, quote_schema=True, quote_model=False,
'unique_id': 'test.test.not_null_model_id',
'wrapped_sql': AnyStringWith('id is null')
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -1995,6 +2001,8 @@ def expected_run_results(self, quote_schema=True, quote_model=False,
'unique_id': 'test.test.nothing_model_',
'wrapped_sql': AnyStringWith('select 0'),
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None
},
Expand Down Expand Up @@ -2041,6 +2049,8 @@ def expected_run_results(self, quote_schema=True, quote_model=False,
'unique_id': 'test.test.unique_model_id',
'wrapped_sql': AnyStringWith('count(*)')
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -2153,6 +2163,8 @@ def expected_postgres_references_run_results(self):
'unique_id': 'model.test.ephemeral_summary',
'wrapped_sql': 'None',
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -2237,6 +2249,8 @@ def expected_postgres_references_run_results(self):
'unique_id': 'model.test.view_summary',
'wrapped_sql': 'None',
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down Expand Up @@ -2284,6 +2298,8 @@ def expected_postgres_references_run_results(self):
'unique_id': 'seed.test.seed',
'wrapped_sql': 'None'
},
'thread_id': ANY,
'timing': [ANY, ANY],
'skip': False,
'status': None,
},
Expand Down
13 changes: 11 additions & 2 deletions test/integration/033_event_tracking_test/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ def run_context(
status,
error=None
):
timing = []

if status != 'ERROR':
timing = [ANY, ANY]

def populate(project_id, user_id, invocation_id, version):
return [{
'schema': 'iglu:com.dbt/run_model/jsonschema/1-0-0',
'schema': 'iglu:com.dbt/run_model/jsonschema/1-0-1',
'data': {
'invocation_id': invocation_id,

Expand All @@ -159,6 +164,8 @@ def populate(project_id, user_id, invocation_id, version):
'run_status': status,
'run_error': error,
'run_skipped': False,

'timing': timing,
},
}]

Expand Down Expand Up @@ -256,7 +263,7 @@ def test__event_tracking_deps(self):
def test__event_tracking_seed(self):
def seed_context(project_id, user_id, invocation_id, version):
return [{
'schema': 'iglu:com.dbt/run_model/jsonschema/1-0-0',
'schema': 'iglu:com.dbt/run_model/jsonschema/1-0-1',
'data': {
'invocation_id': invocation_id,

Expand All @@ -272,6 +279,8 @@ def seed_context(project_id, user_id, invocation_id, version):
'run_status': 'INSERT 1',
'run_error': None,
'run_skipped': False,

'timing': [ANY, ANY],
},
}]

Expand Down

0 comments on commit 314b453

Please sign in to comment.