Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ls to RPC server #3384

Merged
merged 10 commits into from
May 27, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

### Under the hood
- Added logic for registry requests to raise a timeout error after a response hangs for 30 seconds and 5 attempts have been made to reach the endpoint ([#3177](https://github.com/fishtown-analytics/dbt/issues/3177), [#3275](https://github.com/fishtown-analytics/dbt/pull/3275))
- Added support for invoking the `list` task via the RPC server ([#3311](https://github.com/fishtown-analytics/dbt/issues/3311), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Added `unique_id` and `original_file_path` as keys to json responses from the `list` task ([#3356](https://github.com/fishtown-analytics/dbt/issues/3356), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Use shutil.which so Windows can pick up git.bat as a git executable ([#3035](https://github.com/fishtown-analytics/dbt/issues/3035), [#3134](https://github.com/fishtown-analytics/dbt/issues/3134))
- Add `ssh-client` and update `git` version (using buster backports) in Docker image ([#3337](https://github.com/fishtown-analytics/dbt/issues/3337), [#3338](https://github.com/fishtown-analytics/dbt/pull/3338))

Expand Down
17 changes: 17 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class RPCCompileParameters(RPCParameters):
state: Optional[str] = None


@dataclass
class RPCListParameters(RPCParameters):
    """Parameters accepted by the RPC ``list`` method.

    Every field is optional. ``models``, ``exclude`` and ``select`` each
    accept either a single string or a list of strings; the RPC list task
    passes them through ``_listify`` before use.
    """
    # Resource type names to restrict the listing to.
    resource_types: Optional[List[str]] = None
    models: Union[None, str, List[str]] = None
    exclude: Union[None, str, List[str]] = None
    # Same shape as ``models``/``exclude`` so a bare string is accepted here
    # too (the task listifies all three the same way).
    select: Union[None, str, List[str]] = None
    # Stored by the task as ``args.selector_name``.
    selector: Optional[str] = None
    # Output format requested from the list task; JSON unless overridden.
    output: Optional[str] = 'json'


@dataclass
class RPCRunParameters(RPCParameters):
threads: Optional[int] = None
Expand Down Expand Up @@ -190,6 +200,13 @@ class RemoteResult(VersionedSchema):
logs: List[LogMessage]


@dataclass
@schema_version('remote-list-results', 1)
class RemoteListResults(RemoteResult):
    """Versioned result payload returned by the RPC ``list`` method."""
    # One entry per listed resource. The RPC list task fills this with the
    # decoded form of each line the list task produced.
    output: List[Any]
    # Creation timestamp; defaults to ``datetime.utcnow()`` evaluated when the
    # instance is built (a naive datetime, no tzinfo attached).
    generated_at: datetime = field(default_factory=datetime.utcnow)


@dataclass
@schema_version('remote-deps-result', 1)
class RemoteDepsResult(RemoteResult):
Expand Down
7 changes: 5 additions & 2 deletions core/dbt/rpc/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
QueueTimeoutMessage,
)
from dbt.rpc.method import RemoteMethod

from dbt.task.rpc.project_commands import RemoteListTask

# we use this in typing only...
from queue import Queue # noqa
Expand Down Expand Up @@ -78,7 +78,10 @@ def _spawn_setup(self):

def task_exec(self) -> None:
"""task_exec runs first inside the child process"""
signal.signal(signal.SIGTERM, sigterm_handler)
if type(self.task) != RemoteListTask:
# TODO: find another solution for this.. in theory it stops us from
# being able to kill RemoteListTask processes
signal.signal(signal.SIGTERM, sigterm_handler)
# the first thing we do in a new process: push logging back over our
# queue
handler = QueueLogHandler(self.queue)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class BaseTask(metaclass=ABCMeta):

def __init__(self, args, config):
self.args = args
self.args.single_threaded = False
self.config = config

@classmethod
Expand Down
16 changes: 11 additions & 5 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dbt.contracts.graph.parsed import (
ParsedExposure,
ParsedSourceDefinition,
ParsedSourceDefinition
)
from dbt.graph import (
parse_difference,
Expand Down Expand Up @@ -38,6 +38,8 @@ class ListTask(GraphRunnableTask):
'config',
'resource_type',
'source_name',
'original_file_path',
'unique_id'
))

def __init__(self, args, config):
Expand Down Expand Up @@ -120,7 +122,7 @@ def generate_paths(self):

def run(self):
ManifestTask._runtime_initialize(self)
output = self.config.args.output
output = self.args.output
if output == 'selector':
generator = self.generate_selectors
elif output == 'name':
Expand All @@ -133,7 +135,11 @@ def run(self):
raise InternalException(
'Invalid output {}'.format(output)
)
for result in generator():

return self.output_results(generator())

def output_results(self, results):
    """Record and print every produced line, then return the full record.

    Each item from ``results`` is appended to ``self.node_results`` and
    written to stdout, in iteration order. The return value is
    ``self.node_results`` itself, including anything it already held.
    """
    collected = self.node_results
    for line in results:
        # Append before printing, so a line is recorded even if the print
        # call fails.
        collected.append(line)
        print(line)
    return collected
Expand All @@ -143,10 +149,10 @@ def resource_types(self):
if self.args.models:
return [NodeType.Model]

values = set(self.config.args.resource_types)
if not values:
if not self.args.resource_types:
return list(self.DEFAULT_RESOURCE_VALUES)

values = set(self.args.resource_types)
if 'default' in values:
values.remove('default')
values.update(self.DEFAULT_RESOURCE_VALUES)
Expand Down
26 changes: 26 additions & 0 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union
Expand All @@ -15,9 +16,11 @@
RPCTestParameters,
RemoteCatalogResults,
RemoteExecutionResult,
RemoteListResults,
RemoteRunOperationResult,
RPCSnapshotParameters,
RPCSourceFreshnessParameters,
RPCListParameters,
)
from dbt.rpc.method import (
Parameters, RemoteManifestMethod
Expand All @@ -32,6 +35,7 @@
from dbt.task.seed import SeedTask
from dbt.task.snapshot import SnapshotTask
from dbt.task.test import TestTask
from dbt.task.list import ListTask

from .base import RPCTask
from .cli import HasCLI
Expand Down Expand Up @@ -258,3 +262,25 @@ def handle_request(self) -> GetManifestResult:

def interpret_results(self, results):
return results.manifest is not None


class RemoteListTask(
    RPCCommandTask[RPCListParameters], ListTask
):
    """RPC-server variant of the ``list`` task.

    Request parameters are copied onto ``self.args`` so the inherited
    ``ListTask`` logic can run unchanged; the produced lines are returned
    as a ``RemoteListResults`` payload instead of a plain list.
    """
    METHOD_NAME = 'list'

    def set_args(self, params: RPCListParameters) -> None:
        """Translate the RPC request parameters into task arguments.

        ``resource_types``, ``models``, ``exclude`` and ``select`` are
        passed through ``self._listify``; ``selector`` is stored as
        ``args.selector_name``.
        """
        self.args.output = params.output
        self.args.resource_types = self._listify(params.resource_types)
        self.args.models = self._listify(params.models)
        self.args.exclude = self._listify(params.exclude)
        self.args.selector_name = params.selector
        self.args.select = self._listify(params.select)

    @staticmethod
    def _decode_result(raw):
        """Return ``raw`` decoded as a JSON object, or ``raw`` unchanged.

        Only the ``json`` output format produces JSON documents; the other
        formats yield plain strings that must be passed through as-is
        rather than crash the request with a decode error.
        """
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return raw
        # A bare scalar that happens to be valid JSON (e.g. a name such as
        # "123") is still a plain-text line, not a JSON record.
        return decoded if isinstance(decoded, dict) else raw

    @staticmethod
    def output_results(results):
        """Wrap the listed lines in a ``RemoteListResults`` payload.

        JSON-object lines are decoded into dicts; any other line is kept
        as the original string.
        """
        return RemoteListResults(
            output=[RemoteListTask._decode_result(x) for x in results],
            # NOTE(review): logs is passed as None here; presumably it is
            # populated elsewhere before serialization -- confirm.
            logs=None
        )
96 changes: 96 additions & 0 deletions test/integration/100_rpc_test/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,3 +1176,99 @@ def test_deps_cli_compilation_postgres(self):
'cli_args', cli='deps', _poll_timeout=180).json())

self._check_deps_ok(status)


class TestRPCServerList(HasRPCServer):
    """Integration tests for the ``list`` method of the RPC server."""
    should_seed = False

    @property
    def models(self):
        return "models"

    def _query_list_output(self, **params):
        """Run the RPC ``list`` method and return its ``output`` entries.

        The response is checked with ``assertIsResult`` before the output
        is extracted.
        """
        response = self.query('list', **params).json()
        self.assertIsResult(response)
        return response["result"]["output"]

    def _assert_listed_names(self, entries, expected_names):
        """Check both the number of entries and their names, in order."""
        self.assertEqual(len(entries), len(expected_names))
        self.assertEqual([entry["name"] for entry in entries], expected_names)

    @mark.flaky(rerun_filter=addr_in_use, max_runs=3)
    @use_profile('postgres')
    def test_list_base_postgres(self):
        entries = self._query_list_output()
        self.assertEqual(len(entries), 17)
        self._assert_listed_names(entries, [
            'descendant_model',
            'ephemeral_model',
            'multi_source_model',
            'nonsource_descendant',
            'expected_multi_source',
            'other_source_table',
            'other_table',
            'source',
            'table',
            'test_table',
            'disabled_test_table',
            'other_test_table',
            'test_table',
            'relationships_descendant_model_favorite_color__favorite_color__source_test_source_test_table_',
            'source_not_null_test_source_test_table_id',
            'source_relationships_test_source_test_table_favorite_color__favorite_color__ref_descendant_model_',
            'source_unique_test_source_test_table_id',
        ])

    @mark.flaky(rerun_filter=addr_in_use, max_runs=3)
    @use_profile('postgres')
    def test_list_resource_type_postgres(self):
        entries = self._query_list_output(resource_types=['model'])
        self.assertEqual(len(entries), 4)
        self._assert_listed_names(entries, [
            'descendant_model',
            'ephemeral_model',
            'multi_source_model',
            'nonsource_descendant',
        ])

    @mark.flaky(rerun_filter=addr_in_use, max_runs=3)
    @use_profile('postgres')
    def test_list_models_postgres(self):
        entries = self._query_list_output(models=['descendant_model'])
        self.assertEqual(len(entries), 1)
        self.assertEqual(entries[0]["name"], 'descendant_model')

    @mark.flaky(rerun_filter=addr_in_use, max_runs=3)
    @use_profile('postgres')
    def test_list_exclude_postgres(self):
        entries = self._query_list_output(exclude=['+descendant_model'])
        self.assertEqual(len(entries), 11)
        self._assert_listed_names(entries, [
            'ephemeral_model',
            'multi_source_model',
            'nonsource_descendant',
            'expected_multi_source',
            'other_source_table',
            'other_table',
            'source',
            'table',
            'test_table',
            'disabled_test_table',
            'other_test_table',
        ])

    @mark.flaky(rerun_filter=addr_in_use, max_runs=3)
    @use_profile('postgres')
    def test_list_select_postgres(self):
        selected = 'relationships_descendant_model_favorite_color__favorite_color__source_test_source_test_table_'
        entries = self._query_list_output(select=[selected])
        self.assertEqual(len(entries), 1)
        self.assertEqual(entries[0]["name"], selected)