From 559d6074d7012d5472a96fdeadc0e10f958d6134 Mon Sep 17 00:00:00 2001
From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Date: Sun, 24 Oct 2021 08:45:12 -0600
Subject: [PATCH] Faster PostgreSQL db migration to Airflow 2.2 (#19166)

Migrating the database to Airflow 2.2 can take a long time on bigger
installations, particularly if they have a lot of task instances. On
PostgreSQL, creating a new table is much faster than updating the
existing table.
---
 UPDATING.md                                   |  2 +
 ...2661a43ba3_taskinstance_keyed_to_dagrun.py | 91 +++++++++++++++----
 tests/executors/test_base_executor.py         |  2 +-
 tests/models/test_cleartasks.py               | 18 ++--
 4 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index ab2e81bfb6306..9c9304ca77803 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -98,6 +98,8 @@ the scheduler can create in a dag. Now, the maximum number is controlled interna
 
 ## Airflow 2.2.0
 
+Note: Upgrading the database to `2.2.0` or later can take some time to complete, particularly if you have a large `task_instance` table.
+
 ### `worker_log_server_port` configuration has been moved to the ``logging`` section.
 
 The `worker_log_server_port` configuration option has been moved from `[celery]` section to `[logging]` section to allow for re-use between different executors.
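Why creating a new table beats updating the existing one on PostgreSQL: under MVCC, a whole-table UPDATE writes a new version of every row and maintains every index on task_instance as it goes, while CREATE TABLE ... AS SELECT writes each row exactly once into a table that has no indexes or constraints yet; indexes are rebuilt afterwards in a single pass. A condensed sketch of the two approaches, assuming Alembic's migration context; the column list is abbreviated here, whereas the migration below names every surviving column:

    from alembic import op

    # Slow path (previously used on every database): in-place backfill.
    # Every row is rewritten and every index maintained while the UPDATE runs.
    op.execute(
        """
        UPDATE task_instance ti
        SET run_id = dag_run.run_id
        FROM dag_run
        WHERE dag_run.dag_id = ti.dag_id
          AND dag_run.execution_date = ti.execution_date
        """
    )

    # Fast path (this patch, PostgreSQL only): write each row once into a
    # fresh, index-free table, then swap it in. Listing columns explicitly
    # is also how execution_date gets dropped in the same pass.
    op.execute(
        """
        CREATE TABLE new_task_instance AS SELECT
            ti.task_id, ti.dag_id, dag_run.run_id  -- abbreviated
        FROM task_instance ti
        INNER JOIN dag_run ON dag_run.dag_id = ti.dag_id
            AND dag_run.execution_date = ti.execution_date
        """
    )
    op.drop_table('task_instance')
    op.rename_table('new_task_instance', 'task_instance')

The trade-off, visible in the migration below, is that the swapped-in table starts with no primary key, foreign keys, or indexes, so the PostgreSQL branch has to recreate all of them explicitly.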
diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index 4610a8f750bbd..7dd31dbfc2613 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -180,10 +180,6 @@ def upgrade():
     op.add_column('task_instance', sa.Column('run_id', type_=string_id_col_type, nullable=True))
     op.add_column('task_reschedule', sa.Column('run_id', type_=string_id_col_type, nullable=True))
 
-    # Then update the new column by selecting the right value from DagRun
-    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
-    op.execute(update_query)
-
     #
     # TaskReschedule has a FK to TaskInstance, so we have to update that before
     # we can drop the TI.execution_date column
@@ -202,29 +198,81 @@
         batch_op.drop_index('task_reschedule_dag_task_date_fkey')
         batch_op.drop_index('idx_task_reschedule_dag_task_date')
 
+    # Then update the new column by selecting the right value from DagRun
+    # But first we will drop and recreate indexes to make it faster
+    if dialect_name == 'postgresql':
+        # Recreate task_instance, without execution_date and with dagrun.run_id
+        op.execute(
+            """
+            CREATE TABLE new_task_instance AS SELECT
+                ti.task_id,
+                ti.dag_id,
+                dag_run.run_id,
+                ti.start_date,
+                ti.end_date,
+                ti.duration,
+                ti.state,
+                ti.try_number,
+                ti.hostname,
+                ti.unixname,
+                ti.job_id,
+                ti.pool,
+                ti.queue,
+                ti.priority_weight,
+                ti.operator,
+                ti.queued_dttm,
+                ti.pid,
+                ti.max_tries,
+                ti.executor_config,
+                ti.pool_slots,
+                ti.queued_by_job_id,
+                ti.external_executor_id,
+                ti.trigger_id,
+                ti.trigger_timeout,
+                ti.next_method,
+                ti.next_kwargs
+            FROM task_instance ti
+            INNER JOIN dag_run ON dag_run.dag_id = ti.dag_id AND dag_run.execution_date = ti.execution_date;
+            """
+        )
+        op.drop_table('task_instance')
+        op.rename_table('new_task_instance', 'task_instance')
+
+        # Fix up columns after the 'create table as select'
+        with op.batch_alter_table('task_instance', schema=None) as batch_op:
+            batch_op.alter_column(
+                'pool', existing_type=string_id_col_type, existing_nullable=True, nullable=False
+            )
+            batch_op.alter_column('max_tries', existing_type=sa.Integer(), server_default="-1")
+            batch_op.alter_column(
+                'pool_slots', existing_type=sa.Integer(), existing_nullable=True, nullable=False
+            )
+    else:
+        update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
+        op.execute(update_query)
+
     with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        if dialect_name != 'postgresql':
+            # TODO: Is this right for non-postgres?
+            if dialect_name == 'mssql':
+                constraints = get_table_constraints(conn, "task_instance")
+                pk, _ = constraints['PRIMARY KEY'].popitem()
+                batch_op.drop_constraint(pk, type_='primary')
+            elif dialect_name not in ('sqlite',):
+                batch_op.drop_constraint('task_instance_pkey', type_='primary')
+            batch_op.drop_index('ti_dag_date')
+            batch_op.drop_index('ti_state_lkp')
+            batch_op.drop_column('execution_date')
+
         # Then make it non-nullable
         batch_op.alter_column(
             'run_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
-
         batch_op.alter_column(
             'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
 
-        batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)
-
-        # TODO: Is this right for non-postgres?
-        if dialect_name == 'mssql':
-            constraints = get_table_constraints(conn, "task_instance")
-            pk, _ = constraints['PRIMARY KEY'].popitem()
-            batch_op.drop_constraint(pk, type_='primary')
-        elif dialect_name not in ('sqlite'):
-            batch_op.drop_constraint('task_instance_pkey', type_='primary')
         batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'run_id'])
-
-        batch_op.drop_index('ti_dag_date')
-        batch_op.drop_index('ti_state_lkp')
-        batch_op.drop_column('execution_date')
         batch_op.create_foreign_key(
             'task_instance_dag_run_fkey',
             'dag_run',
@@ -235,6 +283,15 @@ def upgrade():
 
         batch_op.create_index('ti_dag_run', ['dag_id', 'run_id'])
         batch_op.create_index('ti_state_lkp', ['dag_id', 'task_id', 'run_id', 'state'])
+        if dialect_name == 'postgresql':
+            batch_op.create_index('ti_dag_state', ['dag_id', 'state'])
+            batch_op.create_index('ti_job_id', ['job_id'])
+            batch_op.create_index('ti_pool', ['pool', 'state', 'priority_weight'])
+            batch_op.create_index('ti_state', ['state'])
+            batch_op.create_foreign_key(
+                'task_instance_trigger_id_fkey', 'trigger', ['trigger_id'], ['id'], ondelete="CASCADE"
+            )
+            batch_op.create_index('ti_trigger_id', ['trigger_id'])
 
     with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
         batch_op.drop_column('execution_date')
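The non-PostgreSQL branch keeps the original in-place backfill through _multi_table_update, whose body lives elsewhere in this migration file and is not shown by this patch. Roughly, and only as a hedged sketch (the real helper may differ), it has to special-case SQLite, which lacks multi-table UPDATE, by falling back to a correlated subquery:

    import sqlalchemy as sa

    metadata = sa.MetaData()

    # Minimal stand-ins for the table objects defined in the migration file.
    dag_run = sa.Table(
        'dag_run', metadata,
        sa.Column('dag_id', sa.String(250)),
        sa.Column('run_id', sa.String(250)),
        sa.Column('execution_date', sa.TIMESTAMP),
    )
    task_instance = sa.Table(
        'task_instance', metadata,
        sa.Column('dag_id', sa.String(250)),
        sa.Column('run_id', sa.String(250)),
        sa.Column('execution_date', sa.TIMESTAMP),
    )

    def _multi_table_update(dialect_name, target, column):
        # Hypothetical reconstruction, not the actual Airflow implementation.
        condition = sa.and_(
            dag_run.c.dag_id == target.c.dag_id,
            dag_run.c.execution_date == target.c.execution_date,
        )
        if dialect_name == 'sqlite':
            # No multi-table UPDATE: set the column from a correlated subquery.
            sub_q = sa.select(dag_run.c[column.name]).where(condition).scalar_subquery()
            return target.update().values({column: sub_q})
        # MySQL/MSSQL: SQLAlchemy renders a multi-table UPDATE when the WHERE
        # clause references another table.
        return target.update().where(condition).values({column: dag_run.c[column.name]})

    # Compiles to "UPDATE task_instance SET run_id = (SELECT ...)" on SQLite.
    print(_multi_table_update('sqlite', task_instance, task_instance.c.run_id))

Either way, on these dialects the backfill still rewrites rows in place, which is why the UPDATING.md note above warns about large task_instance tables.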
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 561d551422fcc..49d6c016ddc57 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -69,5 +69,5 @@ def test_try_adopt_task_instances(dag_maker):
     dagrun = dag_maker.create_dagrun(execution_date=date)
     tis = dagrun.task_instances
 
-    assert [ti.task_id for ti in tis] == ["task_1", "task_2", "task_3"]
+    assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index e6444354f1fb8..aad91b12bd41d 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -53,7 +53,7 @@ def test_clear_task_instances(self, dag_maker):
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -125,7 +125,7 @@ def test_clear_task_instances_dr_state(self, state, last_scheduling, dag_maker):
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         dr.last_scheduling_decision = DEFAULT_DATE
         ti0.state = TaskInstanceState.SUCCESS
         ti1.state = TaskInstanceState.SUCCESS
@@ -160,7 +160,7 @@ def test_clear_task_instances_without_task(self, dag_maker):
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -203,7 +203,7 @@ def test_clear_task_instances_without_dag(self, dag_maker):
             run_type=DagRunType.SCHEDULED,
        )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -243,7 +243,7 @@ def test_clear_task_instances_with_task_reschedule(self, dag_maker):
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
         ti0.run()
@@ -292,7 +292,7 @@ def test_dag_clear(self, dag_maker):
         )
         session = dag_maker.session
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -409,8 +409,8 @@ def test_operator_clear(self, dag_maker):
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE + datetime.timedelta(days=10),
         ):
-            op1 = DummyOperator(task_id='bash_op')
-            op2 = DummyOperator(task_id='dummy_op', retries=1)
+            op1 = DummyOperator(task_id='test1')
+            op2 = DummyOperator(task_id='test2', retries=1)
             op1 >> op2
 
         dr = dag_maker.create_dagrun(
@@ -418,7 +418,7 @@
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti1, ti2 = dr.task_instances
+        ti1, ti2 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti1.task = op1
         ti2.task = op2
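The test changes all follow one pattern: with task_instance now keyed to dag_run, rows no longer come back from the session in a guaranteed order, so assertions must not depend on insertion order. A minimal self-contained illustration of the two fixes used above (SimpleNamespace stands in for TaskInstance):

    from types import SimpleNamespace

    # Rows may arrive in any order.
    tis = [SimpleNamespace(task_id='task_2'), SimpleNamespace(task_id='task_1')]

    # Fix 1 (test_base_executor): compare as sets, ignoring order.
    assert {ti.task_id for ti in tis} == {'task_1', 'task_2'}

    # Fix 2 (test_cleartasks): pin a deterministic order before unpacking.
    ti0, ti1 = sorted(tis, key=lambda ti: ti.task_id)
    assert (ti0.task_id, ti1.task_id) == ('task_1', 'task_2')

The 'bash_op'/'dummy_op' rename to 'test1'/'test2' presumably serves the same goal, making the sorted unpacking order obvious at a glance.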