diff --git a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
new file mode 100644
index 0000000000000..643a1ca81b678
--- /dev/null
+++ b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
@@ -0,0 +1,103 @@
+# flake8: noqa
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add task_reschedule table
+
+Revision ID: 0a2a5b66e19d
+Revises: 9635ae0956e7
+Create Date: 2018-06-17 22:50:00.053620
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0a2a5b66e19d'
+down_revision = '9635ae0956e7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+TABLE_NAME = 'task_reschedule'
+INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'
+
+
+def mssql_timestamp():
+    """Return the column type used on Microsoft SQL Server.
+
+    There TIMESTAMP is a row-id type that has nothing to do with
+    date-time, so a plain DateTime() is used instead.
+    """
+    return sa.DateTime()
+
+
+def mysql_timestamp():
+    """Return the column type used on MySQL: TIMESTAMP with fsp=6."""
+    return mysql.TIMESTAMP(fsp=6)
+
+
+def sa_timestamp():
+    """Return the column type used on all other dialects: TIMESTAMP(timezone=True)."""
+    return sa.TIMESTAMP(timezone=True)
+
+
+def upgrade():
+    """Create the task_reschedule table and its (dag_id, task_id, execution_date) index."""
+    # See 0e2a74e0fc9f_add_time_zone_awareness
+    timestamp_by_dialect = {
+        'mysql': mysql_timestamp,
+        'mssql': mssql_timestamp,
+    }
+    dialect_name = op.get_bind().dialect.name
+    timestamp = timestamp_by_dialect.get(dialect_name, sa_timestamp)
+
+    task_instance_fk = sa.ForeignKeyConstraint(
+        ['task_id', 'dag_id', 'execution_date'],
+        ['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date'],
+        name='task_reschedule_dag_task_date_fkey'
+    )
+    op.create_table(
+        TABLE_NAME,
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('task_id', sa.String(length=250), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        # use explicit server_default=None otherwise mysql implies defaults for first timestamp column
+        sa.Column('execution_date', timestamp(), nullable=False, server_default=None),
+        sa.Column('try_number', sa.Integer(), nullable=False),
+        sa.Column('start_date', timestamp(), nullable=False),
+        sa.Column('end_date', timestamp(), nullable=False),
+        sa.Column('duration', sa.Integer(), nullable=False),
+        sa.Column('reschedule_date', timestamp(), nullable=False),
+        sa.PrimaryKeyConstraint('id'),
+        task_instance_fk,
+    )
+    op.create_index(
+        INDEX_NAME,
+        TABLE_NAME,
+        ['dag_id', 'task_id', 'execution_date'],
+        unique=False
+    )
+
+
+def downgrade():
+    """Drop the index, then the task_reschedule table."""
+    op.drop_index(INDEX_NAME, table_name=TABLE_NAME)
+    op.drop_table(TABLE_NAME)
diff --git a/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py
new file mode 100644
index 0000000000000..3249a2e0589cb
--- /dev/null
+++ b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add idx_log_dag
+
+Revision ID: dd25f486b8ea
+Revises: 9635ae0956e7
+Create Date: 2018-08-07 06:41:41.028249
+
+"""
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'dd25f486b8ea'
+down_revision = '9635ae0956e7'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.create_index('idx_log_dag', 'log', ['dag_id'], unique=False)
+
+
+def downgrade():
+    op.drop_index('idx_log_dag', table_name='log')
diff --git a/airflow/models.py b/airflow/models.py
index 2096785b41c57..70746b6bcdfe0 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2098,6 +2098,10 @@ class Log(Base):
     owner = Column(String(500))
     extra = Column(Text)
 
+    __table_args__ = (
+        Index('idx_log_dag', dag_id),
+    )
+
     def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
         self.dttm = timezone.utcnow()
         self.event = event
@@ -2939,7 +2943,7 @@ def task_type(self):
 
     def add_only_new(self, item_set, item):
         if item in item_set:
-            raise AirflowException(
+            self.log.warning(
                 'Dependency {self}, {item} already registered'
                 ''.format(**locals()))
         else:
@@ -3252,6 +3256,8 @@ def __init__(
         self.on_success_callback = on_success_callback
         self.on_failure_callback = on_failure_callback
 
+        self._old_context_manager_dags = []
+
         self._comps = {
             'dag_id',
             'task_ids',
@@ -3299,13 +3305,13 @@ def __hash__(self):
 
     def __enter__(self):
         global _CONTEXT_MANAGER_DAG
-        self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
+        self._old_context_manager_dags.append(_CONTEXT_MANAGER_DAG)
         _CONTEXT_MANAGER_DAG = self
         return self
 
     def __exit__(self, _type, _value, _tb):
         global _CONTEXT_MANAGER_DAG
-        _CONTEXT_MANAGER_DAG = self._old_context_manager_dag
+        _CONTEXT_MANAGER_DAG = self._old_context_manager_dags.pop()
 
     # /Context Manager ----------------------------------------------
 
diff --git a/airflow/version.py b/airflow/version.py
index e6572deb7ce34..b7e5c3b23d9f8 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -18,4 +18,4 @@
 # under the License.
 #
 
-version = '1.10.0+twtr8'
+version = '1.10.0+twtr11'
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index adb2d387da443..9ef4cce5c1c46 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -100,6 +100,13 @@

Refresh +
  • + + + Delete + +

  • @@ -302,6 +309,12 @@

    DAGs

    + + + + + + {% endfor %} @@ -240,6 +247,12 @@

    DAGs

    function confirmTriggerDag(dag_id){ return confirm("Are you sure you want to run '"+dag_id+"' now?"); } + + function confirmDeleteDag(dag_id){ + return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\ + This option will delete ALL metadata, DAG runs, etc.\n\ + This cannot be undone."); + } all_dags = $("[id^=toggle]"); $.each(all_dags, function(i,v) { $(v).change (function() {
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8dfd20606c7e3..4ad0ed090a01e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1063,6 +1063,40 @@ def run(self):
             "it should start any moment now.".format(ti))
         return redirect(origin)
 
+    @expose('/delete')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def delete(self):
+        """Delete the DAG named by the ``dag_id`` query argument.
+
+        On success redirects to ``origin`` (default "/admin/"); on failure
+        flashes an error and returns to the referring page, falling back to
+        ``origin`` when the request carries no Referer header.
+        """
+        # NOTE(review): this destructive view is exposed without a methods
+        # restriction and relies on a client-side confirm(); consider POST + CSRF.
+        from airflow.api.common.experimental import delete_dag
+        from airflow.exceptions import DagNotFound, DagFileExists
+
+        dag_id = request.args.get('dag_id')
+        origin = request.args.get('origin') or "/admin/"
+
+        try:
+            delete_dag.delete_dag(dag_id)
+        except DagNotFound:
+            flash("DAG with id {} not found. Cannot delete".format(dag_id), "error")
+            return redirect(request.referrer or origin)
+        except DagFileExists:
+            flash("Dag id {} is still in DagBag. "
+                  "Remove the DAG file first.".format(dag_id), "error")
+            return redirect(request.referrer or origin)
+
+        flash("Deleting DAG with id {}. May take a couple minutes to fully"
+              " disappear.".format(dag_id))
+        # Upon successful delete return to origin
+        return redirect(origin)
+
     @expose('/trigger')
     @login_required
     @wwwutils.action_logging
@@ -1302,6 +1336,10 @@ def tree(self, session=None):
         dag_id = request.args.get('dag_id')
         blur = conf.getboolean('webserver', 'demo_mode')
         dag = dagbag.get_dag(dag_id)
+        if dag_id not in dagbag.dags:
+            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
+            return redirect('/admin/')
+
         root = request.args.get('root')
         if root:
             dag = dag.sub_dag(
@@ -2542,9 +2580,18 @@ class DagRunModelView(ModelViewOnly):
             ('failed', 'failed'),
         ],
     }
-    form_args = dict(
-        dag_id=dict(validators=[validators.DataRequired()])
-    )
+    form_args = {
+        'dag_id': {
+            'validators': [
+                validators.DataRequired(),
+            ]
+        },
+        'execution_date': {
+            'filters': [
+                parse_datetime_f,
+            ]
+        }
+    }
     column_list = (
         'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
     column_filters = column_list
@@ -2557,6 +2604,7 @@ class DagRunModelView(ModelViewOnly):
         dag_id=dag_link,
         run_id=dag_run_link
     )
+    form_overrides = dict(execution_date=DateTimeField)
 
     @action('new_delete', "Delete", "Are you sure you want to delete selected records?")
     @provide_session
diff --git a/airflow/www_rbac/templates/airflow/dag.html b/airflow/www_rbac/templates/airflow/dag.html
index eb449085023b9..e6495fe4be9da 100644
--- a/airflow/www_rbac/templates/airflow/dag.html
+++ b/airflow/www_rbac/templates/airflow/dag.html
@@ -99,6 +99,13 @@

    Refresh +
  • + + + Delete + +

  • @@ -300,6 +307,12 @@

    DAGs

    + + + + {% endfor %} @@ -238,6 +243,12 @@

    DAGs

    window.location = DAGS_INDEX + "?page_size=" + p_size; }); + function confirmDeleteDag(dag_id){ + return confirm("Are you sure you want to delete '"+dag_id+"' now?\n\ + This option will delete ALL metadata, DAG runs, etc.\n\ + This cannot be undone."); + } + function confirmTriggerDag(dag_id){ return confirm("Are you sure you want to run '"+dag_id+"' now?"); }
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index b3955ea734ab9..28393be441221 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -754,6 +754,40 @@ def run(self):
             "it should start any moment now.".format(ti))
         return redirect(origin)
 
+    @expose('/delete')
+    @has_access
+    @action_logging
+    def delete(self):
+        """Delete the DAG named by the ``dag_id`` query argument.
+
+        On success redirects to ``origin`` (default "/"); on failure
+        flashes an error and returns to the referring page, falling back to
+        ``origin`` when the request carries no Referer header.
+        """
+        # NOTE(review): this destructive view is exposed without a methods
+        # restriction and relies on a client-side confirm(); consider POST + CSRF.
+        from airflow.api.common.experimental import delete_dag
+        from airflow.exceptions import DagNotFound, DagFileExists
+
+        dag_id = request.args.get('dag_id')
+        origin = request.args.get('origin') or "/"
+
+        try:
+            delete_dag.delete_dag(dag_id)
+        except DagNotFound:
+            flash("DAG with id {} not found. Cannot delete".format(dag_id), "error")
+            return redirect(request.referrer or origin)
+        except DagFileExists:
+            flash("Dag id {} is still in DagBag. "
+                  "Remove the DAG file first.".format(dag_id), "error")
+            return redirect(request.referrer or origin)
+
+        flash("Deleting DAG with id {}. May take a couple minutes to fully"
+              " disappear.".format(dag_id))
+
+        # Upon success return to origin.
+        return redirect(origin)
+
     @expose('/trigger')
     @has_access
     @action_logging
@@ -987,6 +1021,10 @@ def tree(self, session=None):
         dag_id = request.args.get('dag_id')
         blur = conf.getboolean('webserver', 'demo_mode')
         dag = dagbag.get_dag(dag_id)
+        if dag_id not in dagbag.dags:
+            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
+            return redirect('/')
+
         root = request.args.get('root')
         if root:
             dag = dag.sub_dag(
diff --git a/tests/models.py b/tests/models.py
index d38681741daa1..027c9d5951241 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -139,6 +139,17 @@ def test_dag_as_context_manager(self):
         self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
         self.assertEqual(dag.tasks[0].task_id, 'op6')
 
+        with dag:
+            with dag:
+                op7 = DummyOperator(task_id='op7')
+            op8 = DummyOperator(task_id='op8')
+        op9 = DummyOperator(task_id='op8')
+        op9.dag = dag2
+
+        self.assertEqual(op7.dag, dag)
+        self.assertEqual(op8.dag, dag)
+        self.assertEqual(op9.dag, dag2)
+
     def test_dag_topological_sort(self):
         dag = DAG(
             'dag',