Separate and split run job method into prepare/execute/complete steps (#30308)

* Separate and split run job method into prepare/execute/complete steps

As a follow-up to the decoupling of the job logic from the BaseJob
ORM object (#30255), the `run` method of BaseJob should also be
decoupled from it (allowing BaseJobPydantic to be passed) and split
into three steps, in order to allow db-less mode.

The "prepare" and "complete" steps of the `run` method are modifying
BaseJob ORM-mapped object, so they should be called over the
internal-api from LocalTask, DafFileProcessor and Triggerer running
in db-less mode. The "execute" method however does not need the
database however and should be run locally.

This is not yet the full AIP-44 conversion; it is a prerequisite for
it - the AIP-44 conversion will be done as a follow-up after this one.

We also added mermaid diagrams showing the job lifecycle with and
without the Internal API, to make it easier to reason about.

Closes: #30295

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
potiuk and jedcunningham authored Apr 11, 2023
1 parent afbe98d commit 73e0313
Showing 18 changed files with 389 additions and 175 deletions.
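
For orientation, the change described in the commit message boils down to roughly the following shape (a minimal sketch; the function body, names and signature below are assumptions based on the description above, not the actual code in `airflow/jobs/job.py`):

```python
# Hypothetical sketch, for illustration only: the actual function in
# airflow/jobs/job.py may differ in names, signatures and details.
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def run_job(job, session=NEW_SESSION):
    """Prepare (DB) -> execute (no DB required) -> complete (DB)."""
    # "prepare": updates the ORM-mapped Job row, so in db-less mode this
    # call has to go over the internal API.
    job.prepare_for_execution(session=session)
    try:
        # "execute": runs the job runner's code; no direct DB access is
        # needed here, so it can always run locally.
        return job.job_runner._execute()
    finally:
        # "complete": updates the Job row again (state, end date), so it
        # also needs the DB or the internal API.
        job.complete_execution(session=session)
```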
6 changes: 3 additions & 3 deletions airflow/cli/commands/dag_processor_command.py
@@ -26,7 +26,7 @@

from airflow import settings
from airflow.configuration import conf
-from airflow.jobs.job import Job
+from airflow.jobs.job import Job, run_job
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging

@@ -81,6 +81,6 @@ def dag_processor(args):
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
-job.run()
+run_job(job)
else:
-job.run()
+run_job(job)
4 changes: 2 additions & 2 deletions airflow/cli/commands/scheduler_command.py
@@ -28,7 +28,7 @@
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.job import Job
+from airflow.jobs.job import Job, run_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler
@@ -39,7 +39,7 @@ def _run_scheduler_job(job: Job, *, skip_serve_logs: bool) -> None:
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
-job.run()
+run_job(job)


@cli_utils.action_cli
6 changes: 3 additions & 3 deletions airflow/cli/commands/task_command.py
@@ -37,7 +37,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskInstanceNotFound
from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.job import Job
+from airflow.jobs.job import Job, run_job
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagPickle, TaskInstance
@@ -260,12 +260,12 @@ def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None
pool=args.pool,
external_executor_id=_extract_external_executor_id(args),
)
-run_job = Job(
+local_task_job = Job(
job_runner=local_task_job_runner,
dag_id=ti.dag_id,
)
try:
-ret = run_job.run()
+ret = run_job(local_task_job)
finally:
if args.shut_down_logging:
logging.shutdown()
6 changes: 3 additions & 3 deletions airflow/cli/commands/triggerer_command.py
@@ -28,7 +28,7 @@

from airflow import settings
from airflow.configuration import conf
-from airflow.jobs.job import Job
+from airflow.jobs.job import Job, run_job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
@@ -75,10 +75,10 @@ def triggerer(args):
umask=int(settings.DAEMON_UMASK, 8),
)
with daemon_context, _serve_logs(args.skip_serve_logs):
-job.run()
+run_job(job)
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
with _serve_logs(args.skip_serve_logs):
-job.run()
+run_job(job)
158 changes: 158 additions & 0 deletions airflow/jobs/JOB_LIFECYCLE.md
@@ -0,0 +1,158 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

These sequence diagrams explain the lifecycle of a Job in relation to database
operations, in the context of the internal API of Airflow.

As part of the AIP-44 implementation we separated the ORM Job instance from the code the job runs,
introducing the concept of Job Runners. A Job Runner is a class responsible for running the job's
code, and it might execute either in-process, when direct database access is used, or remotely, when
the job communicates via the internal API (this part is a work-in-progress and we will keep on
updating these lifecycle diagrams).

This applies to all of the CLI components Airflow runs that execute a job (Scheduler, DagFileProcessor,
Triggerer, Worker). The AIP-44 implementation is not yet complete, but when it is, the remote mode will
apply to some of the components (DagFileProcessor, Triggerer, Worker) and not to others (Scheduler).
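
To make the Job Runner concept concrete, a minimal sketch of a runner is shown below; attribute and method names are assumptions for illustration and may not match the real base class exactly.

```python
# Illustrative sketch only -- the real runners (SchedulerJobRunner,
# TriggererJobRunner, LocalTaskJobRunner, ...) have a richer interface.
from airflow.jobs.base_job_runner import BaseJobRunner


class ExampleJobRunner(BaseJobRunner):
    """Holds the code the job runs; the Job (ORM) object only tracks state and heartbeats."""

    job_type = "ExampleJob"  # attribute name assumed here, for illustration

    def _execute(self):
        # The job's business logic goes here.  It can run in-process with
        # direct database access, or (once AIP-44 is complete) behind the
        # internal API for the components that support it.
        ...
```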

## In-Process Job Runner

```mermaid
sequenceDiagram
participant CLI component
participant JobRunner
participant DB
activate CLI component
CLI component-->>DB: Create Session
activate DB
CLI component->>DB: Create Job
DB->>CLI component: Job object
CLI component->>JobRunner: Create Job Runner
JobRunner ->> CLI component: JobRunner object
CLI component->>JobRunner: Run Job
activate JobRunner
JobRunner->>DB: prepare_for_execution [Job]
DB->>JobRunner: prepared
par
JobRunner->>JobRunner: execute_job
and
JobRunner->>DB: access DB (Variables/Connections etc.)
DB ->> JobRunner: returned data
and
JobRunner-->>DB: create heartbeat session
Note over DB: Note: During heartbeat<br> two DB sessions <br>are opened in parallel(!)
JobRunner->>DB: perform_heartbeat [Job]
JobRunner ->> JobRunner: Heartbeat Callback [Job]
DB ->> JobRunner: heartbeat response
DB -->> JobRunner: close heartbeat session
end
JobRunner->>DB: complete_execution [Job]
DB ->> JobRunner: completed
JobRunner ->> CLI component: completed
deactivate JobRunner
deactivate DB
deactivate CLI component
```
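
In code, the same in-process flow boils down to the pattern used by the CLI commands in this change (a simplified sketch; constructor arguments are illustrative, not exhaustive):

```python
# Simplified sketch of the in-process flow, following the pattern used by
# the CLI commands in this change.  Constructor arguments are illustrative.
from airflow.jobs.job import Job, run_job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner

job_runner = TriggererJobRunner()     # Create Job Runner (arguments omitted)
job = Job(job_runner=job_runner)      # Create Job (ORM-mapped object)

# run_job() wraps the three lifecycle steps from the diagram:
# prepare_for_execution (DB) -> execute (heartbeating over a second,
# short-lived session) -> complete_execution (DB).
run_job(job)
```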

## Internal API Job Runner (WIP)

```mermaid
sequenceDiagram
participant CLI component
participant JobRunner
participant Internal API
participant DB
activate CLI component
CLI component->>Internal API: Create Job
Internal API-->>DB: Create Session
activate DB
Internal API ->> DB: Create Job
DB ->> Internal API: Job object
DB --> Internal API: Close Session
deactivate DB
Internal API->>CLI component: JobPydantic object
CLI component->>JobRunner: Create Job Runner
JobRunner ->> CLI component: JobRunner object
CLI component->>JobRunner: Run Job
activate JobRunner
JobRunner->>Internal API: prepare_for_execution [JobPydantic]
Internal API-->>DB: Create Session
activate DB
Internal API ->> DB: prepare_for_execution [Job]
DB->>Internal API: prepared
DB-->>Internal API: Close Session
deactivate DB
Internal API->>JobRunner: prepared
par
JobRunner->>JobRunner: execute_job
and
JobRunner ->> Internal API: access DB (Variables/Connections etc.)
Internal API-->>DB: Create Session
activate DB
Internal API ->> DB: access DB (Variables/Connections etc.)
DB ->> Internal API: returned data
DB-->>Internal API: Close Session
deactivate DB
Internal API ->> JobRunner: returned data
and
JobRunner->>Internal API: perform_heartbeat <br> [Job Pydantic]
Internal API-->>DB: Create Session
activate DB
Internal API->>DB: perform_heartbeat [Job]
Internal API ->> Internal API: Heartbeat Callback [Job]
DB ->> Internal API: heartbeat response
Internal API ->> JobRunner: heartbeat response
DB-->>Internal API: Close Session
deactivate DB
end
JobRunner->>Internal API: complete_execution <br> [Job Pydantic]
Internal API-->>DB: Create Session
Internal API->>DB: complete_execution [Job]
activate DB
DB ->> Internal API: completed
DB-->>Internal API: Close Session
deactivate DB
Internal API->>JobRunner: completed
JobRunner ->> CLI component: completed
deactivate JobRunner
deactivate CLI component
```
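
The internal-API variant keeps the same three steps but routes the database-touching ones through the API layer. The sketch below is purely illustrative of that idea; the `internal_api_call` decorator exists in `airflow.api_internal.internal_api_call`, but whether and how these particular functions use it is part of the still-ongoing AIP-44 work.

```python
# Purely illustrative: how the DB-touching steps could be routed through the
# internal API.  Not the actual implementation.
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session


@internal_api_call
@provide_session
def prepare_for_execution(job, session=NEW_SESSION):
    # Executed on the internal API server against the real Job ORM row;
    # a db-less caller only holds a JobPydantic object.
    ...


@internal_api_call
@provide_session
def complete_execution(job, session=NEW_SESSION):
    # Same idea: state and end-date updates happen server-side.
    ...
```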
4 changes: 2 additions & 2 deletions airflow/jobs/backfill_job_runner.py
@@ -38,7 +38,7 @@
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
-from airflow.jobs.job import Job
+from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import DAG, DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -632,7 +632,7 @@ def _per_task_process(key, ti: TaskInstance, session):
except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
self.log.debug(e)

-self.job.heartbeat(only_if_necessary=is_unit_test)
+perform_heartbeat(job=self.job, only_if_necessary=is_unit_test)
# execute the tasks in the queue
executor.heartbeat()
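
For context, a rough sketch of what a standalone heartbeat helper can look like (illustrative only; the real `perform_heartbeat` lives in `airflow.jobs.job` and may differ in signature and details):

```python
# Illustrative only -- the real perform_heartbeat lives in airflow.jobs.job
# and may differ in signature and details.
from datetime import datetime, timezone


def perform_heartbeat(job, heartbeat_callback=None, only_if_necessary=False):
    """Record a heartbeat for the job, optionally skipping if one happened recently."""
    if only_if_necessary and job.latest_heartbeat is not None:
        seconds_since = (datetime.now(timezone.utc) - job.latest_heartbeat).total_seconds()
        if seconds_since < job.heartrate:
            return  # a recent heartbeat exists; skip to avoid extra DB writes
    job.heartbeat(heartbeat_callback=heartbeat_callback)
```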

(The remaining 12 changed files are not shown here.)
