Skip to content

Commit

Permalink
[TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job schedu…
Browse files Browse the repository at this point in the history
…ling without explicit_defaults_for_timestamp.

Summary: [AIRFLOW-3160] Load latest_dagruns asynchronously

Differential Revision: https://phabricator.twitter.biz/D224903
  • Loading branch information
aoen committed Oct 5, 2018
1 parent fb64f2e commit 1d2502e
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 30 deletions.
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.0+twtr5'
version = '1.10.0+twtr8'
32 changes: 23 additions & 9 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,11 @@ <h2>DAGs</h2>

<!-- Column 7: Last Run -->
<td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
{% if dag %}
{% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
{% if last_run and last_run.execution_date %}
<a href="{{ url_for('airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
</a>
<span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
{% endif %}
{% endif %}
<div height="10" width="10" id='last-run-{{ dag.safe_dag_id }}' style="display: block;">
<a></a>
<img class="loading-last-run" width="15" src="{{ url_for("static", filename="loading.gif") }}">
<span aria-hidden="true" id="statuses_info" title=" " class="glyphicon glyphicon-info-sign" style="display:none"></span>
</div>
</td>

<!-- Column 8: Dag Runs -->
Expand Down Expand Up @@ -306,6 +302,24 @@ <h2>DAGs</h2>
}
});
});
// Populate the "Last Run" cell of each DAG row once the last_dagruns endpoint
// answers. The response is keyed by safe_dag_id; each entry carries the real
// dag_id and the formatted date of the latest run as last_run.
d3.json("{{ url_for('airflow.last_dagruns') }}", function(error, json) {
  // If the request failed, json may be undefined: fall back to an empty
  // object so no row is populated and the spinners are still cleared below.
  var runs = json || {};

  Object.keys(runs).forEach(function(safe_dag_id) {
    // Declared with var so they stay local instead of leaking as globals.
    var dag_id = runs[safe_dag_id].dag_id;
    var last_run = runs[safe_dag_id].last_run;
    var g = d3.select('div#last-run-' + safe_dag_id);

    // Both values are user-controlled / contain reserved characters (last_run
    // has a space and a colon), so encode them before building the query.
    g.selectAll('a')
      .attr("href", "{{ url_for('airflow.graph') }}?dag_id=" + encodeURIComponent(dag_id) +
            "&execution_date=" + encodeURIComponent(last_run))
      .text(last_run);

    // NOTE(review): the tooltip is labelled "Start Date" but the payload only
    // provides last_run, so that is the value shown here - confirm intent.
    g.selectAll('span')
      .attr("data-original-title", "Start Date: " + last_run)
      .style('display', null);

    g.selectAll(".loading-last-run").remove();
  });

  // Remove the spinner from every remaining row (DAGs without any run, or all
  // rows when the request failed).
  d3.selectAll(".loading-last-run").remove();
});
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
Expand Down
20 changes: 20 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,26 @@ def task_stats(self, session=None):
payload[dag.safe_dag_id].append(d)
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@login_required
@provide_session
def last_dagruns(self, session=None):
    """Return, as JSON, the most recent execution date of every DAG.

    The payload is keyed by ``safe_dag_id``; each value holds the real
    ``dag_id`` and ``last_run``, the maximum ``execution_date`` over the
    DAG's runs formatted as ``%Y-%m-%d %H:%M``. DAGs in the dagbag that
    have no recorded run are left out of the payload.

    :param session: database session used for the query.
    """
    DagRun = models.DagRun

    # One grouped query yields (dag_id, max execution_date) pairs, which
    # dict() turns into a dag_id -> latest execution date lookup.
    latest_run_query = session.query(
        DagRun.dag_id,
        sqla.func.max(DagRun.execution_date).label('execution_date')
    ).group_by(DagRun.dag_id)
    dags_to_latest_runs = dict(latest_run_query.all())

    payload = {}
    for dag in dagbag.dags.values():
        latest = dags_to_latest_runs.get(dag.dag_id)
        if not latest:
            # No run recorded for this DAG (or an empty date): skip it.
            continue
        payload[dag.safe_dag_id] = {
            'dag_id': dag.dag_id,
            'last_run': latest.strftime("%Y-%m-%d %H:%M"),
        }

    return wwwutils.json_response(payload)

@expose('/code')
@login_required
def code(self):
Expand Down
32 changes: 23 additions & 9 deletions airflow/www_rbac/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,11 @@ <h2>DAGs</h2>

