Skip to content

Commit

Permalink
[AIRFLOW-3060] DAG context manager fails to exit properly in certain …
Browse files Browse the repository at this point in the history
…circumstances

[AIRFLOW-3123] Use a stack for DAG context management (apache#3956)

[TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp.

[AIRFLOW-3160] Load latest_dagruns asynchronously

[REVBUMP] v9

[AIRFLOW-2861] Add index on log table (apache#3709)

[AIRFLOW-2747][PARTIAL CHERRYPICK] Explicit re-schedule of sensors

[AIRFLOW-3191] Fix not being able to specify execution_date when creating dagrun

[AIRFLOW-2657] Add ability to delete dag from web UI

Closes apache#3531 from Noremac201/master

[REVBUMP] v10

[AIRFLOW-3233] Fix deletion of DAGs in the UI

[AIRFLOW-4070] log.warning for duplicate task dependencies

This change logs a warning on duplicate task dependencies rather than
raising an AirflowException. This will allow automated task
dependencies to be generated while giving the user the option to
explicitly define task dependencies.

Differential Revision: https://phabricator.twitter.biz/D320000
  • Loading branch information
Newton Le authored and CI committed May 24, 2019
1 parent 1d2502e commit f4915fc
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# flake8: noqa
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add task_reschedule table
Revision ID: 0a2a5b66e19d
Revises: 9635ae0956e7
Create Date: 2018-06-17 22:50:00.053620
"""

# revision identifiers, used by Alembic.
revision = '0a2a5b66e19d'
down_revision = '9635ae0956e7'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql


TABLE_NAME = 'task_reschedule'
INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'

def mssql_timestamp():
    """Return the column type used for timestamp columns on Microsoft SQL Server.

    On MSSQL, TIMESTAMP is a row-id type that has nothing to do with
    date-time, so a plain ``DateTime`` is used instead.
    """
    column_type = sa.DateTime()
    return column_type

def mysql_timestamp():
    """Return the MySQL-dialect ``TIMESTAMP`` type constructed with ``fsp=6``."""
    column_type = mysql.TIMESTAMP(fsp=6)
    return column_type

def sa_timestamp():
    """Return the generic SQLAlchemy ``TIMESTAMP`` type with ``timezone=True``.

    ``upgrade`` uses this for every dialect other than mysql and mssql.
    """
    column_type = sa.TIMESTAMP(timezone=True)
    return column_type

def upgrade():
    """Create the ``task_reschedule`` table and its (dag, task, date) index.

    The timestamp column type depends on the dialect of the bound
    connection; see 0e2a74e0fc9f_add_time_zone_awareness.
    """
    dialect_name = op.get_bind().dialect.name
    # mysql and mssql get dedicated factories; any other dialect falls back
    # to the generic timezone-aware TIMESTAMP.
    timestamp = {
        'mysql': mysql_timestamp,
        'mssql': mssql_timestamp,
    }.get(dialect_name, sa_timestamp)

    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('task_id', sa.String(length=250), nullable=False),
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        # use explicit server_default=None otherwise mysql implies defaults
        # for first timestamp column
        sa.Column('execution_date', timestamp(), nullable=False,
                  server_default=None),
        sa.Column('try_number', sa.Integer(), nullable=False),
        sa.Column('start_date', timestamp(), nullable=False),
        sa.Column('end_date', timestamp(), nullable=False),
        sa.Column('duration', sa.Integer(), nullable=False),
        sa.Column('reschedule_date', timestamp(), nullable=False),
    ]
    constraints = [
        sa.PrimaryKeyConstraint('id'),
        # Each reschedule row must reference an existing task_instance row.
        sa.ForeignKeyConstraint(
            ['task_id', 'dag_id', 'execution_date'],
            ['task_instance.task_id',
             'task_instance.dag_id',
             'task_instance.execution_date'],
            name='task_reschedule_dag_task_date_fkey',
        ),
    ]
    op.create_table(TABLE_NAME, *(columns + constraints))

    indexed_columns = ['dag_id', 'task_id', 'execution_date']
    op.create_index(INDEX_NAME, TABLE_NAME, indexed_columns, unique=False)


def downgrade():
    """Undo ``upgrade``: drop the index first, then the table it belongs to."""
    op.drop_index(
        INDEX_NAME,
        table_name=TABLE_NAME,
    )
    op.drop_table(TABLE_NAME)
41 changes: 41 additions & 0 deletions airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from alembic import op

"""add idx_log_dag
Revision ID: dd25f486b8ea
Revises: 9635ae0956e7
Create Date: 2018-08-07 06:41:41.028249
"""

# revision identifiers, used by Alembic.
revision = 'dd25f486b8ea'
down_revision = '9635ae0956e7'
branch_labels = None
depends_on = None


def upgrade():
    """Create the non-unique ``idx_log_dag`` index on the ``log`` table's ``dag_id`` column."""
    indexed_columns = ['dag_id']
    op.create_index('idx_log_dag', 'log', indexed_columns, unique=False)


def downgrade():
    """Undo ``upgrade`` by dropping the ``idx_log_dag`` index from the ``log`` table."""
    op.drop_index(
        'idx_log_dag',
        table_name='log',
    )
12 changes: 9 additions & 3 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,10 @@ class Log(Base):
owner = Column(String(500))
extra = Column(Text)

__table_args__ = (
Index('idx_log_dag', dag_id),
)

def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
self.dttm = timezone.utcnow()
self.event = event
Expand Down Expand Up @@ -2939,7 +2943,7 @@ def task_type(self):

def add_only_new(self, item_set, item):
if item in item_set:
raise AirflowException(
self.log.warning(
'Dependency {self}, {item} already registered'
''.format(**locals()))
else:
Expand Down Expand Up @@ -3252,6 +3256,8 @@ def __init__(
self.on_success_callback = on_success_callback
self.on_failure_callback = on_failure_callback

self._old_context_manager_dags = []

self._comps = {
'dag_id',
'task_ids',
Expand Down Expand Up @@ -3299,13 +3305,13 @@ def __hash__(self):

def __enter__(self):
global _CONTEXT_MANAGER_DAG
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
self._old_context_manager_dags.append(_CONTEXT_MANAGER_DAG)
_CONTEXT_MANAGER_DAG = self
return self

def __exit__(self, _type, _value, _tb):
global _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self._old_context_manager_dag
_CONTEXT_MANAGER_DAG = self._old_context_manager_dags.pop()

# /Context Manager ----------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.0+twtr8'
version = '1.10.0+twtr11'
13 changes: 13 additions & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ <h4 class="pull-right">
Refresh
</a>
</li>
<li>
<a href="{{ url_for("airflow.delete", dag_id=dag.dag_id, root=root) }}"
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true"></span>
Delete
</a>
</li>
</ul>
</div>
<hr>
Expand Down Expand Up @@ -302,6 +309,12 @@ <h4 class="modal-title" id="dagModalLabel">
$("#dagModal").css("margin-top","0px");
}

// Ask the user to confirm deleting the given DAG.
// Returns the result of confirm(), so it can be used directly as the
// return value of an onclick handler.
function confirmDeleteDag(dag_id){
  var message = [
    "Are you sure you want to delete '" + dag_id + "' now?",
    "This option will delete ALL metadata, DAG runs, etc.",
    "This cannot be undone."
  ].join("\n");
  return confirm(message);
}

$("#btn_rendered").click(function(){
url = "{{ url_for('airflow.rendered') }}" +
"?task_id=" + encodeURIComponent(task_id) +
Expand Down
13 changes: 13 additions & 0 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ <h2>DAGs</h2>
<span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
</a>

<!-- Delete -->
<!-- Use dag_id instead of dag.dag_id, because the DAG might not exist in the webserver's DagBag -->
<a href="{{ url_for('airflow.delete', dag_id=dag_id) }}"
onclick="return confirmDeleteDag('{{ dag_id }}')">
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
</a>

</td>
</tr>
{% endfor %}
Expand Down Expand Up @@ -240,6 +247,12 @@ <h2>DAGs</h2>
// Ask the user to confirm triggering a run of the given DAG.
// Returns the result of confirm().
function confirmTriggerDag(dag_id){
  var question = "Are you sure you want to run '" + dag_id + "' now?";
  return confirm(question);
}

// Ask the user to confirm deleting the given DAG.
// Returns the result of confirm(), so it can be used directly as the
// return value of an onclick handler.
function confirmDeleteDag(dag_id){
  var message = [
    "Are you sure you want to delete '" + dag_id + "' now?",
    "This option will delete ALL metadata, DAG runs, etc.",
    "This cannot be undone."
  ].join("\n");
  return confirm(message);
}
all_dags = $("[id^=toggle]");
$.each(all_dags, function(i,v) {
$(v).change (function() {
Expand Down
46 changes: 43 additions & 3 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,32 @@ def run(self):
"it should start any moment now.".format(ti))
return redirect(origin)

@expose('/delete')
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def delete(self):
    """Delete the DAG named by the ``dag_id`` query argument and redirect.

    Query arguments:
        dag_id: id of the DAG passed to ``delete_dag.delete_dag``.
        origin: URL to return to after a successful delete; defaults to
            ``/admin/``.

    On ``DagNotFound`` or ``DagFileExists`` an error message is flashed and
    the user is sent back to the referring page; when the request carries
    no referrer, ``origin`` is used instead.
    """
    # NOTE(review): the templates reach this view through plain <a href>
    # links, so this destructive action appears to be reachable with a
    # simple GET, and ``origin`` is redirected to unvalidated -- confirm
    # whether POST/CSRF protection and origin validation are wanted.
    from airflow.api.common.experimental import delete_dag
    from airflow.exceptions import DagNotFound, DagFileExists

    dag_id = request.args.get('dag_id')
    origin = request.args.get('origin') or "/admin/"
    # The Referer header is optional, so ``request.referrer`` can be None;
    # never hand None to ``redirect``.
    failure_target = request.referrer or origin

    try:
        delete_dag.delete_dag(dag_id)
    except DagNotFound:
        flash("DAG with id {} not found. Cannot delete".format(dag_id),
              "error")
        return redirect(failure_target)
    except DagFileExists:
        flash("Dag id {} is still in DagBag. "
              "Remove the DAG file first.".format(dag_id),
              "error")
        return redirect(failure_target)

    flash("Deleting DAG with id {}. May take a couple minutes to fully"
          " disappear.".format(dag_id))
    # Upon successful delete return to origin
    return redirect(origin)

@expose('/trigger')
@login_required
@wwwutils.action_logging
Expand Down Expand Up @@ -1302,6 +1328,10 @@ def tree(self, session=None):
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = dagbag.get_dag(dag_id)
if dag_id not in dagbag.dags:
flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
return redirect('/admin/')

root = request.args.get('root')
if root:
dag = dag.sub_dag(
Expand Down Expand Up @@ -2542,9 +2572,18 @@ class DagRunModelView(ModelViewOnly):
('failed', 'failed'),
],
}
form_args = dict(
dag_id=dict(validators=[validators.DataRequired()])
)
form_args = {
'dag_id': {
'validators': [
validators.DataRequired(),
]
},
'execution_date': {
'filters': [
parse_datetime_f,
]
}
}
column_list = (
'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
column_filters = column_list
Expand All @@ -2557,6 +2596,7 @@ class DagRunModelView(ModelViewOnly):
dag_id=dag_link,
run_id=dag_run_link
)
form_overrides = dict(execution_date=DateTimeField)

@action('new_delete', "Delete", "Are you sure you want to delete selected records?")
@provide_session
Expand Down
13 changes: 13 additions & 0 deletions airflow/www_rbac/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ <h4 class="pull-right">
Refresh
</a>
</li>
<li>
<a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id, root=root) }}"
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true"></span>
Delete
</a>
</li>
</ul>
</div>
<hr>
Expand Down Expand Up @@ -300,6 +307,12 @@ <h4 class="modal-title" id="dagModalLabel">
$("#dagModal").css("margin-top","0px");
}

// Ask the user to confirm deleting the given DAG.
// Returns the result of confirm(), so it can be used directly as the
// return value of an onclick handler.
function confirmDeleteDag(dag_id){
  var message = [
    "Are you sure you want to delete '" + dag_id + "' now?",
    "This option will delete ALL metadata, DAG runs, etc.",
    "This cannot be undone."
  ].join("\n");
  return confirm(message);
}

$("#btn_rendered").click(function(){
url = "{{ url_for('Airflow.rendered') }}" +
"?task_id=" + encodeURIComponent(task_id) +
Expand Down
11 changes: 11 additions & 0 deletions airflow/www_rbac/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ <h2>DAGs</h2>
<span class="glyphicon glyphicon-refresh" aria-hidden="true" data-original-title="Refresh"></span>
</a>

<!-- Delete -->
<a href="{{ url_for('Airflow.delete', dag_id=dag.dag_id) }}"
onclick="return confirmDeleteDag('{{ dag.safe_dag_id }}')">
<span class="glyphicon glyphicon-remove-circle" style="color:red" aria-hidden="true" data-original-title="Delete Dag"></span>
</a>
</td>
</tr>
{% endfor %}
Expand Down Expand Up @@ -238,6 +243,12 @@ <h2>DAGs</h2>
window.location = DAGS_INDEX + "?page_size=" + p_size;
});

// Ask the user to confirm deleting the given DAG.
// Returns the result of confirm(), so it can be used directly as the
// return value of an onclick handler.
function confirmDeleteDag(dag_id){
  var message = [
    "Are you sure you want to delete '" + dag_id + "' now?",
    "This option will delete ALL metadata, DAG runs, etc.",
    "This cannot be undone."
  ].join("\n");
  return confirm(message);
}

// Ask the user to confirm triggering a run of the given DAG.
// Returns the result of confirm().
function confirmTriggerDag(dag_id){
  var question = "Are you sure you want to run '" + dag_id + "' now?";
  return confirm(question);
}
Expand Down
Loading

0 comments on commit f4915fc

Please sign in to comment.