Implement the RPC server
- make all tasks have a "from_args" that handles initializing the correct config type, etc. (sketched below)
- make it possible to process a single node's refs at a time
- make remote run/compile tasks + an RPC server task, and wire them up
- add ref() and source() support, and vestigial doc() support
- refactor results a bit to support the new result behavior
- don't write to the filesystem on requests
- handle uniqueness issues
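
A minimal sketch of the "from_args" convention described above (the task class here is hypothetical and for illustration only; in this diff, only the config classes gain from_args):

from dbt.config import RuntimeConfig


class RemoteCompileTask(object):  # hypothetical task, not part of this diff
    def __init__(self, args, config):
        self.args = args
        self.config = config

    @classmethod
    def from_args(cls, args):
        # each task picks the config type it needs and builds it from the
        # parsed cli args, so callers never touch config internals
        config = RuntimeConfig.from_args(args)
        return cls(args, config)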
Jacob Beck committed Feb 19, 2019
1 parent dfb87dc commit 1a0df17
Showing 32 changed files with 798 additions and 309 deletions.
24 changes: 17 additions & 7 deletions core/dbt/compilation.py
@@ -183,7 +183,7 @@ def link_graph(self, linker, manifest):
         if cycle:
             raise RuntimeError("Found a cycle: {}".format(cycle))
 
-    def compile(self, manifest):
+    def compile(self, manifest, write=True):
         linker = Linker()
 
         self.link_graph(linker, manifest)
@@ -195,25 +195,35 @@ def compile(self, manifest):
                 manifest.macros.items()):
             stats[node.resource_type] += 1
 
-        self.write_graph_file(linker, manifest)
+        if write:
+            self.write_graph_file(linker, manifest)
         print_compile_stats(stats)
 
         return linker
 
 
-def compile_manifest(config, manifest):
+def compile_manifest(config, manifest, write=True):
     compiler = Compiler(config)
     compiler.initialize()
-    return compiler.compile(manifest)
+    return compiler.compile(manifest, write=write)
 
 
-def compile_node(adapter, config, node, manifest, extra_context):
+def _is_writable(node):
+    if not node.injected_sql:
+        return False
+
+    if dbt.utils.is_type(node, NodeType.Archive):
+        return False
+
+    return True
+
+
+def compile_node(adapter, config, node, manifest, extra_context, write=True):
     compiler = Compiler(config)
     node = compiler.compile_node(node, manifest, extra_context)
     node = _inject_runtime_config(adapter, node, extra_context)
 
-    if(node.injected_sql is not None and
-       not (dbt.utils.is_type(node, NodeType.Archive))):
+    if write and _is_writable(node):
         logger.debug('Writing injected SQL for node "{}"'.format(
             node.unique_id))
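
The new write flag threads through the whole compile path so the RPC server can compile without touching the filesystem. A minimal sketch of how a caller might use it (hypothetical usage; assumes config, manifest, adapter, and node are already built):

from dbt.compilation import compile_manifest, compile_node

# build the linker graph in memory only, skipping the graph file
linker = compile_manifest(config, manifest, write=False)

# compile a single node without writing its injected SQL to target/
compiled = compile_node(adapter, config, node, manifest,
                        extra_context={}, write=False)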
19 changes: 1 addition & 18 deletions core/dbt/config/__init__.py
@@ -1,22 +1,5 @@
 from .renderer import ConfigRenderer
-from .profile import Profile, UserConfig
+from .profile import Profile, UserConfig, PROFILES_DIR
 from .project import Project
-from .profile import read_profile
-from .profile import PROFILES_DIR
 from .runtime import RuntimeConfig
-
-
-def read_profiles(profiles_dir=None):
-    """This is only used in main, for some error handling"""
-    if profiles_dir is None:
-        profiles_dir = PROFILES_DIR
-
-    raw_profiles = read_profile(profiles_dir)
-
-    if raw_profiles is None:
-        profiles = {}
-    else:
-        profiles = {k: v for (k, v) in raw_profiles.items() if k != 'config'}
-
-    return profiles
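
With read_profiles gone from the package root, PROFILES_DIR is now re-exported directly, so a downstream consumer would import it like this (hypothetical usage):

# PROFILES_DIR now comes straight from the package root
from dbt.config import PROFILES_DIR, RuntimeConfig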
8 changes: 8 additions & 0 deletions core/dbt/config/errors.py
@@ -0,0 +1,8 @@
+from contextlib import contextmanager
+
+from .profile import read_profile, PROFILES_DIR
+
+from dbt.exceptions import DbtProjectError, DbtProfileError, RuntimeException
+from dbt.logger import GLOBAL_LOGGER as logger
+from dbt import tracking
+from dbt.compat import to_string
8 changes: 2 additions & 6 deletions core/dbt/config/profile.py
@@ -335,14 +335,12 @@ def from_raw_profiles(cls, raw_profiles, profile_name, cli_vars,
         )
 
     @classmethod
-    def from_args(cls, args, project_profile_name=None, cli_vars=None):
+    def from_args(cls, args, project_profile_name=None):
         """Given the raw profiles as read from disk and the name of the desired
         profile if specified, return the profile component of the runtime
         config.
 
         :param args argparse.Namespace: The arguments as parsed from the cli.
-        :param cli_vars dict: The command-line variables passed as arguments,
-            as a dict.
         :param project_profile_name Optional[str]: The profile name, if
             specified in a project.
         :raises DbtProjectError: If there is no profile name specified in the
@@ -352,9 +350,7 @@ def from_args(cls, args, project_profile_name=None, cli_vars=None):
             target could not be found.
         :returns Profile: The new Profile object.
         """
-        if cli_vars is None:
-            cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))
-
+        cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))
         threads_override = getattr(args, 'threads', None)
         target_override = getattr(args, 'target', None)
         raw_profiles = read_profile(args.profiles_dir)
4 changes: 4 additions & 0 deletions core/dbt/config/project.py
@@ -377,6 +377,10 @@ def from_project_root(cls, project_root, cli_vars):
     def from_current_directory(cls, cli_vars):
         return cls.from_project_root(os.getcwd(), cli_vars)
 
+    @classmethod
+    def from_args(cls, args):
+        return cls.from_current_directory(getattr(args, 'vars', '{}'))
+
     def hashed_name(self):
         return hashlib.md5(self.project_name.encode('utf-8')).hexdigest()
 
7 changes: 2 additions & 5 deletions core/dbt/config/runtime.py
@@ -171,16 +171,13 @@ def from_args(cls, args):
         :raises DbtProfileError: If the profile is invalid or missing.
         :raises ValidationException: If the cli variables are invalid.
         """
-        cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))
-
         # build the project and read in packages.yml
-        project = Project.from_current_directory(cli_vars)
+        project = Project.from_args(args)
 
         # build the profile
         profile = Profile.from_args(
             args=args,
-            project_profile_name=project.profile_name,
-            cli_vars=cli_vars
+            project_profile_name=project.profile_name
         )
 
         return cls.from_parts(
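
Taken together, the config changes parse cli vars exactly once, inside each from_args. A sketch of the resulting call pattern (hypothetical namespace values; run from inside a dbt project directory so Project.from_args can find dbt_project.yml):

import argparse

from dbt.config import RuntimeConfig

# stand-in for the namespace argparse would hand a dbt task
args = argparse.Namespace(
    vars='{"my_var": "my_value"}',
    profiles_dir='~/.dbt',
    profile=None,
    target=None,
    threads=None,
)
config = RuntimeConfig.from_args(args)  # builds Project + Profile internally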
10 changes: 10 additions & 0 deletions core/dbt/contracts/graph/manifest.py
@@ -409,3 +409,13 @@ def get_used_schemas(self):
 
     def get_used_databases(self):
         return frozenset(node.database for node in self.nodes.values())
+
+    def deepcopy(self, config=None):
+        return Manifest(
+            nodes={k: v.incorporate() for k, v in self.nodes.items()},
+            macros={k: v.incorporate() for k, v in self.macros.items()},
+            docs={k: v.incorporate() for k, v in self.docs.items()},
+            generated_at=self.generated_at,
+            disabled=[n.incorporate() for n in self.disabled],
+            config=config
+        )
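
deepcopy gives each RPC request its own manifest: incorporate() produces fresh copies of every node, so per-request compilation can't mutate the server's shared state. A sketch of the intended pattern (hypothetical request-handler code, combining this with the write=False flag above):

# inside a request handler: compile against a private, in-memory copy
# of the manifest, leaving the server's manifest untouched
local_manifest = manifest.deepcopy(config=config)
compiled = compile_node(adapter, config, rpc_node, local_manifest,
                        extra_context={}, write=False)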
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/unparsed.py
@@ -75,6 +75,7 @@
             NodeType.Seed,
             # we need this if parse_node is going to handle archives.
             NodeType.Archive,
+            NodeType.RPCCall,
         ]
     },
 },
70 changes: 70 additions & 0 deletions core/dbt/contracts/results.py
@@ -458,3 +458,73 @@ class FreshnessRunOutput(APIObject):
 
     def __init__(self, meta, sources):
         super(FreshnessRunOutput, self).__init__(meta=meta, sources=sources)
+
+
+REMOTE_COMPILE_RESULT_CONTRACT = {
+    'type': 'object',
+    'additionalProperties': False,
+    'properties': {
+        'raw_sql': {
+            'type': 'string',
+        },
+        'compiled_sql': {
+            'type': 'string',
+        },
+        'timing': {
+            'type': 'array',
+            'items': TIMING_INFO_CONTRACT,
+        },
+    },
+    'required': ['raw_sql', 'compiled_sql', 'timing']
+}
+
+
+class RemoteCompileResult(APIObject):
+    SCHEMA = REMOTE_COMPILE_RESULT_CONTRACT
+
+    def __init__(self, raw_sql, compiled_sql, timing=None, **kwargs):
+        if timing is None:
+            timing = []
+        super(RemoteCompileResult, self).__init__(
+            raw_sql=raw_sql,
+            compiled_sql=compiled_sql,
+            timing=timing,
+            **kwargs
+        )
+
+    @property
+    def node(self):
+        return None
+
+    @property
+    def error(self):
+        return None
+
+
+REMOTE_RUN_RESULT_CONTRACT = deep_merge(REMOTE_COMPILE_RESULT_CONTRACT, {
+    'properties': {
+        'table': {
+            'type': 'array',
+            # an array of column name: value dicts, one per row
+            'items': {
+                'type': 'object',
+                'additionalProperties': True,
+            }
+        }
+    },
+    'required': ['table'],
+})
+
+
+class RemoteRunResult(RemoteCompileResult):
+    SCHEMA = REMOTE_RUN_RESULT_CONTRACT
+
+    def __init__(self, raw_sql, compiled_sql, timing=None, table=None):
+        if table is None:
+            table = []
+        super(RemoteRunResult, self).__init__(
+            raw_sql=raw_sql,
+            compiled_sql=compiled_sql,
+            timing=timing,
+            table=table
+        )
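
A run result is a compile result plus a table of row dicts, so the two RPC responses share a shape. A sketch of building one (hypothetical values; serialize() is assumed here to be the dict-producing helper dbt's APIObject provides):

result = RemoteRunResult(
    raw_sql='select {{ 1 + 1 }} as id',
    compiled_sql='select 2 as id',
    table=[{'id': 2}],
)
# the contract is enforced when the object is constructed;
# serialize() returns a plain dict suitable for a JSON response
payload = result.serialize()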
4 changes: 4 additions & 0 deletions core/dbt/exceptions.py
@@ -21,6 +21,10 @@ class InternalException(Exception):
     pass
 
 
+class RPCException(Exception):
+    pass
+
+
 class RuntimeException(RuntimeError, Exception):
     def __init__(self, msg, node=None):
         self.stack = []
2 changes: 2 additions & 0 deletions core/dbt/logger.py
@@ -43,6 +43,8 @@
 logging.getLogger('google').setLevel(logging.INFO)
 logging.getLogger('snowflake.connector').setLevel(logging.INFO)
 logging.getLogger('parsedatetime').setLevel(logging.INFO)
+# we never want to seek werkzeug logs
+logging.getLogger('werkzeug').setLevel(logging.CRITICAL)
 
 
 # provide this for the cache.
 CACHE_LOGGER = logging.getLogger('dbt.cache')