Commit cabc1a9

Move partial parsing to end of parsing and implement new partial parsing method. Switch to using msgpack for saved manifest.

gshank committed Jun 2, 2021
1 parent 2c40530 commit cabc1a9

Showing 65 changed files with 2,425 additions and 1,143 deletions.
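
The move to msgpack changes how the saved manifest (including these source file records) is written to and read from disk. The round trip below is a minimal sketch, not dbt's serialization code: ExampleSourceFile and its fields are hypothetical, and it assumes the msgpack-python package is installed.

# Illustrative sketch only: pack a dataclass's dict form with msgpack and
# rebuild it, loosely echoing the _serialize/_deserialize hooks in the diff below.
from dataclasses import asdict, dataclass, field
from typing import List

import msgpack  # assumption: the msgpack-python package is available


@dataclass
class ExampleSourceFile:  # hypothetical stand-in, not dbt's SourceFile
    path: str
    checksum: str
    nodes: List[str] = field(default_factory=list)

    def _serialize(self) -> dict:
        # Drop empty lists so the packed data stays small, echoing the
        # "remove empty lists to save space" trimming in __post_serialize__.
        dct = asdict(self)
        return {k: v for k, v in dct.items() if not (isinstance(v, list) and not v)}

    @classmethod
    def _deserialize(cls, dct: dict) -> 'ExampleSourceFile':
        return cls(path=dct['path'], checksum=dct['checksum'], nodes=dct.get('nodes', []))


sf = ExampleSourceFile(path='models/my_model.sql', checksum='abc123')
packed = msgpack.packb(sf._serialize(), use_bin_type=True)  # bytes written to disk
restored = ExampleSourceFile._deserialize(msgpack.unpackb(packed, raw=False))
assert restored == sf

The less that goes into the dict, the smaller the packed file, which is why the new code below strips empty lists and file contents before serializing.
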
153 changes: 116 additions & 37 deletions core/dbt/contracts/files.py
@@ -1,13 +1,12 @@
import hashlib
import os
from dataclasses import dataclass, field
from mashumaro.types import SerializableType
from typing import List, Optional, Union, Dict, Any

from dbt.dataclass_schema import dbtClassMixin, StrEnum

from dbt.exceptions import InternalException

from .util import MacroKey, SourceKey
from .util import SourceKey


MAXIMUM_SEED_SIZE = 1 * 1024 * 1024
@@ -23,7 +22,20 @@ class ParseFileType(StrEnum):
Seed = 'seed'
Documentation = 'docs'
Schema = 'schema'
Hook = 'hook'
Hook = 'hook' # not a real filetype, from dbt_project.yml


parse_file_type_to_parser = {
ParseFileType.Macro: 'MacroParser',
ParseFileType.Model: 'ModelParser',
ParseFileType.Snapshot: 'SnapshotParser',
ParseFileType.Analysis: 'AnalysisParser',
ParseFileType.Test: 'DataTestParser',
ParseFileType.Seed: 'SeedParser',
ParseFileType.Documentation: 'DocumentationParser',
ParseFileType.Schema: 'SchemaParser',
ParseFileType.Hook: 'HookParser',
}


@dataclass
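
The parse_file_type_to_parser table added above maps each file type to the name of the parser class that handles it when a file needs to be (re)parsed. A small lookup sketch, assuming the new names are importable from this module; the get_parser_name helper is illustrative, not part of the commit.

# Usage sketch only; the helper is hypothetical.
from dbt.contracts.files import ParseFileType, parse_file_type_to_parser


def get_parser_name(file_type: ParseFileType) -> str:
    # ParseFileType.Hook is flagged above as not a real file type (it comes
    # from dbt_project.yml), but it still has a 'HookParser' entry.
    return parse_file_type_to_parser[file_type]


assert get_parser_name(ParseFileType.Model) == 'ModelParser'
assert get_parser_name(ParseFileType.Schema) == 'SchemaParser'
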
@@ -122,7 +134,7 @@ def original_file_path(self):


@dataclass
class SourceFile(dbtClassMixin):
class BaseSourceFile(dbtClassMixin, SerializableType):
"""Define a source file in dbt"""
path: Union[FilePath, RemoteFile] # the path information
checksum: FileHash
@@ -131,45 +143,57 @@ class SourceFile(dbtClassMixin):
# Parse file type: i.e. which parser will process this file
parse_file_type: Optional[ParseFileType] = None
# we don't want to serialize this
_contents: Optional[str] = None
contents: Optional[str] = None
# the unique IDs contained in this file
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# any node patches in this file. The entries are names, not unique ids!
patches: List[str] = field(default_factory=list)
# any macro patches in this file. The entries are package, name pairs.
macro_patches: List[MacroKey] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
source_patches: List[SourceKey] = field(default_factory=list)
# temporary - for schema source files only
dict_from_yaml: Optional[Dict[str, Any]] = None

@property
def search_key(self) -> Optional[str]:
def file_id(self):
if isinstance(self.path, RemoteFile):
return None
if self.checksum.name == 'none':
return None
return self.path.search_key
return f'{self.project_name}://{self.path.original_file_path}'

@property
def contents(self) -> str:
if self._contents is None:
raise InternalException('SourceFile has no contents!')
return self._contents

@contents.setter
def contents(self, value):
self._contents = value
def _serialize(self):
dct = self.to_dict()
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

@classmethod
def empty(cls, path: FilePath) -> 'SourceFile':
self = cls(path=path, checksum=FileHash.empty())
self.contents = ''
return self
def _deserialize(cls, dct: Dict[str, int]):
if dct['parse_file_type'] == 'schema':
# TODO: why are these keys even here
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
sf = SchemaSourceFile.from_dict(dct)
else:
sf = SourceFile.from_dict(dct)
return sf

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
# remove empty lists to save space
dct_keys = list(dct.keys())
for key in dct_keys:
if isinstance(dct[key], list) and not dct[key]:
del dct[key]
# remove contents. Schema files will still have 'dict_from_yaml'
# from the contents
if 'contents' in dct:
del dct['contents']
return dct


@dataclass
class SourceFile(BaseSourceFile):
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)

@classmethod
def big_seed(cls, path: FilePath) -> 'SourceFile':
@@ -178,8 +202,63 @@ def big_seed(cls, path: FilePath) -> 'SourceFile':
self.contents = ''
return self

# TODO: do this a different way. This remote file kludge isn't going
# to work long term
@classmethod
def remote(cls, contents: str) -> 'SourceFile':
self = cls(path=RemoteFile(), checksum=FileHash.empty())
self.contents = contents
def remote(cls, contents: str, project_name: str) -> 'SourceFile':
self = cls(
path=RemoteFile(),
checksum=FileHash.from_contents(contents),
project_name=project_name,
contents=contents,
)
return self


@dataclass
class SchemaSourceFile(BaseSourceFile):
dfy: Dict[str, Any] = field(default_factory=dict)
# these are in the manifest.nodes dictionary
tests: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.
mcp: List[str] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
# Patches are only against external sources. Sources can be
# created too, but those are in 'sources'
sop: List[SourceKey] = field(default_factory=list)
pp_dict: Optional[Dict[str, Any]] = None
pp_test_index: Optional[Dict[str, Any]] = None

@property
def dict_from_yaml(self):
return self.dfy

@property
def node_patches(self):
return self.ndp

@property
def macro_patches(self):
return self.mcp

@property
def source_patches(self):
return self.sop

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

def append_patch(self, yaml_key, unique_id):
self.node_patches.append(unique_id)


AnySourceFile = Union[SchemaSourceFile, SourceFile]
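
SchemaSourceFile stores its bookkeeping under short field names (dfy, ndp, mcp, sop), and the read-only properties above map them back to the longer accessors (dict_from_yaml, node_patches, macro_patches, source_patches). A usage sketch with made-up values (not from dbt's test suite); RemoteFile() and FileHash.empty() are used only to satisfy the required path and checksum fields.

# Usage sketch only; project and node names are hypothetical.
from dbt.contracts.files import FileHash, RemoteFile, SchemaSourceFile

ssf = SchemaSourceFile(
    path=RemoteFile(),
    checksum=FileHash.empty(),
    project_name='my_project',
    dfy={'version': 2, 'models': [{'name': 'my_model'}]},
)
ssf.append_patch('models', 'model.my_project.my_model')

assert ssf.node_patches == ['model.my_project.my_model']  # alias for ssf.ndp
assert ssf.dict_from_yaml['models'][0]['name'] == 'my_model'  # alias for ssf.dfy
assert ssf.file_id is None  # RemoteFile paths never get a file_id
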