From 1d2502ee2313b3e7403e317779a60d558f3ee1a6 Mon Sep 17 00:00:00 2001 From: Dan Davydov Date: Thu, 4 Oct 2018 18:17:54 -0400 Subject: [PATCH] [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp. Summary: [AIRFLOW-3160] Load latest_dagruns asynchronously Differential Revision: https://phabricator.twitter.biz/D224903 --- airflow/version.py | 2 +- airflow/www/templates/airflow/dags.html | 32 ++++++++++++++------ airflow/www/views.py | 20 ++++++++++++ airflow/www_rbac/templates/airflow/dags.html | 32 ++++++++++++++------ airflow/www_rbac/views.py | 27 +++++++++++++++++ tests/core.py | 14 ++------- 6 files changed, 97 insertions(+), 30 deletions(-) diff --git a/airflow/version.py b/airflow/version.py index 8629bbdb0c4d1..e6572deb7ce34 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,4 +18,4 @@ # under the License. # -version = '1.10.0+twtr5' +version = '1.10.0+twtr8' diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 217b39dcbb4b7..2896a3e7c26b6 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -118,15 +118,11 @@

DAGs

- {% if dag %} - {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %} - {% if last_run and last_run.execution_date %} - - {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }} - - - {% endif %} - {% endif %} +
+ + + +
@@ -306,6 +302,24 @@

DAGs

} }); }); + d3.json("{{ url_for('airflow.last_dagruns') }}", function(error, json) { + for(var safe_dag_id in json) { + dag_id = json[safe_dag_id].dag_id; + last_run = json[safe_dag_id].last_run; + g = d3.select('div#last-run-' + safe_dag_id) + + g.selectAll('a') + .attr("href", "{{ url_for('airflow.graph') }}?dag_id=" + dag_id + "&execution_date=" + last_run) + .text(last_run ); + + g.selectAll('span') + .attr("data-original-title", "Start Date: " + last_run) + .style('display', null); + + g.selectAll(".loading-last-run").remove(); + } + d3.selectAll(".loading-last-run").remove(); + }); d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) { for(var dag_id in json) { states = json[dag_id]; diff --git a/airflow/www/views.py b/airflow/www/views.py index 8f6725ef59b44..8dfd20606c7e3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -663,6 +663,26 @@ def task_stats(self, session=None): payload[dag.safe_dag_id].append(d) return wwwutils.json_response(payload) + @expose('/last_dagruns') + @login_required + @provide_session + def last_dagruns(self, session=None): + DagRun = models.DagRun + + dags_to_latest_runs = dict(session.query( + DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) + .group_by(DagRun.dag_id).all()) + + payload = {} + for dag in dagbag.dags.values(): + if dag.dag_id in dags_to_latest_runs and dags_to_latest_runs[dag.dag_id]: + payload[dag.safe_dag_id] = { + 'dag_id': dag.dag_id, + 'last_run': dags_to_latest_runs[dag.dag_id].strftime("%Y-%m-%d %H:%M") + } + + return wwwutils.json_response(payload) + @expose('/code') @login_required def code(self): diff --git a/airflow/www_rbac/templates/airflow/dags.html b/airflow/www_rbac/templates/airflow/dags.html index a7a7467de1f68..ff80479d757a2 100644 --- a/airflow/www_rbac/templates/airflow/dags.html +++ b/airflow/www_rbac/templates/airflow/dags.html @@ -119,15 +119,11 @@

DAGs

- {% if dag %} - {% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %} - {% if last_run and last_run.execution_date %} - - {{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }} - - - {% endif %} - {% endif %} +
+ + + +
@@ -306,6 +302,24 @@

DAGs

} }); }); + d3.json("{{ url_for('Airflow.last_dagruns') }}", function(error, json) { + for(var safe_dag_id in json) { + dag_id = json[safe_dag_id].dag_id; + last_run = json[safe_dag_id].last_run; + g = d3.select('div#last-run-' + safe_dag_id) + + g.selectAll('a') + .attr("href", "{{ url_for('Airflow.graph') }}?dag_id=" + dag_id + "&execution_date=" + last_run) + .text(last_run ); + + g.selectAll('span') + .attr("data-original-title", "Start Date: " + last_run) + .style('display', null); + + g.selectAll(".loading-last-run").remove(); + } + d3.selectAll(".loading-last-run").remove(); + }); d3.json("{{ url_for('Airflow.dag_stats') }}", function(error, json) { for(var dag_id in json) { states = json[dag_id]; diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index 43e481ed0278c..b3955ea734ab9 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -372,6 +372,33 @@ def task_stats(self, session=None): payload[dag.safe_dag_id].append(d) return wwwutils.json_response(payload) + @expose('/last_dagruns') + @has_access + @provide_session + def last_dagruns(self, session=None): + DagRun = models.DagRun + + filter_dag_ids = appbuilder.sm.get_accessible_dag_ids() + + if not filter_dag_ids: + return + + dags_to_latest_runs = dict(session.query( + DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) + .group_by(DagRun.dag_id).all()) + + payload = {} + for dag in dagbag.dags.values(): + dag_accessible = 'all_dags' in filter_dag_ids or dag.dag_id in filter_dag_ids + if (dag_accessible and dag.dag_id in dags_to_latest_runs and + dags_to_latest_runs[dag.dag_id]): + payload[dag.safe_dag_id] = { + 'dag_id': dag.dag_id, + 'last_run': dags_to_latest_runs[dag.dag_id].strftime("%Y-%m-%d %H:%M") + } + + return wwwutils.json_response(payload) + @expose('/code') @has_access def code(self): diff --git a/tests/core.py b/tests/core.py index 0c178b4f715d3..801cb91d8649e 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1666,17 +1666,6 @@ def test_index(self): self.assertIn("DAGs", resp_html) self.assertIn("example_bash_operator", resp_html) - # The HTML should contain data for the last-run. A link to the specific run, - # and the text of the date. - url = "/admin/airflow/graph?" + urlencode({ - "dag_id": self.dag_python.dag_id, - "execution_date": self.dagrun_python.execution_date, - }).replace("&", "&") - self.assertIn(url, resp_html) - self.assertIn( - self.dagrun_python.execution_date.strftime("%Y-%m-%d %H:%M"), - resp_html) - def test_query(self): response = self.app.get('/admin/queryview/') self.assertIn("Ad Hoc Query", response.data.decode('utf-8')) @@ -1756,6 +1745,9 @@ def test_dag_views(self): response = self.app.get( '/admin/airflow/task_stats') self.assertIn("example_bash_operator", response.data.decode('utf-8')) + response = self.app.get( + '/admin/airflow/last_dagruns') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) url = ( "/admin/airflow/success?task_id=print_the_context&" "dag_id=example_python_operator&upstream=false&downstream=false&"