From ebac510a8100ebbbc13c3a101e41526f532c8a5c Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Tue, 16 Jun 2020 13:28:15 -0600 Subject: [PATCH] add a test For the test to work, we have to switch to spawn so that we don't deadlock --- core/dbt/flags.py | 2 +- test/rpc/test_concurrency.py | 39 ++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 test/rpc/test_concurrency.py diff --git a/core/dbt/flags.py b/core/dbt/flags.py index 7f3e4cb864c..38437d4c08d 100644 --- a/core/dbt/flags.py +++ b/core/dbt/flags.py @@ -30,7 +30,7 @@ def env_set_truthy(key: str) -> Optional[str]: def _get_context(): if os.name == 'posix' and os.uname().sysname.lower() != 'darwin': # on linux fork is available and it's fast - return multiprocessing.get_context('fork') + return multiprocessing.get_context('spawn') else: # on windows, spawn is the only choice. # On osx, fork is buggy: https://bugs.python.org/issue33725 diff --git a/test/rpc/test_concurrency.py b/test/rpc/test_concurrency.py new file mode 100644 index 00000000000..31be49540ef --- /dev/null +++ b/test/rpc/test_concurrency.py @@ -0,0 +1,39 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed + +from .util import ( + get_querier, + ProjectDefinition, +) + + +def _compile_poll_for_result(querier, id: int): + sql = f'select {id} as id' + resp = querier.compile_sql( + request_id=id, sql=sql, name=f'query_{id}' + ) + compile_sql_result = querier.async_wait_for_result(resp) + assert compile_sql_result['results'][0]['compiled_sql'] == sql + + +def test_rpc_compile_sql_concurrency( + project_root, profiles_root, postgres_profile, unique_schema +): + project = ProjectDefinition( + models={'my_model.sql': 'select 1 as id'} + ) + querier_ctx = get_querier( + project_def=project, + project_dir=project_root, + profiles_dir=profiles_root, + schema=unique_schema, + test_kwargs={}, + ) + + with querier_ctx as querier: + values = {} + with ThreadPoolExecutor(max_workers=10) as tpe: + for id in range(20): + fut = tpe.submit(_compile_poll_for_result, querier, id) + values[fut] = id + for fut in as_completed(values): + fut.result()