Skip to content

Commit

Permalink
Merge pull request #2883 from fishtown-analytics/feature/2824-parse-only-command
Browse files Browse the repository at this point in the history

Add parse command and collect parse timing info [#2824]
  • Loading branch information
gshank authored Nov 13, 2020
2 parents 3af0202 + 1a8416c commit 13b099f
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 1 deletion.
19 changes: 18 additions & 1 deletion core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import dbt.task.serve as serve_task
import dbt.task.freshness as freshness_task
import dbt.task.run_operation as run_operation_task
import dbt.task.parse as parse_task
from dbt.profiler import profiler
from dbt.task.list import ListTask
from dbt.task.rpc.server import RPCServerTask
Expand Down Expand Up @@ -494,6 +495,21 @@ def _build_compile_subparser(subparsers, base_subparser):
return sub


def _build_parse_subparser(subparsers, base_subparser):
    """Register the ``parse`` sub-command and return its sub-parser.

    The sub-command inherits the options of ``base_subparser`` and is
    dispatched to ``parse_task.ParseTask``.

    Args:
        subparsers: the object returned by ``ArgumentParser.add_subparsers``
            on which the new command is registered.
        base_subparser: parent parser whose arguments are shared by every
            sub-command.

    Returns:
        The newly created ``parse`` sub-parser.
    """
    sub = subparsers.add_parser(
        'parse',
        parents=[base_subparser],
        help='''
        Parses the project and provides information on performance
        '''
    )
    sub.set_defaults(cls=parse_task.ParseTask, which='parse',
                     rpc_method='parse')
    # Both flags are opt-in extras on top of the plain parse run.
    sub.add_argument(
        '--write-manifest',
        action='store_true',
        help='''
        Write the manifest.json file to the target directory after parsing
        '''
    )
    sub.add_argument(
        '--compile',
        action='store_true',
        help='''
        Compile the parsed manifest after parsing
        '''
    )
    return sub


def _build_docs_generate_subparser(subparsers, base_subparser):
# it might look like docs_sub is the correct parents entry, but that
# will cause weird errors about 'conflicting option strings'.
Expand Down Expand Up @@ -1006,12 +1022,13 @@ def parse_args(args, cls=DBTArgumentParser):
rpc_sub = _build_rpc_subparser(subs, base_subparser)
run_sub = _build_run_subparser(subs, base_subparser)
compile_sub = _build_compile_subparser(subs, base_subparser)
parse_sub = _build_parse_subparser(subs, base_subparser)
generate_sub = _build_docs_generate_subparser(docs_subs, base_subparser)
test_sub = _build_test_subparser(subs, base_subparser)
seed_sub = _build_seed_subparser(subs, base_subparser)
# --threads, --no-version-check
_add_common_arguments(run_sub, compile_sub, generate_sub, test_sub,
rpc_sub, seed_sub)
rpc_sub, seed_sub, parse_sub)
# --models, --exclude
# list_sub sets up its own arguments.
_add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub)
Expand Down
42 changes: 42 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import (
Dict, Optional, Mapping, Callable, Any, List, Type, Union, MutableMapping
)
import time

import dbt.exceptions
import dbt.flags as flags
Expand Down Expand Up @@ -119,6 +120,10 @@ def __init__(
root_project, all_projects,
)
self._loaded_file_cache: Dict[str, FileBlock] = {}
partial_parse = self._partial_parse_enabled()
self._perf_info: Dict[str, Any] = {
'path_count': 0, 'projects': [],
'is_partial_parse_enabled': partial_parse}

