Skip to content

Commit

Permalink
Move partial parsing to end of parsing. Switch to using msgpack for
Browse files Browse the repository at this point in the history
saved manifest.
  • Loading branch information
gshank committed May 17, 2021
1 parent 8ac5cdd commit d030902
Show file tree
Hide file tree
Showing 49 changed files with 1,961 additions and 1,137 deletions.
153 changes: 116 additions & 37 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import hashlib
import os
from dataclasses import dataclass, field
from mashumaro.types import SerializableType
from typing import List, Optional, Union, Dict, Any

from dbt.dataclass_schema import dbtClassMixin, StrEnum

from dbt.exceptions import InternalException

from .util import MacroKey, SourceKey
from .util import SourceKey


MAXIMUM_SEED_SIZE = 1 * 1024 * 1024
Expand All @@ -23,7 +22,20 @@ class ParseFileType(StrEnum):
Seed = 'seed'
Documentation = 'docs'
Schema = 'schema'
Hook = 'hook'
Hook = 'hook' # not a real filetype, from dbt_project.yml


parse_file_type_to_parser = {
ParseFileType.Macro: 'MacroParser',
ParseFileType.Model: 'ModelParser',
ParseFileType.Snapshot: 'SnapshotParser',
ParseFileType.Analysis: 'AnalysisParser',
ParseFileType.Test: 'DataTestParser',
ParseFileType.Seed: 'SeedParser',
ParseFileType.Documentation: 'DocumentationParser',
ParseFileType.Schema: 'SchemaParser',
ParseFileType.Hook: 'HookParser',
}


@dataclass
Expand Down Expand Up @@ -122,7 +134,7 @@ def original_file_path(self):


@dataclass
class SourceFile(dbtClassMixin):
class BaseSourceFile(dbtClassMixin, SerializableType):
"""Define a source file in dbt"""
path: Union[FilePath, RemoteFile] # the path information
checksum: FileHash
Expand All @@ -131,45 +143,57 @@ class SourceFile(dbtClassMixin):
# Parse file type: i.e. which parser will process this file
parse_file_type: Optional[ParseFileType] = None
# we don't want to serialize this
_contents: Optional[str] = None
contents: Optional[str] = None
# the unique IDs contained in this file
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# any node patches in this file. The entries are names, not unique ids!
patches: List[str] = field(default_factory=list)
# any macro patches in this file. The entries are package, name pairs.
macro_patches: List[MacroKey] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
source_patches: List[SourceKey] = field(default_factory=list)
# temporary - for schema source files only
dict_from_yaml: Optional[Dict[str, Any]] = None

@property
def search_key(self) -> Optional[str]:
def file_id(self):
if isinstance(self.path, RemoteFile):
return None
if self.checksum.name == 'none':
return None
return self.path.search_key
return f'{self.project_name}://{self.path.original_file_path}'

@property
def contents(self) -> str:
if self._contents is None:
raise InternalException('SourceFile has no contents!')
return self._contents

@contents.setter
def contents(self, value):
self._contents = value
def _serialize(self):
dct = self.to_dict()
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

@classmethod
def empty(cls, path: FilePath) -> 'SourceFile':
self = cls(path=path, checksum=FileHash.empty())
self.contents = ''
return self
def _deserialize(cls, dct: Dict[str, int]):
if dct['parse_file_type'] == 'schema':
# TODO: why are these keys even here
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
sf = SchemaSourceFile.from_dict(dct)
else:
sf = SourceFile.from_dict(dct)
return sf

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
# remove empty lists to save space
dct_keys = list(dct.keys())
for key in dct_keys:
if isinstance(dct[key], list) and not dct[key]:
del dct[key]
# remove contents. Schema files will still have 'dict_from_yaml'
# from the contents
if 'contents' in dct:
del dct['contents']
return dct


@dataclass
class SourceFile(BaseSourceFile):
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)

@classmethod
def big_seed(cls, path: FilePath) -> 'SourceFile':
Expand All @@ -178,8 +202,63 @@ def big_seed(cls, path: FilePath) -> 'SourceFile':
self.contents = ''
return self

# TODO: do this a different way. This remote file kludge isn't going
# to work long term
@classmethod
def remote(cls, contents: str) -> 'SourceFile':
self = cls(path=RemoteFile(), checksum=FileHash.empty())
self.contents = contents
def remote(cls, contents: str, project_name: str) -> 'SourceFile':
self = cls(
path=RemoteFile(),
checksum=FileHash.from_contents(contents),
project_name=project_name,
contents=contents,
)
return self


@dataclass
class SchemaSourceFile(BaseSourceFile):
dfy: Dict[str, Any] = field(default_factory=dict)
# these are in the manifest.nodes dictionary
tests: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.
mcp: List[str] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
# Patches are only against external sources. Sources can be
# created too, but those are in 'sources'
sop: List[SourceKey] = field(default_factory=list)
pp_dict: Optional[Dict[str, Any]] = None
pp_test_index: Optional[Dict[str, Any]] = None

@property
def dict_from_yaml(self):
return self.dfy

@property
def node_patches(self):
return self.ndp

@property
def macro_patches(self):
return self.mcp

@property
def source_patches(self):
return self.sop

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

def append_patch(self, yaml_key, unique_id):
self.node_patches.append(unique_id)


AnySourceFile = Union[SchemaSourceFile, SourceFile]
Loading

0 comments on commit d030902

Please sign in to comment.