[AIRFLOW-1837] Respect task start_date when different from dag's (#4010)
Currently task instances get created and scheduled based on the DAG's
start date rather than their own. This commit adds a check before
creating a task instance to verify that the task's start date is not after
the execution date.
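In effect, DagRun.verify_integrity now skips creating a task instance for any
task whose start_date falls after the run's execution date, unless the run is a
backfill. A minimal, self-contained sketch of that guard, with plain datetimes
and an illustrative task list standing in for the real DAG, DagRun, and
TaskInstance objects (names and dates here are hypothetical):

from datetime import datetime, timedelta

execution_date = datetime(2016, 1, 1)  # hypothetical run date
is_backfill = False                    # scheduler-created run, not a backfill

# Simplified stand-ins for a DAG's tasks: (task_id, task-level start_date)
tasks = [
    ("dummy1", execution_date + timedelta(days=3)),  # starts after this run
    ("dummy2", execution_date),                      # starts with the DAG
]

created = []
for task_id, start_date in tasks:
    # The added check: skip tasks that have not started yet,
    # except when backfilling.
    if start_date > execution_date and not is_backfill:
        continue
    created.append(task_id)

print(created)  # ['dummy2'] -- dummy1 gets no task instance for this run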
dima-asana authored and Fokko Driesprong committed Oct 13, 2018
1 parent 2444ed2 commit e232f4d
Showing 5 changed files with 91 additions and 30 deletions.
2 changes: 2 additions & 0 deletions airflow/models.py
@@ -5153,6 +5153,8 @@ def verify_integrity(self, session=None):
        for task in six.itervalues(dag.task_dict):
            if task.adhoc:
                continue
            if task.start_date > self.execution_date and not self.is_backfill:
                continue

            if task.task_id not in task_ids:
                ti = TaskInstance(task, self.execution_date)
43 changes: 29 additions & 14 deletions tests/core.py
@@ -63,7 +63,7 @@
from airflow.utils import timezone
from airflow.utils.timezone import datetime
from airflow.utils.state import State
from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
from airflow.utils.dates import days_ago, infer_time_unit, round_time, scale_time_units
from lxml import html
from airflow.exceptions import AirflowException
from airflow.configuration import AirflowConfigException, run_command
@@ -81,6 +81,7 @@
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
EXAMPLE_DAG_DEFAULT_DATE = days_ago(2)

try:
    import cPickle as pickle
@@ -1651,21 +1652,21 @@ def setUp(self):

        self.dagrun_python = self.dag_python.create_dagrun(
            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
            execution_date=DEFAULT_DATE,
            execution_date=EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING
        )

        self.sub_dag.create_dagrun(
            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
            execution_date=DEFAULT_DATE,
            execution_date=EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING
        )

        self.example_xcom.create_dagrun(
            run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
            execution_date=DEFAULT_DATE,
            execution_date=EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING
        )
@@ -1758,7 +1759,7 @@ def test_dag_views(self):
        response = self.app.get(
            '/admin/airflow/task?'
            'task_id=runme_0&dag_id=example_bash_operator&'
            'execution_date={}'.format(DEFAULT_DATE_DS))
            'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
        self.assertIn("Attributes", response.data.decode('utf-8'))
        response = self.app.get(
            '/admin/airflow/dag_stats')
@@ -1770,22 +1771,21 @@ def test_dag_views(self):
            "/admin/airflow/success?task_id=print_the_context&"
            "dag_id=example_python_operator&upstream=false&downstream=false&"
            "future=false&past=false&execution_date={}&"
            "origin=/admin".format(DEFAULT_DATE_DS))
            "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        self.assertIn("Wait a minute", response.data.decode('utf-8'))
        response = self.app.get(url + "&confirmed=true")
        response = self.app.get(
            '/admin/airflow/clear?task_id=print_the_context&'
            'dag_id=example_python_operator&future=true&past=false&'
            'upstream=true&downstream=false&'
            'execution_date={}&'
            'origin=/admin'.format(DEFAULT_DATE_DS))
            'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE))
        self.assertIn("Wait a minute", response.data.decode('utf-8'))
        url = (
            "/admin/airflow/success?task_id=section-1&"
            "dag_id=example_subdag_operator&upstream=true&downstream=true&"
            "future=false&past=false&execution_date={}&"
            "origin=/admin".format(DEFAULT_DATE_DS))
            "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        self.assertIn("Wait a minute", response.data.decode('utf-8'))
        self.assertIn("section-1-task-1", response.data.decode('utf-8'))
@@ -1799,7 +1799,7 @@ def test_dag_views(self):
            "dag_id=example_python_operator&future=false&past=false&"
            "upstream=false&downstream=true&"
            "execution_date={}&"
            "origin=/admin".format(DEFAULT_DATE_DS))
            "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        self.assertIn("Wait a minute", response.data.decode('utf-8'))
        response = self.app.get(url + "&confirmed=true")
@@ -1808,7 +1808,7 @@ def test_dag_views(self):
            "dag_id=example_subdag_operator.section-1&future=false&past=false&"
            "upstream=false&downstream=true&recursive=true&"
            "execution_date={}&"
            "origin=/admin".format(DEFAULT_DATE_DS))
            "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        self.assertIn("Wait a minute", response.data.decode('utf-8'))
        self.assertIn("example_subdag_operator.end",
@@ -1835,7 +1835,7 @@ def test_dag_views(self):
            "/admin/airflow/run?task_id=runme_0&"
            "dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
            "ignore_task_deps=true&execution_date={}&"
            "origin=/admin".format(DEFAULT_DATE_DS))
            "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        response = self.app.get(
            "/admin/airflow/refresh?dag_id=example_bash_operator")
@@ -1870,13 +1870,28 @@ def test_fetch_task_instance(self):
        url = (
            "/admin/airflow/object/task_instances?"
            "dag_id=example_python_operator&"
            "execution_date={}".format(DEFAULT_DATE_DS))
            "execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE))
        response = self.app.get(url)
        self.assertIn("print_the_context", response.data.decode('utf-8'))

    def test_dag_view_task_with_python_operator_using_partial(self):
        response = self.app.get(
            '/admin/airflow/task?'
            'task_id=test_dagrun_functool_partial&dag_id=test_task_view_type_check&'
            'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
        self.assertIn("A function with two args", response.data.decode('utf-8'))

    def test_dag_view_task_with_python_operator_using_instance(self):
        response = self.app.get(
            '/admin/airflow/task?'
            'task_id=test_dagrun_instance&dag_id=test_task_view_type_check&'
            'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
        self.assertIn("A __call__ method", response.data.decode('utf-8'))

    def tearDown(self):
        configuration.conf.set("webserver", "expose_config", "False")
        self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
        self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE,
                            end_date=timezone.utcnow())
        session = Session()
        session.query(models.DagRun).delete()
        session.query(models.TaskInstance).delete()
22 changes: 19 additions & 3 deletions tests/dags/test_scheduler_dags.py
@@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)
DEFAULT_DATE = datetime(2016, 1, 1)

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
    dag_id='test_start_date_scheduling',
    start_date=datetime(2100, 1, 1))
    start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
    task_id='dummy',
    dag=dag1,
    owner='airflow')

dag2 = DAG(
    dag_id='test_task_start_date_scheduling',
    start_date=DEFAULT_DATE
)
dag2_task1 = DummyOperator(
    task_id='dummy1',
    dag=dag2,
    owner='airflow',
    start_date=DEFAULT_DATE + timedelta(days=3)
)
dag2_task2 = DummyOperator(
    task_id='dummy2',
    dag=dag2,
    owner='airflow'
)
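Given these dates, a non-backfill run of test_task_start_date_scheduling at
DEFAULT_DATE should create a task instance for dummy2 but not for dummy1, whose
task-level start_date is three days later. A quick illustrative check of the
new condition against those dates (values copied from the DAG above; this
snippet is not part of the test suite):

from datetime import datetime, timedelta

DEFAULT_DATE = datetime(2016, 1, 1)
execution_date = DEFAULT_DATE                     # an early run of dag2
dummy1_start = DEFAULT_DATE + timedelta(days=3)   # task-level start_date
dummy2_start = DEFAULT_DATE                       # inherits the DAG start_date

# Mirrors the guard added in DagRun.verify_integrity for non-backfill runs.
print(dummy1_start > execution_date)  # True  -> dummy1 is skipped
print(dummy2_start > execution_date)  # False -> dummy2 gets a task instance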
23 changes: 22 additions & 1 deletion tests/jobs.py
@@ -2209,7 +2209,7 @@ def test_scheduler_start_date(self):
        dag_id = 'test_start_date_scheduling'
        dag = self.dagbag.get_dag(dag_id)
        dag.clear()
        self.assertTrue(dag.start_date > DEFAULT_DATE)
        self.assertTrue(dag.start_date > datetime.datetime.utcnow())

        scheduler = SchedulerJob(dag_id,
                                 num_runs=2)
@@ -2244,6 +2244,27 @@ def test_scheduler_start_date(self):
        self.assertEqual(
            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)

    def test_scheduler_task_start_date(self):
        """
        Test that the scheduler respects task start dates that are different
        from DAG start dates
        """
        dag_id = 'test_task_start_date_scheduling'
        dag = self.dagbag.get_dag(dag_id)
        dag.clear()
        scheduler = SchedulerJob(dag_id,
                                 num_runs=2)
        scheduler.run()

        session = settings.Session()
        tiq = session.query(TI).filter(TI.dag_id == dag_id)
        ti1s = tiq.filter(TI.task_id == 'dummy1').all()
        ti2s = tiq.filter(TI.task_id == 'dummy2').all()
        self.assertEqual(len(ti1s), 0)
        self.assertEqual(len(ti2s), 2)
        for t in ti2s:
            self.assertEqual(t.state, State.SUCCESS)

    def test_scheduler_multiprocessing(self):
        """
        Test that the scheduler can successfully queue multiple dags in parallel
31 changes: 19 additions & 12 deletions tests/www_rbac/test_views.py
@@ -40,7 +40,7 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils import dates, timezone
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www_rbac import app as application
@@ -263,8 +263,8 @@ def test_mount(self):


class TestAirflowBaseViews(TestBase):
    default_date = timezone.datetime(2018, 3, 1)
    run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
    EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
    run_id = "test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))

    def setUp(self):
        super(TestAirflowBaseViews, self).setUp()
@@ -291,19 +291,19 @@ def prepare_dagruns(self):

        self.bash_dagrun = self.bash_dag.create_dagrun(
            run_id=self.run_id,
            execution_date=self.default_date,
            execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING)

        self.sub_dagrun = self.sub_dag.create_dagrun(
            run_id=self.run_id,
            execution_date=self.default_date,
            execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING)

        self.xcom_dagrun = self.xcom_dag.create_dagrun(
            run_id=self.run_id,
            execution_date=self.default_date,
            execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
            start_date=timezone.utcnow(),
            state=State.RUNNING)

@@ -321,19 +321,19 @@ def test_home(self):

    def test_task(self):
        url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url, follow_redirects=True)
        self.check_content_in_response('Task Instance Details', resp)

    def test_xcom(self):
        url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url, follow_redirects=True)
        self.check_content_in_response('XCom', resp)

    def test_rendered(self):
        url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url, follow_redirects=True)
        self.check_content_in_response('Rendered Template', resp)

@@ -400,25 +400,32 @@ def test_paused(self):
        resp = self.client.post(url, follow_redirects=True)
        self.check_content_in_response('OK', resp)

    def test_failed(self):
        url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
               'execution_date={}&upstream=false&downstream=false&future=false&past=false'
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url)
        self.check_content_in_response('Wait a minute', resp)

    def test_success(self):

        url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
               'execution_date={}&upstream=false&downstream=false&future=false&past=false'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url)
        self.check_content_in_response('Wait a minute', resp)

    def test_clear(self):
        url = ('clear?task_id=runme_1&dag_id=example_bash_operator&'
               'execution_date={}&upstream=false&downstream=false&future=false&past=false'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url)
        self.check_content_in_response(['example_bash_operator', 'Wait a minute'], resp)

    def test_run(self):
        url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
               'ignore_ti_state=true&execution_date={}'
               .format(self.percent_encode(self.default_date)))
               .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
        resp = self.client.get(url)
        self.check_content_in_response('', resp, resp_code=302)
