Skip to content

Commit

Permalink
Merge pull request #1912 from fishtown-analytics/fix/logging-tweaks-2
Browse files Browse the repository at this point in the history
Tweak rpc logging settings
  • Loading branch information
drewbanin authored Nov 12, 2019
2 parents c26cf19 + 0c6bc3e commit f7a9260
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 116 deletions.
20 changes: 13 additions & 7 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class KillParameters(RPCParameters):
@dataclass
class PollParameters(RPCParameters):
request_token: TaskID
logs: bool = False
logs: bool = True
logs_start: int = 0


Expand Down Expand Up @@ -349,9 +349,10 @@ def from_result(
base: RemoteEmptyResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollRemoteEmptyCompleteResult':
return cls(
logs=base.logs,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
Expand Down Expand Up @@ -380,12 +381,13 @@ def from_result(
base: RemoteExecutionResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollExecuteCompleteResult':
return cls(
results=base.results,
generated_at=base.generated_at,
elapsed_time=base.elapsed_time,
logs=base.logs,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
Expand All @@ -407,13 +409,14 @@ def from_result(
base: RemoteCompileResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollCompileCompleteResult':
return cls(
raw_sql=base.raw_sql,
compiled_sql=base.compiled_sql,
node=base.node,
timing=base.timing,
logs=base.logs,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
Expand All @@ -435,13 +438,14 @@ def from_result(
base: RemoteRunResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollRunCompleteResult':
return cls(
raw_sql=base.raw_sql,
compiled_sql=base.compiled_sql,
node=base.node,
timing=base.timing,
logs=base.logs,
logs=logs,
table=base.table,
tags=tags,
state=timing.state,
Expand All @@ -464,10 +468,11 @@ def from_result(
base: RemoteRunOperationResult,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollRunOperationCompleteResult':
return cls(
success=base.success,
logs=base.logs,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
Expand All @@ -489,12 +494,13 @@ def from_result(
base: RemoteCatalogResults,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollCatalogCompleteResult':
return cls(
nodes=base.nodes,
generated_at=base.generated_at,
_compile_results=base._compile_results,
logs=base.logs,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/rpc/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def handle_request(self) -> PSResult:


def poll_complete(
timing: TaskTiming, result: Any, tags: TaskTags
timing: TaskTiming, result: Any, tags: TaskTags, logs: List[LogMessage]
) -> PollResult:
if timing.state not in (TaskHandlerState.Success, TaskHandlerState.Failed):
raise dbt.exceptions.InternalException(
Expand Down Expand Up @@ -163,7 +163,7 @@ def poll_complete(
raise dbt.exceptions.InternalException(
'got invalid result in poll_complete: {}'.format(result)
)
return cls.from_result(result, tags, timing)
return cls.from_result(result, tags, timing, logs)


class Poll(RemoteBuiltinMethod[PollParameters, PollResult]):
Expand Down Expand Up @@ -224,6 +224,7 @@ def handle_request(self) -> PollResult:
timing=timing,
result=task.result,
tags=task.tags,
logs=task_logs
)
elif state == TaskHandlerState.Killed:
return PollKilledResult(
Expand Down
31 changes: 15 additions & 16 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,23 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context):

hook_text = '{}.{}.{}'.format(hook.package_name, hook_type,
hook.index)
print_hook_start_line(hook_text, idx, num_hooks)

hook_meta_ctx = HookMetadata(hook, self.index_offset(idx))
uid_ctx = UniqueID(hook.unique_id)
with uid_ctx, hook_meta_ctx, startctx:
logger.debug('on-run-hook starting')
status = 'OK'
with UniqueID(hook.unique_id):
with hook_meta_ctx, startctx:
print_hook_start_line(hook_text, idx, num_hooks)

with Timer() as timer:
if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False,
fetch=False)
self.ran_hooks.append(hook)
status = 'OK'

with uid_ctx, finishctx, DbtModelState({'node_status': status}):
logger.debug('on-run-hook complete')
with Timer() as timer:
if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False,
fetch=False)
self.ran_hooks.append(hook)

print_hook_end_line(hook_text, status, idx, num_hooks,
timer.elapsed)
with finishctx, DbtModelState({'node_status': 'passed'}):
print_hook_end_line(
hook_text, status, idx, num_hooks, timer.elapsed
)

self._total_executed += len(ordered_hooks)

Expand Down Expand Up @@ -187,12 +185,13 @@ def after_run(self, adapter, results):
r.node.schema for r in results
if not any((r.error is not None, r.fail, r.skipped))
))

self._total_executed += len(results)
with adapter.connection_named('master'):
self.safe_run_hooks(adapter, RunHookType.End,
{'schemas': schemas, 'results': results})

def after_hooks(self, adapter, results, elapsed):
self._total_executed += len(results)
self.print_results_line(results, elapsed)

def build_query(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def project_config(self):
'macro-paths': ['macros'],
}

def test_pickle(self, agate_table):
def do_test_pickle(self, agate_table):
table = {
'column_names': list(agate_table.column_names),
'rows': [list(row) for row in agate_table]
Expand All @@ -36,7 +36,7 @@ def do_test_file(self, filename):
self.assertTrue(len(table.columns) > 0, "agate table had no columns")
self.assertTrue(len(table.rows) > 0, "agate table had no rows")

self.test_pickle(table)
self.do_test_pickle(table)

@use_profile('bigquery')
def test__bigquery_fetch_and_serialize(self):
Expand Down
Loading

0 comments on commit f7a9260

Please sign in to comment.