Skip to content

Commit

Permalink
Refactor ti clear next method kwargs tests (#19194)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReadytoRocc authored Jan 10, 2022
1 parent 4d33ebf commit 3b5adaf
Showing 1 changed file with 18 additions and 37 deletions.
55 changes: 18 additions & 37 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,50 +484,31 @@ def task_function(ti):
ti.state == state

@pytest.mark.parametrize(
"state",
[State.FAILED, State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE, State.UP_FOR_RETRY],
"state, exception, retries",
[
(State.FAILED, AirflowException, 0),
(State.SKIPPED, AirflowSkipException, 0),
(State.SUCCESS, None, 0),
(State.UP_FOR_RESCHEDULE, AirflowRescheduleException(timezone.utcnow()), 0),
(State.UP_FOR_RETRY, AirflowException, 1),
],
)
def test_task_wipes_next_fields(self, session, state, dag_maker):
def test_task_wipes_next_fields(self, session, dag_maker, state, exception, retries):
"""
Test that ensures that tasks wipe their next_method and next_kwargs
when they go into a state of FAILED, SKIPPED, SUCCESS, UP_FOR_RESCHEDULE, or UP_FOR_RETRY.
when the TI enters one of the configured states.
"""

def failure():
raise AirflowException

def skip():
raise AirflowSkipException

def success():
return None

def reschedule():
reschedule_date = timezone.utcnow()
raise AirflowRescheduleException(reschedule_date)

_retries = 0
_retry_delay = datetime.timedelta(seconds=0)

if state == State.FAILED:
_python_callable = failure
elif state == State.SKIPPED:
_python_callable = skip
elif state == State.SUCCESS:
_python_callable = success
elif state == State.UP_FOR_RESCHEDULE:
_python_callable = reschedule
elif state in [State.FAILED, State.UP_FOR_RETRY]:
_python_callable = failure
_retries = 1
_retry_delay = datetime.timedelta(seconds=2)
def _raise_if_exception():
if exception:
raise exception

with dag_maker("test_deferred_method_clear"):
task = PythonOperator(
task_id="test_deferred_method_clear_task",
python_callable=_python_callable,
retries=_retries,
retry_delay=_retry_delay,
python_callable=_raise_if_exception,
retries=retries,
retry_delay=datetime.timedelta(seconds=2),
)

dr = dag_maker.create_dagrun()
Expand All @@ -539,9 +520,9 @@ def reschedule():

ti.task = task
if state in [State.FAILED, State.UP_FOR_RETRY]:
with pytest.raises(AirflowException):
with pytest.raises(exception):
ti.run()
elif state in [State.SKIPPED, State.SUCCESS, State.UP_FOR_RESCHEDULE]:
else:
ti.run()
ti.refresh_from_db()

Expand Down

0 comments on commit 3b5adaf

Please sign in to comment.