Skip to content

Commit

Permalink
CT 1808 diff based partial parsing (#6873)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank authored and peterallenwebb committed Mar 14, 2023
1 parent 8b352de commit 95020ec
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 179 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230206-084749.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable diff based partial parsing
time: 2023-02-06T08:47:49.688889-05:00
custom:
Author: gshank
Issue: "6592"
7 changes: 1 addition & 6 deletions core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from dbt.events.functions import fire_event
from dbt.events.types import (
SystemCouldNotWrite,
SystemErrorRetrievingModTime,
SystemExecutingCmd,
SystemStdOut,
SystemStdErr,
Expand Down Expand Up @@ -77,11 +76,7 @@ def find_matching(
relative_path = os.path.relpath(absolute_path, absolute_path_to_search)
relative_path_to_root = os.path.join(relative_path_to_search, relative_path)

modification_time = 0.0
try:
modification_time = os.path.getmtime(absolute_path)
except OSError:
fire_event(SystemErrorRetrievingModTime(path=absolute_path))
modification_time = os.path.getmtime(absolute_path)
if reobj.match(local_file) and (
not ignore_spec or not ignore_spec.match_file(relative_path_to_root)
):
Expand Down
2 changes: 0 additions & 2 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ def absolute_path(self) -> str:

@property
def original_file_path(self) -> str:
# this is mostly used for reporting errors. It doesn't show the project
# name, should it?
return os.path.join(self.searched_path, self.relative_path)

def seed_too_large(self) -> bool:
Expand Down
27 changes: 14 additions & 13 deletions core/dbt/events/proto_types.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,18 @@ message FinishedRunningStatsMsg {

// I - Project parsing

// Skipping I001, I002, I003, I004, I005, I006, I007
// I001
message InputFileDiffError {
string category = 1;
string file_id = 2;
}

message InputFileDiffErrorMsg {
EventInfo info = 1;
InputFileDiffError data = 2;
}

// Skipping I002, I003, I004, I005, I006, I007

// I008
message InvalidValueForField {
Expand Down Expand Up @@ -1808,15 +1819,7 @@ message MainStackTraceMsg {
MainStackTrace data = 2;
}

// Z004
message SystemErrorRetrievingModTime {
string path = 1;
}

message SystemErrorRetrievingModTimeMsg {
EventInfo info = 1;
SystemErrorRetrievingModTime data = 2;
}
// skipping Z004

// Z005
message SystemCouldNotWrite {
Expand Down
19 changes: 11 additions & 8 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,16 @@ def message(self) -> str:
# =======================================================


# Skipping I001, I002, I003, I004, I005, I006, I007
@dataclass
class InputFileDiffError(DebugLevel, pt.InputFileDiffError):
def code(self):
return "I001"

def message(self) -> str:
return f"Error processing file diff: {self.category}, {self.file_id}"


# Skipping I002, I003, I004, I005, I006, I007


@dataclass
Expand Down Expand Up @@ -1891,13 +1900,7 @@ def message(self) -> str:
return self.stack_trace


@dataclass
class SystemErrorRetrievingModTime(ErrorLevel, pt.SystemErrorRetrievingModTime):
def code(self):
return "Z004"

def message(self) -> str:
return f"Error retrieving modification time for file {self.path}"
# Skipped Z004


@dataclass
Expand Down
57 changes: 45 additions & 12 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@
from dbt.node_types import NodeType, AccessType
from dbt.clients.jinja import get_rendered, MacroStack
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.clients.system import make_directory, write_file
from dbt.clients.system import make_directory, path_exists, read_json, write_file
from dbt.config import Project, RuntimeConfig
from dbt.context.docs import generate_runtime_docs_context
from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace
from dbt.context.configured import generate_macro_context
from dbt.context.providers import ParseProvider
from dbt.contracts.files import FileHash, ParseFileType, SchemaSourceFile
from dbt.parser.read_files import read_files, load_source_file
from dbt.parser.read_files import (
ReadFilesFromFileSystem,
load_source_file,
FileDiff,
ReadFilesFromDiff,
)
from dbt.parser.partial import PartialParsing, special_override_macros
from dbt.contracts.graph.manifest import (
Manifest,
Expand Down Expand Up @@ -152,9 +157,11 @@ def __init__(
root_project: RuntimeConfig,
all_projects: Mapping[str, Project],
macro_hook: Optional[Callable[[Manifest], Any]] = None,
file_diff: Optional[FileDiff] = None,
) -> None:
self.root_project: RuntimeConfig = root_project
self.all_projects: Mapping[str, Project] = all_projects
self.file_diff = file_diff
self.manifest: Manifest = Manifest()
self.new_manifest = self.manifest
self.manifest.metadata = root_project.get_metadata()
Expand Down Expand Up @@ -189,6 +196,7 @@ def get_full_manifest(
cls,
config: RuntimeConfig,
*,
file_diff: Optional[FileDiff] = None,
reset: bool = False,
write_perf_info=False,
) -> Manifest:
Expand All @@ -201,12 +209,19 @@ def get_full_manifest(
adapter.clear_macro_manifest()
macro_hook = adapter.connections.set_query_header

# Hack to test file_diffs
if os.environ.get("DBT_PP_FILE_DIFF_TEST"):
file_diff_path = "file_diff.json"
if path_exists(file_diff_path):
file_diff_dct = read_json(file_diff_path)
file_diff = FileDiff.from_dict(file_diff_dct)

with PARSING_STATE: # set up logbook.Processor for parsing
# Start performance counting
start_load_all = time.perf_counter()

projects = config.load_dependencies()
loader = cls(config, projects, macro_hook)
loader = cls(config, projects, macro_hook=macro_hook, file_diff=file_diff)

manifest = loader.load()

Expand All @@ -228,17 +243,35 @@ def get_full_manifest(

# This is where the main action happens
def load(self):
# Read files creates a dictionary of projects to a dictionary
start_read_files = time.perf_counter()

# This updates the "files" dictionary in self.manifest, and creates
# the partial_parser_files dictionary (see read_files.py),
# which is a dictionary of projects to a dictionary
# of parsers to lists of file strings. The file strings are
# used to get the SourceFiles from the manifest files.
start_read_files = time.perf_counter()
project_parser_files = {}
saved_files = {}
if self.saved_manifest:
saved_files = self.saved_manifest.files
for project in self.all_projects.values():
read_files(project, self.manifest.files, project_parser_files, saved_files)
orig_project_parser_files = project_parser_files
saved_files = self.saved_manifest.files if self.saved_manifest else {}
if self.file_diff:
# We're getting files from a file diff
file_reader = ReadFilesFromDiff(
all_projects=self.all_projects,
files=self.manifest.files,
saved_files=saved_files,
root_project_name=self.root_project.project_name,
file_diff=self.file_diff,
)
else:
# We're getting files from the file system
file_reader = ReadFilesFromFileSystem(
all_projects=self.all_projects,
files=self.manifest.files,
saved_files=saved_files,
)

# Set the files in the manifest and save the project_parser_files
file_reader.read_files()
self.manifest.files = file_reader.files
project_parser_files = orig_project_parser_files = file_reader.project_parser_files
self._perf_info.path_count = len(self.manifest.files)
self._perf_info.read_files_elapsed = time.perf_counter() - start_read_files

Expand Down
33 changes: 9 additions & 24 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def __init__(self, saved_manifest: Manifest, new_files: MutableMapping[str, AnyS
self.project_parser_files: Dict = {}
self.saved_files = self.saved_manifest.files
self.project_parser_files = {}
self.deleted_manifest = Manifest()
self.macro_child_map: Dict[str, List[str]] = {}
(
self.env_vars_changed_source_files,
Expand Down Expand Up @@ -268,7 +267,7 @@ def delete_from_saved(self, file_id):
# macros/tests
if saved_source_file.parse_file_type in mssat_files:
self.remove_mssat_file(saved_source_file)
self.deleted_manifest.files[file_id] = self.saved_manifest.files.pop(file_id)
self.saved_manifest.files.pop(file_id)

# macros
if saved_source_file.parse_file_type in mg_files:
Expand Down Expand Up @@ -311,7 +310,6 @@ def update_mssat_in_saved(self, new_source_file, old_source_file):

# replace source_file in saved and add to parsing list
file_id = new_source_file.file_id
self.deleted_manifest.files[file_id] = old_source_file
self.saved_files[file_id] = deepcopy(new_source_file)
self.add_to_pp_files(new_source_file)
for unique_id in unique_ids:
Expand All @@ -321,7 +319,6 @@ def remove_node_in_saved(self, source_file, unique_id):
if unique_id in self.saved_manifest.nodes:
# delete node in saved
node = self.saved_manifest.nodes.pop(unique_id)
self.deleted_manifest.nodes[unique_id] = node
elif (
source_file.file_id in self.disabled_by_file_id
and unique_id in self.saved_manifest.disabled
Expand Down Expand Up @@ -456,7 +453,7 @@ def delete_macro_file(self, source_file, follow_references=False):
file_id = source_file.file_id
# It's not clear when this file_id would not exist in saved_files
if file_id in self.saved_files:
self.deleted_manifest.files[file_id] = self.saved_files.pop(file_id)
self.saved_files.pop(file_id)

def check_for_special_deleted_macros(self, source_file):
for unique_id in source_file.macros:
Expand Down Expand Up @@ -487,7 +484,6 @@ def handle_macro_file_links(self, source_file, follow_references=False):
continue

base_macro = self.saved_manifest.macros.pop(unique_id)
self.deleted_manifest.macros[unique_id] = base_macro

# Recursively check children of this macro
# The macro_child_map might not exist if a macro is removed by
Expand Down Expand Up @@ -565,16 +561,14 @@ def delete_doc_node(self, source_file):
# remove the nodes in the 'docs' dictionary
docs = source_file.docs.copy()
for unique_id in docs:
self.deleted_manifest.docs[unique_id] = self.saved_manifest.docs.pop(unique_id)
self.saved_manifest.docs.pop(unique_id)
source_file.docs.remove(unique_id)
# The unique_id of objects that contain a doc call are stored in the
# doc source_file.nodes
self.schedule_nodes_for_parsing(source_file.nodes)
source_file.nodes = []
# Remove the file object
self.deleted_manifest.files[source_file.file_id] = self.saved_manifest.files.pop(
source_file.file_id
)
self.saved_manifest.files.pop(source_file.file_id)

# Schema files -----------------------
# Changed schema files
Expand Down Expand Up @@ -608,7 +602,7 @@ def delete_schema_file(self, file_id):
saved_yaml_dict = saved_schema_file.dict_from_yaml
new_yaml_dict = {}
self.handle_schema_file_changes(saved_schema_file, saved_yaml_dict, new_yaml_dict)
self.deleted_manifest.files[file_id] = self.saved_manifest.files.pop(file_id)
self.saved_manifest.files.pop(file_id)

# For each key in a schema file dictionary, process the changed, deleted, and added
# elemnts for the key lists
Expand Down Expand Up @@ -876,8 +870,7 @@ def remove_tests(self, schema_file, dict_key, name):
tests = schema_file.get_tests(dict_key, name)
for test_unique_id in tests:
if test_unique_id in self.saved_manifest.nodes:
node = self.saved_manifest.nodes.pop(test_unique_id)
self.deleted_manifest.nodes[test_unique_id] = node
self.saved_manifest.nodes.pop(test_unique_id)
schema_file.remove_tests(dict_key, name)

def delete_schema_source(self, schema_file, source_dict):
Expand All @@ -892,7 +885,6 @@ def delete_schema_source(self, schema_file, source_dict):
source = self.saved_manifest.sources[unique_id]
if source.source_name == source_name:
source = self.saved_manifest.sources.pop(unique_id)
self.deleted_manifest.sources[unique_id] = source
schema_file.sources.remove(unique_id)
self.schedule_referencing_nodes_for_parsing(unique_id)

Expand All @@ -904,7 +896,6 @@ def delete_schema_macro_patch(self, schema_file, macro):
del schema_file.macro_patches[macro["name"]]
if macro_unique_id and macro_unique_id in self.saved_manifest.macros:
macro = self.saved_manifest.macros.pop(macro_unique_id)
self.deleted_manifest.macros[macro_unique_id] = macro
macro_file_id = macro.file_id
if macro_file_id in self.new_files:
self.saved_files[macro_file_id] = deepcopy(self.new_files[macro_file_id])
Expand All @@ -919,9 +910,7 @@ def delete_schema_exposure(self, schema_file, exposure_dict):
if unique_id in self.saved_manifest.exposures:
exposure = self.saved_manifest.exposures[unique_id]
if exposure.name == exposure_name:
self.deleted_manifest.exposures[unique_id] = self.saved_manifest.exposures.pop(
unique_id
)
self.saved_manifest.exposures.pop(unique_id)
schema_file.exposures.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
Expand All @@ -935,9 +924,7 @@ def delete_schema_group(self, schema_file, group_dict):
group = self.saved_manifest.groups[unique_id]
if group.name == group_name:
self.schedule_nodes_for_parsing(self.saved_manifest.group_map[group.name])
self.deleted_manifest.groups[unique_id] = self.saved_manifest.groups.pop(
unique_id
)
self.saved_manifest.groups.pop(unique_id)
schema_file.groups.remove(unique_id)

# metrics are created only from schema files, but also can be referred to by other nodes
Expand All @@ -951,9 +938,7 @@ def delete_schema_metric(self, schema_file, metric_dict):
# Need to find everything that referenced this metric and schedule for parsing
if unique_id in self.saved_manifest.child_map:
self.schedule_nodes_for_parsing(self.saved_manifest.child_map[unique_id])
self.deleted_manifest.metrics[unique_id] = self.saved_manifest.metrics.pop(
unique_id
)
self.saved_manifest.metrics.pop(unique_id)
schema_file.metrics.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
Expand Down
Loading

0 comments on commit 95020ec

Please sign in to comment.