Skip to content

Commit

Permalink
[Backport] #3674 (#3718)
Browse files Browse the repository at this point in the history
* Add build RPC method (#3674)

* Add build RPC method

* Add rpc test, some required flags

* Fix flake8

* PR feedback

* Update changelog [skip ci]

* Do not skip CI when rebasing

* Trigger CI so I can merge
  • Loading branch information
jtcohen6 authored Aug 10, 2021
1 parent 1c066cd commit b67e877
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 6 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
## dbt 0.21.0 (Release TBD)

### Fixes
- Fix for RPC requests that raise a RecursionError when serializing Undefined values as JSON ([#3464](https://github.com/dbt-labs/dbt/issues/3464), [#3687](https://github.com/dbt-labs/dbt/pull/3687))

### Under the hood
- Add `build` RPC method, and a subset of flags for `build` task ([#3595](https://github.com/dbt-labs/dbt/issues/3595), [#3674](https://github.com/dbt-labs/dbt/pull/3674))

## dbt 0.21.0b1 (August 03, 2021)


### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:source_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))
- **dbt-snowflake:** Turn off transactions and turn on `autocommit` by default. Explicitly specify `begin` and `commit` for DML statements in incremental and snapshot materializations. Note that this may affect user-space code that depends on transactions.
Expand All @@ -13,7 +22,6 @@
- Fix type coercion issues when fetching query result sets ([#2984](https://github.com/fishtown-analytics/dbt/issues/2984), [#3499](https://github.com/fishtown-analytics/dbt/pull/3499))
- Handle whitespace after a plus sign on the project config ([#3526](https://github.com/dbt-labs/dbt/pull/3526))
- Fix table and view materialization issue when switching from one to the other ([#2161](https://github.com/dbt-labs/dbt/issues/2161), [#3547](https://github.com/dbt-labs/dbt/pull/3547))
- Fix for RPC requests that raise a RecursionError when serializing Undefined values as JSON ([#3464](https://github.com/dbt-labs/dbt/issues/3464), [#3687](https://github.com/dbt-labs/dbt/pull/3687))

### Under the hood
- Improve default view and table materialization performance by checking relational cache before attempting to drop temp relations ([#3112](https://github.com/fishtown-analytics/dbt/issues/3112), [#3468](https://github.com/fishtown-analytics/dbt/pull/3468))
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ class RPCDocsGenerateParameters(RPCParameters):
state: Optional[str] = None


@dataclass
class RPCBuildParameters(RPCParameters):
    """Request parameters accepted by the ``build`` RPC method.

    Every field defaults to ``None``, so a request may omit all of them.
    The fields are read by ``RemoteBuildProjectTask.set_args``.
    """
    # Thread-count override; set_args only applies it when it is not None.
    threads: Optional[int] = None
    # Selection criteria to include; a single string or a list of strings
    # (set_args passes the value through ``_listify``).
    models: Union[None, str, List[str]] = None
    # Selection criteria to exclude; same accepted shapes as ``models``.
    exclude: Union[None, str, List[str]] = None
    # Stored on the task args as ``selector_name``.
    selector: Optional[str] = None
    # Passed through ``state_path`` before being stored on the task args;
    # presumably a directory holding a previous manifest -- confirm.
    state: Optional[str] = None
    # When None, set_args falls back to ``flags.DEFER_MODE``.
    defer: Optional[bool] = None


@dataclass
class RPCCliParameters(RPCParameters):
cli: str
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,15 +1072,15 @@ def parse_args(args, cls=DBTArgumentParser):
seed_sub = _build_seed_subparser(subs, base_subparser)
# --threads, --no-version-check
_add_common_arguments(run_sub, compile_sub, generate_sub, test_sub,
rpc_sub, seed_sub, parse_sub)
rpc_sub, seed_sub, parse_sub, build_sub)
# --models, --exclude
# list_sub sets up its own arguments.
_add_selection_arguments(build_sub, run_sub, compile_sub, generate_sub, test_sub)
_add_selection_arguments(snapshot_sub, seed_sub, models_name='select')
# --defer
_add_defer_argument(run_sub, test_sub)
_add_defer_argument(run_sub, test_sub, build_sub)
# --full-refresh
_add_table_mutability_arguments(run_sub, compile_sub)
_add_table_mutability_arguments(run_sub, compile_sub, build_sub)

_build_docs_serve_subparser(docs_subs, base_subparser)
_build_source_freshness_subparser(source_subs, base_subparser)
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
RPCSnapshotParameters,
RPCSourceFreshnessParameters,
RPCListParameters,
RPCBuildParameters,
)
from dbt.exceptions import RuntimeException
from dbt.rpc.method import (
Expand All @@ -37,6 +38,7 @@
from dbt.task.snapshot import SnapshotTask
from dbt.task.test import TestTask
from dbt.task.list import ListTask
from dbt.task.build import BuildTask

from .base import RPCTask
from .cli import HasCLI
Expand Down Expand Up @@ -305,3 +307,22 @@ def output_results(results):
output=[json.loads(x) for x in results],
logs=None
)


class RemoteBuildProjectTask(RPCCommandTask[RPCBuildParameters], BuildTask):
    """RPC command task exposed under the ``build`` method name.

    Translates an ``RPCBuildParameters`` request into the attributes on
    ``self.args`` before the inherited task logic runs.
    """

    METHOD_NAME = 'build'

    def set_args(self, params: RPCBuildParameters) -> None:
        """Copy the request parameters onto ``self.args``.

        ``threads`` is only overridden when the request supplies it, and
        ``defer`` falls back to ``flags.DEFER_MODE`` when the request leaves
        it unset. The state path is resolved last, after which the previous
        state is loaded via ``set_previous_state``.
        """
        # Selection inputs: models/exclude are normalized through _listify.
        self.args.models = self._listify(params.models)
        self.args.exclude = self._listify(params.exclude)
        self.args.selector_name = params.selector

        # Keep whatever thread count is already on args unless overridden.
        if params.threads is not None:
            self.args.threads = params.threads

        # An explicit True/False from the request wins over the global flag.
        self.args.defer = (
            flags.DEFER_MODE if params.defer is None else params.defer
        )

        self.args.state = state_path(params.state)
        self.set_previous_state()
136 changes: 136 additions & 0 deletions test/rpc/test_build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import os
import pytest
import yaml
from .util import (
assert_has_threads,
get_querier,
get_write_manifest,
ProjectDefinition,
)

# Snapshot definition shared by the tests in this module (written out as
# 'my_snapshots.sql'). It declares a snapshot named `my_snapshot` that uses
# the 'timestamp' strategy keyed on `id`, and selects from ref('my_model')
# with a constant `updated_at` value.
snapshot_data = '''
{% snapshot my_snapshot %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='timestamp',
updated_at='updated_at',
)
}}
select id, cast('2019-10-31 23:59:40' as timestamp) as updated_at
from {{ ref('my_model') }}
{% endsnapshot %}
'''

@pytest.mark.supported('postgres')
def test_rpc_build_threads(
    project_root, profiles_root, dbt_profile, unique_schema
):
    """Check that the ``build`` RPC method honours a ``threads`` override.

    The override is exercised twice: once through the ``threads`` parameter
    of the ``build`` RPC method and once through the ``--threads`` flag of a
    ``build`` command sent via ``cli_args``.
    """
    schema_yaml = {
        'version': 2,
        'models': [{
            'name': 'my_model',
            'columns': [
                {
                    'name': 'id',
                    'tests': ['not_null', 'unique'],
                },
            ],
        }],
    }
    project = ProjectDefinition(
        project_data={'seeds': {'+quote_columns': False}},
        models={
            'my_model.sql': 'select * from {{ ref("data") }}',
            'schema.yml': yaml.safe_dump(schema_yaml)
        },
        seeds={'data.csv': 'id,message\n1,hello\n2,goodbye'},
        snapshots={'my_snapshots.sql': snapshot_data},
    )
    querier_ctx = get_querier(
        project_def=project,
        project_dir=project_root,
        profiles_dir=profiles_root,
        schema=unique_schema,
        test_kwargs={},
    )
    with querier_ctx as querier:
        # first run dbt to get the model built
        querier.async_wait_for_result(querier.build())

        # Use the `build` method here (not `test`): this test exists to cover
        # the `threads` parameter of the build RPC method itself.
        results = querier.async_wait_for_result(querier.build(threads=5))
        assert_has_threads(results, 5)

        results = querier.async_wait_for_result(
            querier.cli_args('build --threads=7')
        )
        assert_has_threads(results, 7)


@pytest.mark.supported('postgres')
def test_rpc_build_state(
    project_root, profiles_root, dbt_profile, unique_schema
):
    """Exercise ``state``-based selection (and ``defer``) via the build RPC.

    Flow: build everything, save a manifest into ``<project_root>/state``,
    modify ``my_model``, then build with ``state:modified`` and check how
    many results come back before and after refreshing the saved manifest.
    """
    id_column = {'name': 'id', 'tests': ['not_null', 'unique']}
    model_entry = {'name': 'my_model', 'columns': [id_column]}
    schema_yaml = {'version': 2, 'models': [model_entry]}

    project = ProjectDefinition(
        project_data={'seeds': {'+quote_columns': False}},
        models={
            'my_model.sql': 'select * from {{ ref("data") }}',
            'schema.yml': yaml.safe_dump(schema_yaml)
        },
        seeds={'data.csv': 'id,message\n1,hello\n2,goodbye'},
        snapshots={'my_snapshots.sql': snapshot_data},
    )

    with get_querier(
        project_def=project,
        project_dir=project_root,
        profiles_dir=profiles_root,
        schema=unique_schema,
        test_kwargs={},
    ) as querier:

        def build_modified(**extra):
            # Build only what differs from the manifest saved under ./state.
            token = querier.build(
                state='./state', models=['state:modified'], **extra
            )
            return querier.async_wait_for_result(token)

        state_dir = os.path.join(project_root, 'state')
        os.makedirs(state_dir)
        manifest_path = os.path.join(state_dir, 'manifest.json')

        # Full build; presumably seed + model + snapshot + two column tests.
        full_results = querier.async_wait_for_result(querier.build())
        assert len(full_results['results']) == 5

        get_write_manifest(querier, manifest_path)

        # Change the model, rewrite it on disk and make the server reload.
        project.models['my_model.sql'] = 'select * from {{ ref("data" )}} where id = 2'
        project.write_models(project_root, remove=True)
        querier.sighup()
        assert querier.wait_for_status('ready') is True

        # Presumably the modified model plus its two tests -- confirm.
        modified_results = build_modified()
        assert len(modified_results['results']) == 3

        # Once the saved manifest is refreshed, nothing counts as modified.
        get_write_manifest(querier, manifest_path)

        unchanged_results = build_modified()
        assert len(unchanged_results['results']) == 0

        # a better test of defer would require multiple targets
        deferred_results = build_modified(defer=True)
        assert len(deferred_results['results']) == 0
4 changes: 2 additions & 2 deletions test/rpc/test_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_rpc_seed_threads(
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
project_data={'seeds': {'config': {'quote_columns': False}}},
project_data={'seeds': {'+quote_columns': False}},
seeds={'data.csv': 'a,b\n1,hello\n2,goodbye'},
)
querier_ctx = get_querier(
Expand All @@ -39,7 +39,7 @@ def test_rpc_seed_include_exclude(
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
project_data={'seeds': {'config': {'quote_columns': False}}},
project_data={'seeds': {'+quote_columns': False}},
seeds={
'data_1.csv': 'a,b\n1,hello\n2,goodbye',
'data_2.csv': 'a,b\n1,data',
Expand Down
24 changes: 24 additions & 0 deletions test/rpc/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,30 @@ def test(
method='test', params=params, request_id=request_id
)

def build(
    self,
    models: Optional[Union[str, List[str]]] = None,
    exclude: Optional[Union[str, List[str]]] = None,
    threads: Optional[int] = None,
    request_id: int = 1,
    defer: Optional[bool] = None,
    state: Optional[str] = None,
    selector: Optional[str] = None,
):
    """Send a ``build`` RPC request and return whatever ``self.request`` returns.

    Only arguments that are not ``None`` are included in the request
    parameters, so the server-side defaults apply to everything omitted.

    ``state`` is a path string (e.g. ``'./state'``), matching the
    ``Optional[str]`` field on ``RPCBuildParameters``. ``selector`` is
    accepted as well so every field of ``RPCBuildParameters`` can be sent.
    """
    candidates = {
        'models': models,
        'exclude': exclude,
        'threads': threads,
        'defer': defer,
        'state': state,
        'selector': selector,
    }
    # Drop unset values; an explicit False (e.g. defer=False) is kept.
    params = {
        key: value for key, value in candidates.items() if value is not None
    }
    return self.request(
        method='build', params=params, request_id=request_id
    )

def docs_generate(self, compile: bool = None, request_id: int = 1):
params = {}
if compile is not None:
Expand Down

0 comments on commit b67e877

Please sign in to comment.