Parallel RPC requests no longer step on each other's arguments (#2484, #2554).

Summary of this patch:
  * CHANGELOG.md: add a bullet for the fix.
  * core/dbt/flags.py: _get_context() now always returns the 'spawn'
    multiprocessing context; a TODO records the intent to go back to
    fork() on linux once that is safe.
  * core/dbt/rpc/method.py: RemoteMethod.__init__ stores deepcopy(args)
    instead of the caller's args object.
  * test/integration/048_rpc_test/test_rpc.py: the expected error line
    number is pinned to '1' (the value seen under "spawn").
  * test/rpc/test_concurrency.py: new test that submits 20 compile_sql
    requests through a 10-worker thread pool and checks each result.

NOTE(review): this layout was reconstructed from a whitespace-collapsed
copy. Indentation of Python lines and any runs of spaces inside context
lines (notably the 'message' f-string in test_rpc.py) are assumed; confirm
against the working tree before applying. The blob hashes recorded for
CHANGELOG.md and test/rpc/test_concurrency.py predate the review edits made
to their added lines.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 384ed38274d..b6e7ddcaec9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 - dbt compile and ls no longer create schemas if they don't already exist ([#2525](https://github.com/fishtown-analytics/dbt/issues/2525), [#2528](https://github.com/fishtown-analytics/dbt/pull/2528))
 - `dbt deps` now respects the `--project-dir` flag, so using `dbt deps --project-dir=/some/path` and then `dbt run --project-dir=/some/path` will properly find dependencies ([#2519](https://github.com/fishtown-analytics/dbt/issues/2519), [#2534](https://github.com/fishtown-analytics/dbt/pull/2534))
 - `packages.yml` revision/version fields can be float-like again (`revision: '1.0'` is valid). ([#2518](https://github.com/fishtown-analytics/dbt/issues/2518), [#2535](https://github.com/fishtown-analytics/dbt/pull/2535))
+- Parallel RPC requests no longer step on each other's arguments ([#2484](https://github.com/fishtown-analytics/dbt/issues/2484), [#2554](https://github.com/fishtown-analytics/dbt/pull/2554))
 
 ## dbt 0.17.0 (June 08, 2020)
 
diff --git a/core/dbt/flags.py b/core/dbt/flags.py
index 7f3e4cb864c..ffcb3958081 100644
--- a/core/dbt/flags.py
+++ b/core/dbt/flags.py
@@ -28,13 +28,8 @@ def env_set_truthy(key: str) -> Optional[str]:
 
 
 def _get_context():
-    if os.name == 'posix' and os.uname().sysname.lower() != 'darwin':
-        # on linux fork is available and it's fast
-        return multiprocessing.get_context('fork')
-    else:
-        # on windows, spawn is the only choice.
-        # On osx, fork is buggy: https://bugs.python.org/issue33725
-        return multiprocessing.get_context('spawn')
+    # TODO: change this back to use fork() on linux when we have made that safe
+    return multiprocessing.get_context('spawn')
 
 
 MP_CONTEXT = _get_context()
diff --git a/core/dbt/rpc/method.py b/core/dbt/rpc/method.py
index b2bd1b36456..74563305107 100644
--- a/core/dbt/rpc/method.py
+++ b/core/dbt/rpc/method.py
@@ -1,5 +1,6 @@
 import inspect
 from abc import abstractmethod
+from copy import deepcopy
 from typing import List, Optional, Type, TypeVar, Generic, Dict, Any
 
 from hologram import JsonSchemaMixin, ValidationError
@@ -20,7 +21,7 @@ class RemoteMethod(Generic[Parameters, Result]):
     METHOD_NAME: Optional[str] = None
 
     def __init__(self, args, config):
-        self.args = args
+        self.args = deepcopy(args)
         self.config = config
 
     @classmethod
diff --git a/test/integration/048_rpc_test/test_rpc.py b/test/integration/048_rpc_test/test_rpc.py
index 866dcb750b3..70320681750 100644
--- a/test/integration/048_rpc_test/test_rpc.py
+++ b/test/integration/048_rpc_test/test_rpc.py
@@ -726,8 +726,9 @@ def test_invalid_requests_postgres(self):
             'hi this is not sql',
             name='foo'
         ).json()
-        # neat mystery: Why is this "1" on macos and "2" on linux?
-        lineno = '1' if sys.platform == 'darwin' else '2'
+        # this is "1" if the multiprocessing context is "spawn" and "2" if
+        # it's fork.
+        lineno = '1'
         error_data = self.assertIsErrorWith(data, 10003, 'Database Error', {
             'type': 'DatabaseException',
             'message': f'Database Error in rpc foo (from remote system)\n syntax error at or near "hi"\n LINE {lineno}: hi this is not sql\n ^',
diff --git a/test/rpc/test_concurrency.py b/test/rpc/test_concurrency.py
new file mode 100644
index 00000000000..31be49540ef
--- /dev/null
+++ b/test/rpc/test_concurrency.py
@@ -0,0 +1,42 @@
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+from .util import (
+    get_querier,
+    ProjectDefinition,
+)
+
+
+def _compile_poll_for_result(querier, request_id: int):
+    """Compile a one-line query over RPC and check the compiled SQL."""
+    sql = f'select {request_id} as id'
+    resp = querier.compile_sql(
+        request_id=request_id, sql=sql, name=f'query_{request_id}'
+    )
+    compile_sql_result = querier.async_wait_for_result(resp)
+    assert compile_sql_result['results'][0]['compiled_sql'] == sql
+
+
+def test_rpc_compile_sql_concurrency(
+    project_root, profiles_root, postgres_profile, unique_schema
+):
+    """Submit 20 compile_sql requests through a 10-worker thread pool."""
+    project = ProjectDefinition(
+        models={'my_model.sql': 'select 1 as id'}
+    )
+    querier_ctx = get_querier(
+        project_def=project,
+        project_dir=project_root,
+        profiles_dir=profiles_root,
+        schema=unique_schema,
+        test_kwargs={},
+    )
+
+    with querier_ctx as querier:
+        with ThreadPoolExecutor(max_workers=10) as tpe:
+            futures = [
+                tpe.submit(_compile_poll_for_result, querier, request_id)
+                for request_id in range(20)
+            ]
+        for fut in as_completed(futures):
+            # result() re-raises any exception raised in the worker thread
+            fut.result()