<!-- Column 7: Last Run -->
<td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
{% if dag %}
{% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
{% if last_run and last_run.execution_date %}
<a href="{{ url_for('Airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
</a>
<span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
{% endif %}
{% endif %}
<div height="10" width="10" id='last-run-{{ dag.safe_dag_id }}' style="display: block;">
<a></a>
<img class="loading-last-run" width="15" src="{{ url_for("static", filename="loading.gif") }}">
<span aria-hidden="true" id="statuses_info" title=" " class="glyphicon glyphicon-info-sign" style="display:none"></span>
</div>
</td>

<!-- Column 8: Dag Runs -->
Expand Down Expand Up @@ -306,6 +302,24 @@ <h2>DAGs</h2>
}
});
});
// Populate the "Last Run" cell of each DAG row once the last_dagruns endpoint
// answers. The response is keyed by safe_dag_id; each entry carries the real
// dag_id and the formatted date of the latest run as last_run.
d3.json("{{ url_for('Airflow.last_dagruns') }}", function(error, json) {
  // If the request failed, json may be undefined: fall back to an empty
  // object so no row is populated and the spinners are still cleared below.
  var runs = json || {};

  Object.keys(runs).forEach(function(safe_dag_id) {
    // Declared with var so they stay local instead of leaking as globals.
    var dag_id = runs[safe_dag_id].dag_id;
    var last_run = runs[safe_dag_id].last_run;
    var g = d3.select('div#last-run-' + safe_dag_id);

    // Both values are user-controlled / contain reserved characters (last_run
    // has a space and a colon), so encode them before building the query.
    g.selectAll('a')
      .attr("href", "{{ url_for('Airflow.graph') }}?dag_id=" + encodeURIComponent(dag_id) +
            "&execution_date=" + encodeURIComponent(last_run))
      .text(last_run);

    // NOTE(review): the tooltip is labelled "Start Date" but the payload only
    // provides last_run, so that is the value shown here - confirm intent.
    g.selectAll('span')
      .attr("data-original-title", "Start Date: " + last_run)
      .style('display', null);

    g.selectAll(".loading-last-run").remove();
  });

  // Remove the spinner from every remaining row (DAGs without any run, or all
  // rows when the request failed).
  d3.selectAll(".loading-last-run").remove();
});
d3.json("{{ url_for('Airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
Expand Down
27 changes: 27 additions & 0 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,33 @@ def task_stats(self, session=None):
payload[dag.safe_dag_id].append(d)
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@has_access
@provide_session
def last_dagruns(self, session=None):
    """Return, as JSON, the latest execution date of each accessible DAG.

    The payload is keyed by ``safe_dag_id``; each value holds the real
    ``dag_id`` and ``last_run``, the maximum ``execution_date`` over the
    DAG's runs formatted as ``%Y-%m-%d %H:%M``. Only DAGs the current user
    may access (or every DAG when ``'all_dags'`` is granted) are included,
    and DAGs without a recorded run are left out.

    :param session: database session used for the query.
    """
    DagRun = models.DagRun

    filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()

    payload = {}
    if not filter_dag_ids:
        # Return an empty JSON object rather than None: a view that returns
        # None gives Flask no valid response and results in a server error.
        return wwwutils.json_response(payload)

    # One grouped query yields (dag_id, max execution_date) pairs, which
    # dict() turns into a dag_id -> latest execution date lookup.
    dags_to_latest_runs = dict(session.query(
        DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
        .group_by(DagRun.dag_id).all())

    # Loop-invariant: whether the user is allowed to see every DAG.
    all_dags_allowed = 'all_dags' in filter_dag_ids

    for dag in dagbag.dags.values():
        if not (all_dags_allowed or dag.dag_id in filter_dag_ids):
            continue
        latest = dags_to_latest_runs.get(dag.dag_id)
        if latest:
            payload[dag.safe_dag_id] = {
                'dag_id': dag.dag_id,
                'last_run': latest.strftime("%Y-%m-%d %H:%M"),
            }

    return wwwutils.json_response(payload)

@expose('/code')
@has_access
def code(self):
Expand Down
14 changes: 3 additions & 11 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,17 +1666,6 @@ def test_index(self):
self.assertIn("DAGs", resp_html)
self.assertIn("example_bash_operator", resp_html)

# The HTML should contain data for the last-run. A link to the specific run,
# and the text of the date.
url = "/admin/airflow/graph?" + urlencode({
"dag_id": self.dag_python.dag_id,
"execution_date": self.dagrun_python.execution_date,
}).replace("&", "&amp;")
self.assertIn(url, resp_html)
self.assertIn(
self.dagrun_python.execution_date.strftime("%Y-%m-%d %H:%M"),
resp_html)

def test_query(self):
response = self.app.get('/admin/queryview/')
self.assertIn("Ad Hoc Query", response.data.decode('utf-8'))
Expand Down Expand Up @@ -1756,6 +1745,9 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/task_stats')
self.assertIn("example_bash_operator", response.data.decode('utf-8'))
response = self.app.get(
'/admin/airflow/last_dagruns')
self.assertIn("example_bash_operator", response.data.decode('utf-8'))
url = (
"/admin/airflow/success?task_id=print_the_context&"
"dag_id=example_python_operator&upstream=false&downstream=false&"
Expand Down

0 comments on commit 1d2502e

Please sign in to comment.