def parse_with_cache(
self,
Expand Down Expand Up @@ -170,9 +175,34 @@ def parse_project(
# per-project cache.
self._loaded_file_cache.clear()

project_info: Dict[str, Any] = {'parsers': []}
start_timer = time.perf_counter()
total_path_count = 0
for parser in parsers:
parser_path_count = 0
parser_start_timer = time.perf_counter()
for path in parser.search():
self.parse_with_cache(path, parser, old_results)
parser_path_count = parser_path_count + 1
if parser_path_count % 100 == 0:
print("..", end='', flush=True)

if parser_path_count > 0:
parser_elapsed = time.perf_counter() - parser_start_timer
project_info['parsers'].append({'parser': type(
parser).__name__, 'path_count': parser_path_count,
'elapsed': '{:.2f}'.format(parser_elapsed)})
total_path_count = total_path_count + parser_path_count
if total_path_count > 100:
print("..")

elapsed = time.perf_counter() - start_timer
project_info['project_name'] = project.project_name
project_info['path_count'] = total_path_count
project_info['elapsed'] = '{:.2f}'.format(elapsed)
self._perf_info['projects'].append(project_info)
self._perf_info['path_count'] = self._perf_info['path_count'] + \
total_path_count

def load_only_macros(self) -> Manifest:
old_results = self.read_parse_results()
Expand All @@ -197,9 +227,12 @@ def load(self, macro_manifest: Manifest):
self.results.macros.update(macro_manifest.macros)
self.results.files.update(macro_manifest.files)

start_timer = time.perf_counter()
for project in self.all_projects.values():
# parse a single project
self.parse_project(project, macro_manifest, old_results)
self._perf_info['parse_project_elapsed'] = '{:.2f}'.format(
time.perf_counter() - start_timer)

def write_parse_results(self):
path = os.path.join(self.root_project.target_path,
Expand Down Expand Up @@ -300,7 +333,10 @@ def create_manifest(self) -> Manifest:
# before we do anything else, patch the sources. This mutates
# results.disabled, so it needs to come before the final 'disabled'
# list is created
start_patch = time.perf_counter()
sources = patch_sources(self.results, self.root_project)
self._perf_info['patch_sources_elapsed'] = '{:.2f}'.format(
time.perf_counter() - start_patch)
disabled = []
for value in self.results.disabled.values():
disabled.extend(value)
Expand All @@ -322,7 +358,10 @@ def create_manifest(self) -> Manifest:
)
manifest.patch_nodes(self.results.patches)
manifest.patch_macros(self.results.macro_patches)
start_process = time.perf_counter()
self.process_manifest(manifest)
self._perf_info['process_manifest_elapsed'] = '{:.2f}'.format(
time.perf_counter() - start_process)
return manifest

@classmethod
Expand All @@ -333,13 +372,16 @@ def load_all(
macro_hook: Callable[[Manifest], Any],
) -> Manifest:
with PARSING_STATE:
start_load_all = time.perf_counter()
projects = root_config.load_dependencies()
loader = cls(root_config, projects, macro_hook)
loader.load(macro_manifest=macro_manifest)
loader.write_parse_results()
manifest = loader.create_manifest()
_check_manifest(manifest, root_config)
manifest.build_flat_graph()
loader._perf_info['load_all_elapsed'] = '{:.2f}'.format(
time.perf_counter() - start_load_all)
return manifest

@classmethod
Expand Down
96 changes: 96 additions & 0 deletions core/dbt/task/parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# This task is intended to be used for diagnosis, development and
# performance analysis.
# It separates out the parsing flows for easier logging and
# debugging.
# To store cProfile performance data, execute with the '-r'
# flag and an output file: dbt -r dbt.cprof parse.
# Use a visualizer such as snakeviz to look at the output:
# snakeviz dbt.cprof
from dbt.task.base import ConfiguredTask
from dbt.adapters.factory import get_adapter
from dbt.parser.manifest import Manifest, ManifestLoader, _check_manifest
from dbt.logger import DbtProcessState, print_timestamped_line
from dbt.clients.system import write_file
from dbt.graph import Graph
import dbt.utils
import json
import time
from typing import Optional
import os

MANIFEST_FILE_NAME = 'manifest.json'
PERF_INFO_FILE_NAME = 'perf_info.json'
PARSING_STATE = DbtProcessState('parsing')


class ParseTask(ConfiguredTask):
    """Diagnostic task that parses the project and logs each parsing stage.

    Intended for diagnosis, development and performance analysis. After
    parsing, the timing data collected by the manifest loader is written to
    ``perf_info.json`` under the configured target path.
    """

    def __init__(self, args, config):
        super().__init__(args, config)
        # Filled in by get_full_manifest() / compile_manifest().
        self.manifest: Optional[Manifest] = None
        self.graph: Optional[Graph] = None
        self.loader: Optional[ManifestLoader] = None

    def write_manifest(self):
        """Write the loaded manifest to ``manifest.json`` in the target path."""
        manifest_path = os.path.join(
            self.config.target_path, MANIFEST_FILE_NAME
        )
        self.manifest.write(manifest_path)

    def write_perf_info(self):
        """Dump the loader's timing info as JSON and log where it was saved."""
        perf_path = os.path.join(
            self.config.target_path, PERF_INFO_FILE_NAME
        )
        serialized = json.dumps(
            self.loader._perf_info,
            cls=dbt.utils.JSONEncoder,
            indent=4,
        )
        write_file(perf_path, serialized)
        print_timestamped_line(f"Performance info: {perf_path}")

    # NOTE: get_full_manifest deliberately duplicates logic that normally
    # lives in
    #    ManifestLoader.get_full_manifest
    #    ManifestLoader.load
    #    ManifestLoader.load_all
    # so that each step can be logged and the parsing flow-of-control is
    # easy to follow in one place. The cost is that changes made to those
    # methods may need to be mirrored here.

    def get_full_manifest(self):
        """Run the full parsing pipeline, logging a timestamp per stage.

        Stores the resulting loader and manifest on ``self.loader`` and
        ``self.manifest``.
        """
        adapter = get_adapter(self.config)  # type: ignore
        macro_manifest: Manifest = adapter.load_macro_manifest()
        print_timestamped_line("Macro manifest loaded")
        root_config = self.config
        macro_hook = adapter.connections.set_query_header
        with PARSING_STATE:
            started_at = time.perf_counter()

            projects = root_config.load_dependencies()
            print_timestamped_line("Dependencies loaded")

            loader = ManifestLoader(root_config, projects, macro_hook)
            print_timestamped_line("ManifestLoader created")

            loader.load(macro_manifest=macro_manifest)
            print_timestamped_line("Manifest loaded")

            loader.write_parse_results()
            print_timestamped_line("Parse results written")

            manifest = loader.create_manifest()
            print_timestamped_line("Manifest created")

            _check_manifest(manifest, root_config)
            print_timestamped_line("Manifest checked")

            manifest.build_flat_graph()
            print_timestamped_line("Flat graph built")

            # Total wall-clock time for everything inside the parsing state.
            total_elapsed = time.perf_counter() - started_at
            loader._perf_info['load_all_elapsed'] = f'{total_elapsed:.2f}'

        self.loader = loader
        self.manifest = manifest
        print_timestamped_line("Manifest loaded")

    def compile_manifest(self):
        """Compile the loaded manifest and keep the result on ``self.graph``."""
        compiler = get_adapter(self.config).get_compiler()
        self.graph = compiler.compile(self.manifest)

    def run(self):
        """Parse the project, run the optional steps, then write perf info."""
        print_timestamped_line('Start parsing.')
        self.get_full_manifest()

        # Optional extras, enabled by the --compile / --write-manifest flags.
        if self.args.compile:
            print_timestamped_line('Compiling.')
            self.compile_manifest()
        if self.args.write_manifest:
            print_timestamped_line('Writing manifest.')
            self.write_manifest()

        self.write_perf_info()
        print_timestamped_line('Done.')
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def _run_simple_dbt_commands(self, project_dir):
self.run_dbt(['seed', '--project-dir', project_dir])
self.run_dbt(['run', '--project-dir', project_dir])
self.run_dbt(['test', '--project-dir', project_dir])
self.run_dbt(['parse', '--project-dir', project_dir])
self.run_dbt(['clean', '--project-dir', project_dir])
# In case of 'dbt clean' also test that the clean-targets directories were deleted.
for target in self.config.clean_targets:
Expand Down

0 comments on commit 13b099f

Please sign in to comment.