Faster PostgreSQL db migration to Airflow 2.2 (apache#19166)
Larger Airflow databases can take a long time to migrate, particularly if they
have a lot of task instances. On PostgreSQL, creating a new table is much
faster than updating the existing one in place.
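The core idea, as a minimal Alembic-style sketch (illustrative, trimmed column list; not the literal migration code, which appears in the diff below):

    from alembic import op

    # Slow path: one UPDATE that rewrites every matching row and maintains
    # every index row by row -- painful when task_instance is large.
    op.execute(
        """
        UPDATE task_instance SET run_id = dag_run.run_id
        FROM dag_run
        WHERE dag_run.dag_id = task_instance.dag_id
          AND dag_run.execution_date = task_instance.execution_date
        """
    )

    # Fast path on PostgreSQL: bulk-write the joined rows into a brand-new
    # table, swap it in, then rebuild indexes and constraints once at the end.
    op.execute(
        """
        CREATE TABLE new_task_instance AS
        SELECT ti.task_id, ti.dag_id, dag_run.run_id, ti.state
        FROM task_instance ti
        JOIN dag_run ON dag_run.dag_id = ti.dag_id
                    AND dag_run.execution_date = ti.execution_date
        """
    )
    op.drop_table('task_instance')
    op.rename_table('new_task_instance', 'task_instance')

The fast path pays one sequential scan and one sequential write instead of per-row updates with per-row index maintenance, which is why it wins on big task_instance tables.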
jedcunningham authored Oct 24, 2021
1 parent fe998a4 commit 559d607
Showing 4 changed files with 86 additions and 27 deletions.
2 changes: 2 additions & 0 deletions UPDATING.md
@@ -98,6 +98,8 @@ the scheduler can create in a dag. Now, the maximum number is controlled internally

## Airflow 2.2.0

Note: Upgrading the database to `2.2.0` or later can take some time to complete, particularly if you have a large `task_instance` table.

### `worker_log_server_port` configuration has been moved to the `logging` section.

The `worker_log_server_port` configuration option has been moved from `[celery]` section to `[logging]` section to allow for re-use between different executors.
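Concretely, the move just changes which section the key lives in within airflow.cfg (8793 is the shipped default, shown here only as an example):

    [logging]
    worker_log_server_port = 8793

The matching environment variable changes from `AIRFLOW__CELERY__WORKER_LOG_SERVER_PORT` to `AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT`.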
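To gauge how long the `task_instance` rewrite might take on your installation, it helps to check the table's size first. A small sketch against the metadata database (the connection URL is a placeholder):

    from sqlalchemy import create_engine, text

    # Placeholder URL -- point this at your Airflow metadata database.
    engine = create_engine("postgresql://user:pass@localhost/airflow")
    with engine.connect() as conn:
        rows = conn.execute(text("SELECT count(*) FROM task_instance")).scalar()
        size = conn.execute(
            text("SELECT pg_size_pretty(pg_total_relation_size('task_instance'))")
        ).scalar()
        print(f"task_instance: {rows} rows, {size} on disk")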
@@ -180,10 +180,6 @@ def upgrade():
    op.add_column('task_instance', sa.Column('run_id', type_=string_id_col_type, nullable=True))
    op.add_column('task_reschedule', sa.Column('run_id', type_=string_id_col_type, nullable=True))

    # Then update the new column by selecting the right value from DagRun
    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
    op.execute(update_query)

    #
    # TaskReschedule has a FK to TaskInstance, so we have to update that before
    # we can drop the TI.execution_date column
@@ -202,29 +198,81 @@ def upgrade():
        batch_op.drop_index('task_reschedule_dag_task_date_fkey')
        batch_op.drop_index('idx_task_reschedule_dag_task_date')

    # Then update the new column by selecting the right value from DagRun
    # But first we will drop and recreate indexes to make it faster
    if dialect_name == 'postgresql':
        # Recreate task_instance, without execution_date and with dagrun.run_id
        op.execute(
            """
            CREATE TABLE new_task_instance AS SELECT
                ti.task_id,
                ti.dag_id,
                dag_run.run_id,
                ti.start_date,
                ti.end_date,
                ti.duration,
                ti.state,
                ti.try_number,
                ti.hostname,
                ti.unixname,
                ti.job_id,
                ti.pool,
                ti.queue,
                ti.priority_weight,
                ti.operator,
                ti.queued_dttm,
                ti.pid,
                ti.max_tries,
                ti.executor_config,
                ti.pool_slots,
                ti.queued_by_job_id,
                ti.external_executor_id,
                ti.trigger_id,
                ti.trigger_timeout,
                ti.next_method,
                ti.next_kwargs
            FROM task_instance ti
            INNER JOIN dag_run ON dag_run.dag_id = ti.dag_id AND dag_run.execution_date = ti.execution_date;
            """
        )
        op.drop_table('task_instance')
        op.rename_table('new_task_instance', 'task_instance')

        # Fix up columns after the 'create table as select'
        with op.batch_alter_table('task_instance', schema=None) as batch_op:
            batch_op.alter_column(
                'pool', existing_type=string_id_col_type, existing_nullable=True, nullable=False
            )
            batch_op.alter_column('max_tries', existing_type=sa.Integer(), server_default="-1")
            batch_op.alter_column(
                'pool_slots', existing_type=sa.Integer(), existing_nullable=True, nullable=False
            )
    else:
        update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
        op.execute(update_query)

    with op.batch_alter_table('task_instance', schema=None) as batch_op:
        if dialect_name != 'postgresql':
            # TODO: Is this right for non-postgres?
            if dialect_name == 'mssql':
                constraints = get_table_constraints(conn, "task_instance")
                pk, _ = constraints['PRIMARY KEY'].popitem()
                batch_op.drop_constraint(pk, type_='primary')
            elif dialect_name not in ('sqlite',):
                batch_op.drop_constraint('task_instance_pkey', type_='primary')
            batch_op.drop_index('ti_dag_date')
            batch_op.drop_index('ti_state_lkp')
            batch_op.drop_column('execution_date')

        # Then make it non-nullable
        batch_op.alter_column(
            'run_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
        )

        batch_op.alter_column(
            'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
        )
        batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)

        # TODO: Is this right for non-postgres?
        if dialect_name == 'mssql':
            constraints = get_table_constraints(conn, "task_instance")
            pk, _ = constraints['PRIMARY KEY'].popitem()
            batch_op.drop_constraint(pk, type_='primary')
        elif dialect_name not in ('sqlite',):
            batch_op.drop_constraint('task_instance_pkey', type_='primary')
        batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'run_id'])

        batch_op.drop_index('ti_dag_date')
        batch_op.drop_index('ti_state_lkp')
        batch_op.drop_column('execution_date')
        batch_op.create_foreign_key(
            'task_instance_dag_run_fkey',
            'dag_run',
@@ -235,6 +283,15 @@ def upgrade():

        batch_op.create_index('ti_dag_run', ['dag_id', 'run_id'])
        batch_op.create_index('ti_state_lkp', ['dag_id', 'task_id', 'run_id', 'state'])
        if dialect_name == 'postgresql':
            batch_op.create_index('ti_dag_state', ['dag_id', 'state'])
            batch_op.create_index('ti_job_id', ['job_id'])
            batch_op.create_index('ti_pool', ['pool', 'state', 'priority_weight'])
            batch_op.create_index('ti_state', ['state'])
            batch_op.create_foreign_key(
                'task_instance_trigger_id_fkey', 'trigger', ['trigger_id'], ['id'], ondelete="CASCADE"
            )
            batch_op.create_index('ti_trigger_id', ['trigger_id'])

    with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
        batch_op.drop_column('execution_date')
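For context, `_multi_table_update` (referenced above but defined outside this diff) builds the per-dialect UPDATE used on the non-PostgreSQL path. A plausible reconstruction with SQLAlchemy Core (assuming SQLAlchemy 1.4+ and that `dag_run` and the target are `sa.Table` objects already defined in the migration; this is a sketch, not the file's verbatim code):

    import sqlalchemy as sa

    def _multi_table_update(dialect_name, target, column):
        # Match each target row to its DagRun by dag_id + execution_date.
        condition = sa.and_(
            dag_run.c.dag_id == target.c.dag_id,
            dag_run.c.execution_date == target.c.execution_date,
        )
        if dialect_name == 'sqlite':
            # SQLite cannot UPDATE from a join, so fall back to a
            # correlated scalar subquery evaluated per row.
            sub_q = sa.select(dag_run.c[column.name]).where(condition).scalar_subquery()
            return target.update().values({column: sub_q})
        # MySQL/MSSQL/PostgreSQL can update from a join directly.
        return target.update().where(condition).values({column: dag_run.c[column.name]})

As invoked above: `op.execute(_multi_table_update(dialect_name, task_instance, task_instance.c.run_id))`.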
2 changes: 1 addition & 1 deletion tests/executors/test_base_executor.py
@@ -69,5 +69,5 @@ def test_try_adopt_task_instances(dag_maker):
    dagrun = dag_maker.create_dagrun(execution_date=date)
    tis = dagrun.task_instances

    assert [ti.task_id for ti in tis] == ["task_1", "task_2", "task_3"]
    assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
    assert BaseExecutor().try_adopt_task_instances(tis) == tis
18 changes: 9 additions & 9 deletions tests/models/test_cleartasks.py
@@ -53,7 +53,7 @@ def test_clear_task_instances(self, dag_maker):
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )
        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti0.refresh_from_task(task0)
        ti1.refresh_from_task(task1)

@@ -125,7 +125,7 @@ def test_clear_task_instances_dr_state(self, state, last_scheduling, dag_maker):
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )
        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        dr.last_scheduling_decision = DEFAULT_DATE
        ti0.state = TaskInstanceState.SUCCESS
        ti1.state = TaskInstanceState.SUCCESS
@@ -160,7 +160,7 @@ def test_clear_task_instances_without_task(self, dag_maker):
            run_type=DagRunType.SCHEDULED,
        )

        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti0.refresh_from_task(task0)
        ti1.refresh_from_task(task1)

@@ -203,7 +203,7 @@ def test_clear_task_instances_without_dag(self, dag_maker):
            run_type=DagRunType.SCHEDULED,
        )

        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti0.refresh_from_task(task0)
        ti1.refresh_from_task(task1)

@@ -243,7 +243,7 @@ def test_clear_task_instances_with_task_reschedule(self, dag_maker):
            run_type=DagRunType.SCHEDULED,
        )

        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti0.refresh_from_task(task0)
        ti1.refresh_from_task(task1)
        ti0.run()
@@ -292,7 +292,7 @@ def test_dag_clear(self, dag_maker):
        )
        session = dag_maker.session

        ti0, ti1 = dr.task_instances
        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti0.refresh_from_task(task0)
        ti1.refresh_from_task(task1)

@@ -409,16 +409,16 @@ def test_operator_clear(self, dag_maker):
            start_date=DEFAULT_DATE,
            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
        ):
            op1 = DummyOperator(task_id='bash_op')
            op2 = DummyOperator(task_id='dummy_op', retries=1)
            op1 = DummyOperator(task_id='test1')
            op2 = DummyOperator(task_id='test2', retries=1)
            op1 >> op2

        dr = dag_maker.create_dagrun(
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )

        ti1, ti2 = dr.task_instances
        ti1, ti2 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
        ti1.task = op1
        ti2.task = op2

