-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed dask executor and tests #22027
Changes from 4 commits
6d86480
f9ba4aa
bf13e7b
b33fbd3
6d03436
3785d20
810fbf7
8093f79
4013070
e1865e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,8 +32,7 @@ | |
try: | ||
# utility functions imported from the dask testing suite to instantiate a test | ||
# cluster for tls tests | ||
from distributed import tests # noqa | ||
from distributed.utils_test import cluster as dask_testing_cluster, get_cert, tls_security | ||
pass | ||
|
||
skip_tls_tests = False | ||
except ImportError: | ||
|
@@ -49,7 +48,7 @@ | |
|
||
# For now we are temporarily removing Dask support until we get Dask Team help us in making the | ||
# tests pass again | ||
skip_dask_tests = True | ||
skip_dask_tests = False | ||
|
||
|
||
@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team") | ||
|
@@ -121,27 +120,27 @@ def setUp(self): | |
|
||
@conf_vars( | ||
{ | ||
('dask', 'tls_ca'): get_cert('tls-ca-cert.pem'), | ||
('dask', 'tls_cert'): get_cert('tls-key-cert.pem'), | ||
('dask', 'tls_key'): get_cert('tls-key.pem'), | ||
('dask', 'tls_ca'): 'certs/tls-ca-cert.pem', | ||
('dask', 'tls_cert'): 'certs/tls-key-cert.pem', | ||
('dask', 'tls_key'): 'certs/tls-key.pem', | ||
} | ||
) | ||
def test_tls(self): | ||
# These use test certs that ship with dask/distributed and should not be | ||
# used in production | ||
with dask_testing_cluster( | ||
worker_kwargs={'security': tls_security(), "protocol": "tls"}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tls_security in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's see. They did not for me when I run them previously. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Atleast on my local, most of the tests were failing because of errors with pickling the function, the change in [airflow/executors/dask_executor.py], fixed it
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool @subkanthi ! Looks good. I think you can bring back the tests now - in my change (when you rebase) you will see how I dealt with get_cert to skip those There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you cna ucomment your line comments now :). |
||
scheduler_kwargs={'security': tls_security(), "protocol": "tls"}, | ||
) as (cluster, _): | ||
|
||
executor = DaskExecutor(cluster_address=cluster['address']) | ||
|
||
self.assert_tasks_on_executor(executor, timeout_executor=120) | ||
|
||
executor.end() | ||
# close the executor, the cluster context manager expects all listeners | ||
# and tasks to have completed. | ||
executor.client.close() | ||
# def test_tls(self): | ||
# # These use test certs that ship with dask/distributed and should not be | ||
# # used in production | ||
# with dask_testing_cluster( | ||
# worker_kwargs={'security': tls_security(), "protocol": "tls"}, | ||
# scheduler_kwargs={'security': tls_security(), "protocol": "tls"}, | ||
# ) as (cluster, _): | ||
# | ||
# executor = DaskExecutor(cluster_address=cluster['address']) | ||
# | ||
# self.assert_tasks_on_executor(executor, timeout_executor=120) | ||
# | ||
# executor.end() | ||
# # close the executor, the cluster context manager expects all listeners | ||
# # and tasks to have completed. | ||
# executor.client.close() | ||
|
||
@mock.patch('airflow.executors.dask_executor.DaskExecutor.sync') | ||
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to #11451