diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 5a3eca8f03a12..5d0896455e9fd 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -96,7 +96,7 @@ def airflow_run(): raise AirflowException(f"Attempted to submit task to an unavailable queue: '{queue}'") resources = {queue: 1} - future = self.client.submit(airflow_run, pure=False, resources=resources) + future = self.client.submit(subprocess.check_call, command, pure=False, resources=resources) self.futures[future] = key # type: ignore def _process_future(self, future: Future) -> None: diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py index 668034fb09d35..970a7ab0b957f 100644 --- a/tests/executors/test_dask_executor.py +++ b/tests/executors/test_dask_executor.py @@ -32,7 +32,7 @@ # utility functions imported from the dask testing suite to instantiate a test # cluster for tls tests - from distributed.utils_test import cluster as dask_testing_cluster, get_cert, tls_security + from distributed.utils_test import cluster as dask_testing_cluster, tls_security from airflow.executors.dask_executor import DaskExecutor @@ -112,9 +112,9 @@ def setUp(self): @conf_vars( { - ('dask', 'tls_ca'): get_cert('tls-ca-cert.pem'), - ('dask', 'tls_cert'): get_cert('tls-key-cert.pem'), - ('dask', 'tls_key'): get_cert('tls-key.pem'), + ('dask', 'tls_ca'): 'certs/tls-ca-cert.pem', + ('dask', 'tls_cert'): 'certs/tls-key-cert.pem', + ('dask', 'tls_key'): 'certs/tls-key.pem', } ) def test_tls(self):