diff --git a/CHANGELOG.md b/CHANGELOG.md index d9cd3022872..3a36a6caa33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ ### Under the hood - Added logic for registry requests to raise a timeout error after a response hangs out for 30 seconds and 5 attempts have been made to reach the endpoint ([#3177](https://github.com/fishtown-analytics/dbt/issues/3177), [#3275](https://github.com/fishtown-analytics/dbt/pull/3275)) +- Added support for invoking the `list` task via the RPC server ([#3311](https://github.com/fishtown-analytics/dbt/issues/3311), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384)) +- Added `unique_id` and `original_file_path` as keys to json responses from the `list` task ([#3356](https://github.com/fishtown-analytics/dbt/issues/3356), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384)) - Use shutil.which so Windows can pick up git.bat as a git executable ([#3035](https://github.com/fishtown-analytics/dbt/issues/3035), [#3134](https://github.com/fishtown-analytics/dbt/issues/3134)) - Add `ssh-client` and update `git` version (using buster backports) in Docker image ([#3337](https://github.com/fishtown-analytics/dbt/issues/3337), [#3338](https://github.com/fishtown-analytics/dbt/pull/3338)) diff --git a/core/dbt/contracts/rpc.py b/core/dbt/contracts/rpc.py index 83d248e84d2..02198630e00 100644 --- a/core/dbt/contracts/rpc.py +++ b/core/dbt/contracts/rpc.py @@ -63,6 +63,16 @@ class RPCCompileParameters(RPCParameters): state: Optional[str] = None +@dataclass +class RPCListParameters(RPCParameters): + resource_types: Optional[List[str]] = None + models: Union[None, str, List[str]] = None + exclude: Union[None, str, List[str]] = None + select: Union[None, str, List[str]] = None + selector: Optional[str] = None + output: Optional[str] = 'json' + + @dataclass class RPCRunParameters(RPCParameters): threads: Optional[int] = None @@ -190,6 +200,13 @@ class RemoteResult(VersionedSchema): logs: List[LogMessage] +@dataclass +@schema_version('remote-list-results', 1) +class RemoteListResults(RemoteResult): + output: List[Any] + generated_at: datetime = field(default_factory=datetime.utcnow) + + @dataclass @schema_version('remote-deps-result', 1) class RemoteDepsResult(RemoteResult): diff --git a/core/dbt/rpc/task_handler.py b/core/dbt/rpc/task_handler.py index ce2f320fc1f..a066b7614b0 100644 --- a/core/dbt/rpc/task_handler.py +++ b/core/dbt/rpc/task_handler.py @@ -38,7 +38,7 @@ QueueTimeoutMessage, ) from dbt.rpc.method import RemoteMethod - +from dbt.task.rpc.project_commands import RemoteListTask # we use this in typing only... from queue import Queue # noqa @@ -78,7 +78,10 @@ def _spawn_setup(self): def task_exec(self) -> None: """task_exec runs first inside the child process""" - signal.signal(signal.SIGTERM, sigterm_handler) + if type(self.task) != RemoteListTask: + # TODO: find another solution for this.. in theory it stops us from + # being able to kill RemoteListTask processes + signal.signal(signal.SIGTERM, sigterm_handler) # the first thing we do in a new process: push logging back over our # queue handler = QueueLogHandler(self.queue) diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index 0da36feb294..09b23707f58 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -58,6 +58,7 @@ class BaseTask(metaclass=ABCMeta): def __init__(self, args, config): self.args = args + self.args.single_threaded = False self.config = config @classmethod diff --git a/core/dbt/task/list.py b/core/dbt/task/list.py index d78223eef8b..60051e065cf 100644 --- a/core/dbt/task/list.py +++ b/core/dbt/task/list.py @@ -3,7 +3,7 @@ from dbt.contracts.graph.parsed import ( ParsedExposure, - ParsedSourceDefinition, + ParsedSourceDefinition ) from dbt.graph import ( parse_difference, @@ -38,6 +38,8 @@ class ListTask(GraphRunnableTask): 'config', 'resource_type', 'source_name', + 'original_file_path', + 'unique_id' )) def __init__(self, args, config): @@ -120,7 +122,7 @@ def generate_paths(self): def run(self): ManifestTask._runtime_initialize(self) - output = self.config.args.output + output = self.args.output if output == 'selector': generator = self.generate_selectors elif output == 'name': @@ -133,7 +135,11 @@ def run(self): raise InternalException( 'Invalid output {}'.format(output) ) - for result in generator(): + + return self.output_results(generator()) + + def output_results(self, results): + for result in results: self.node_results.append(result) print(result) return self.node_results @@ -143,10 +149,10 @@ def resource_types(self): if self.args.models: return [NodeType.Model] - values = set(self.config.args.resource_types) - if not values: + if not self.args.resource_types: return list(self.DEFAULT_RESOURCE_VALUES) + values = set(self.args.resource_types) if 'default' in values: values.remove('default') values.update(self.DEFAULT_RESOURCE_VALUES) diff --git a/core/dbt/task/rpc/project_commands.py b/core/dbt/task/rpc/project_commands.py index cc345cfde20..a4952cc87fb 100644 --- a/core/dbt/task/rpc/project_commands.py +++ b/core/dbt/task/rpc/project_commands.py @@ -1,3 +1,4 @@ +import json from datetime import datetime from pathlib import Path from typing import List, Optional, Union @@ -15,10 +16,13 @@ RPCTestParameters, RemoteCatalogResults, RemoteExecutionResult, + RemoteListResults, RemoteRunOperationResult, RPCSnapshotParameters, RPCSourceFreshnessParameters, + RPCListParameters, ) +from dbt.exceptions import RuntimeException from dbt.rpc.method import ( Parameters, RemoteManifestMethod ) @@ -32,6 +36,7 @@ from dbt.task.seed import SeedTask from dbt.task.snapshot import SnapshotTask from dbt.task.test import TestTask +from dbt.task.list import ListTask from .base import RPCTask from .cli import HasCLI @@ -258,3 +263,36 @@ def handle_request(self) -> GetManifestResult: def interpret_results(self, results): return results.manifest is not None + + +class RemoteListTask( + RPCCommandTask[RPCListParameters], ListTask +): + METHOD_NAME = 'list' + + def set_args(self, params: RPCListParameters) -> None: + + self.args.output = params.output + self.args.resource_types = self._listify(params.resource_types) + self.args.models = self._listify(params.models) + self.args.exclude = self._listify(params.exclude) + self.args.selector_name = params.selector + self.args.select = self._listify(params.select) + + if self.args.models: + if self.args.select: + raise RuntimeException( + '"models" and "select" are mutually exclusive arguments' + ) + if self.args.resource_types: + raise RuntimeException( + '"models" and "resource_type" are mutually exclusive ' + 'arguments' + ) + + @staticmethod + def output_results(results): + return RemoteListResults( + output=[json.loads(x) for x in results], + logs=None + ) diff --git a/test/integration/047_dbt_ls_test/test_ls.py b/test/integration/047_dbt_ls_test/test_ls.py index 747c4a1f947..5ebe12b14aa 100644 --- a/test/integration/047_dbt_ls_test/test_ls.py +++ b/test/integration/047_dbt_ls_test/test_ls.py @@ -1,6 +1,7 @@ from test.integration.base import DBTIntegrationTest, use_profile import os from dbt.logger import log_manager +from test.integration.base import normalize import json @@ -94,6 +95,8 @@ def expect_snapshot_output(self): 'alias': None, 'check_cols': None, }, + 'unique_id': 'snapshot.test.my_snapshot', + 'original_file_path': normalize('snapshots/snapshot.sql'), 'alias': 'my_snapshot', 'resource_type': 'snapshot', }, @@ -125,6 +128,8 @@ def expect_analyses_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'analysis.test.a', + 'original_file_path': normalize('analyses/a.sql'), 'alias': 'a', 'resource_type': 'analysis', }, @@ -157,6 +162,8 @@ def expect_model_output(self): 'schema': None, 'alias': None, }, + 'original_file_path': normalize('models/ephemeral.sql'), + 'unique_id': 'model.test.ephemeral', 'alias': 'ephemeral', 'resource_type': 'model', }, @@ -181,6 +188,8 @@ def expect_model_output(self): 'schema': None, 'alias': None, }, + 'original_file_path': normalize('models/incremental.sql'), + 'unique_id': 'model.test.incremental', 'alias': 'incremental', 'resource_type': 'model', }, @@ -204,6 +213,8 @@ def expect_model_output(self): 'schema': None, 'alias': None, }, + 'original_file_path': normalize('models/sub/inner.sql'), + 'unique_id': 'model.test.inner', 'alias': 'inner', 'resource_type': 'model', }, @@ -227,6 +238,8 @@ def expect_model_output(self): 'schema': None, 'alias': None, }, + 'original_file_path': normalize('models/outer.sql'), + 'unique_id': 'model.test.outer', 'alias': 'outer', 'resource_type': 'model', }, @@ -261,6 +274,8 @@ def expect_model_ephemeral_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'model.test.ephemeral', + 'original_file_path': normalize('models/ephemeral.sql'), 'alias': 'outer', 'resource_type': 'model', }, @@ -277,6 +292,8 @@ def expect_source_output(self): 'config': { 'enabled': True, }, + 'unique_id': 'source.test.my_source.my_table', + 'original_file_path': normalize('models/schema.yml'), 'package_name': 'test', 'name': 'my_table', 'source_name': 'my_source', @@ -314,6 +331,8 @@ def expect_seed_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'seed.test.seed', + 'original_file_path': normalize('data/seed.csv'), 'alias': 'seed', 'resource_type': 'seed', }, @@ -347,6 +366,8 @@ def expect_test_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'test.test.not_null_outer_id.8f1c176a93', + 'original_file_path': normalize('models/schema.yml'), 'alias': 'not_null_outer_id', 'resource_type': 'test', }, @@ -371,6 +392,8 @@ def expect_test_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'test.test.t', + 'original_file_path': normalize('tests/t.sql'), 'alias': 't', 'resource_type': 'test', }, @@ -395,6 +418,8 @@ def expect_test_output(self): 'schema': None, 'alias': None, }, + 'unique_id': 'test.test.unique_outer_id.a653b29b17', + 'original_file_path': normalize('models/schema.yml'), 'alias': 'unique_outer_id', 'resource_type': 'test', }, diff --git a/test/integration/100_rpc_test/test_rpc.py b/test/integration/100_rpc_test/test_rpc.py index 373129cb960..3c327ee96fc 100644 --- a/test/integration/100_rpc_test/test_rpc.py +++ b/test/integration/100_rpc_test/test_rpc.py @@ -1176,3 +1176,99 @@ def test_deps_cli_compilation_postgres(self): 'cli_args', cli='deps', _poll_timeout=180).json()) self._check_deps_ok(status) + + +class TestRPCServerList(HasRPCServer): + should_seed = False + + @property + def models(self): + return "models" + + @mark.flaky(rerun_filter=addr_in_use, max_runs=3) + @use_profile('postgres') + def test_list_base_postgres(self): + result = self.query('list').json() + self.assertIsResult(result) + self.assertEqual(len(result["result"]["output"]), 17) + self.assertEqual( + [x["name"] for x in result["result"]["output"]], + [ + 'descendant_model', + 'ephemeral_model', + 'multi_source_model', + 'nonsource_descendant', + 'expected_multi_source', + 'other_source_table', + 'other_table', + 'source', + 'table', + 'test_table', + 'disabled_test_table', + 'other_test_table', + 'test_table', + 'relationships_descendant_model_favorite_color__favorite_color__source_test_source_test_table_', + 'source_not_null_test_source_test_table_id', + 'source_relationships_test_source_test_table_favorite_color__favorite_color__ref_descendant_model_', + 'source_unique_test_source_test_table_id' + ] + ) + + @mark.flaky(rerun_filter=addr_in_use, max_runs=3) + @use_profile('postgres') + def test_list_resource_type_postgres(self): + result = self.query('list', resource_types=['model']).json() + self.assertIsResult(result) + self.assertEqual(len(result["result"]["output"]), 4) + self.assertEqual( + [x['name'] for x in result["result"]["output"]], + [ + 'descendant_model', + 'ephemeral_model', + 'multi_source_model', + 'nonsource_descendant'] + ) + + @mark.flaky(rerun_filter=addr_in_use, max_runs=3) + @use_profile('postgres') + def test_list_models_postgres(self): + result = self.query('list', models=['descendant_model']).json() + self.assertIsResult(result) + self.assertEqual(len(result["result"]["output"]), 1) + self.assertEqual(result["result"]["output"][0]["name"], 'descendant_model') + + @mark.flaky(rerun_filter=addr_in_use, max_runs=3) + @use_profile('postgres') + def test_list_exclude_postgres(self): + result = self.query('list', exclude=['+descendant_model']).json() + self.assertIsResult(result) + self.assertEqual(len(result["result"]["output"]), 11) + self.assertEqual( + [x['name'] for x in result['result']['output']], + [ + 'ephemeral_model', + 'multi_source_model', + 'nonsource_descendant', + 'expected_multi_source', + 'other_source_table', + 'other_table', + 'source', + 'table', + 'test_table', + 'disabled_test_table', + 'other_test_table' + ] + ) + + @mark.flaky(rerun_filter=addr_in_use, max_runs=3) + @use_profile('postgres') + def test_list_select_postgres(self): + result = self.query('list', select=[ + 'relationships_descendant_model_favorite_color__favorite_color__source_test_source_test_table_' + ]).json() + self.assertIsResult(result) + self.assertEqual(len(result["result"]["output"]), 1) + self.assertEqual( + result["result"]["output"][0]["name"], + 'relationships_descendant_model_favorite_color__favorite_color__source_test_source_test_table_' + )