Apache Airflow plugin that exposes extended, secured API endpoints similar to the official Airflow API (Stable) (1.0.0), providing richer capabilities for more powerful DAG and job management. Apache Airflow version 2.8.0 or higher is required.
python3 -m pip install airflow-xtended-api
Alternatively, build a custom version of this plugin by following the instructions in this doc.
The Airflow Xtended API plugin uses the same auth mechanism as the Airflow API (Stable) (1.0.0). By default, the APIs exposed via this plugin respect the auth mechanism used by your Airflow webserver and also comply with the existing RBAC policies. Note that you will need to pass credentials as part of each request. Here is a snippet from the official docs, using basic authentication:
curl -X POST 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/dags/{dag_id}?update_mask=is_paused' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
"is_paused": true
}'
After installing the plugin Python package and restarting your Airflow webserver, you will find a link called 'Reference Docs' under the 'Xtended API' tab on the Airflow webserver homepage. All the necessary documentation for the supported API endpoints resides on that page. You can also navigate to that page directly using the link below.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/xtended_api/
All the supported endpoints are exposed in the following format:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/{ENDPOINT_NAME}
The following endpoints are currently supported:
- deploy_dag
- create_dag
- s3_sync
- mongo_sync
- scan_dags
- purge_dags
- refresh_all_dags
- delete_dag
- upload_file
- restart_failed_task
- kill_running_tasks
- run_task_instance
- skip_task_instance
- deploy_dag: Deploy a new DAG file to the DAGs directory.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
- POST
- dag_file - file - Upload & Deploy a DAG from .py or .zip files
- force (optional) - boolean - Force uploading the file if it already exists
- unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'dag_file=@test_dag.py' \
-F 'force=y' \
-F 'unpause=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
{
"message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
"status": "success"
}
- GET
- dag_file_url - string - A valid URL for fetching .py, .pyc, or .zip DAG files
- filename - string - A valid filename ending with .py, .pyc or .zip
- force (optional) - boolean - Force uploading the file if it already exists.
- unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag?dag_file_url={DAG_FILE_URL}&filename=test_dag.py&force=on&unpause=on'
{
"message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
"status": "success"
}
- create_dag: Create a new DAG file in the DAGs directory.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag
- POST
- filename - string - Name of the Python DAG file
- dag_code - string(multiline) - Python code of the DAG file
- force (optional) - boolean - Force uploading the file if it already exists
- unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
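A hypothetical request sketched from the parameters above, assuming create_dag accepts multipart form data like the other POST endpoints (curl's '<' prefix sends the contents of a local test_dag.py file as the dag_code field):
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'filename=test_dag.py' \
-F 'dag_code=<test_dag.py' \
-F 'force=y' \
-F 'unpause=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag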
- s3_sync: Sync DAG files from an S3 bucket.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
- POST
- s3_bucket_name - string - S3 bucket name where DAG files exist
- s3_region - string - S3 region name where the specified bucket exists
- s3_access_key - string - IAM access key with at least S3 bucket read access
- s3_secret_key - string - IAM secret key for the specified access key
- s3_object_prefix (optional) - string - Filter results by object prefix
- s3_object_keys (optional) - string - Sync only the DAG files specified by these object keys. Multiple object keys are separated by commas (,)
- skip_purge (optional) - boolean - Skip emptying DAGs directory
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 's3_bucket_name=test-bucket' \
-F 's3_region=us-east-1' \
-F 's3_access_key={IAM_ACCESS_KEY}' \
-F 's3_secret_key={IAM_SECRET_KEY}' \
-F 'skip_purge=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
{
"message": "dag files synced from s3",
"sync_status": {
"synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
"failed": []
},
"status": "success"
}
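To sync only specific objects, the optional s3_object_keys filter described above can be supplied (s3_object_prefix works similarly for prefix-based filtering). A hypothetical request with placeholder object keys:
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 's3_bucket_name=test-bucket' \
-F 's3_region=us-east-1' \
-F 's3_access_key={IAM_ACCESS_KEY}' \
-F 's3_secret_key={IAM_SECRET_KEY}' \
-F 's3_object_keys=dags/test_dag0.py,dags/test_dag1.py' \
-F 'skip_purge=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync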
- mongo_sync: Sync DAG files from a MongoDB collection.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
- POST
- connection_string - string - Source MongoDB server connection string
- db_name - string - Source MongoDB database name
- collection_name - string - Collection in the specified database where the DAG data exists
- field_filename - string - DAG files are named using the value of this document field
- field_dag_source - string - Document field containing the Python source for the yet-to-be-created DAGs
- query_filter (optional) - string - JSON query string to filter required documents
- skip_purge (optional) - boolean - Skip emptying DAGs directory
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
-F 'db_name=test_db' \
-F 'collection_name=test_collection' \
-F 'field_dag_source=dag_source' \
-F 'field_filename=dag_filename' \
-F 'skip_purge=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
{
"message": "dag files synced from mongo",
"sync_status": {
"synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
"failed": []
},
"status": "success"
}
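To sync only a subset of documents, the optional query_filter parameter accepts a JSON query string. A hypothetical request where the filter field and value are placeholders:
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
-F 'db_name=test_db' \
-F 'collection_name=test_collection' \
-F 'field_dag_source=dag_source' \
-F 'field_filename=dag_filename' \
-F 'query_filter={"is_active": true}' \
-F 'skip_purge=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync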
- refresh_all_dags: Refresh all DAGs in the webserver.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
- GET
- None
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
{
"message": "All DAGs are now up-to-date!!",
"status": "success"
}
- scan_dags: Check for newly created DAGs.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
- GET
- otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
{
"message": "Ondemand DAG scan complete!!",
"status": "success"
}
- purge_dags: Empty the DAGs directory.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
- GET
- None
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
{
"message": "DAG directory purged!!",
"status": "success"
}
- delete_dag: Delete a DAG from the Airflow database and the filesystem.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag
- GET
- dag_id (optional) - string - DAG id
- filename (optional) - string - Name of the DAG file to delete
- Note: At least one of 'dag_id' or 'filename' must be specified
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag?dag_id=test_dag&filename=test_dag.py'
{
"message": "DAG [dag_test] deleted",
"status": "success"
}
- upload_file: Upload a new file to the specified directory.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
- POST
- file - file - File to be uploaded
- force (optional) - boolean - Force uploading the file if it already exists
- path (optional) - string - Location where the file is to be uploaded (Default is the DAGs directory)
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'file=@test_file.py' \
-F 'force=y' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
{
"message": "File [/{DAG_FOLDER}/dag_test.txt] has been uploaded",
"status": "success"
}
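To upload to a location other than the DAGs directory, pass the optional path parameter. A hypothetical request where {TARGET_DIRECTORY} is a placeholder for a directory writable by the webserver:
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'file=@test_file.txt' \
-F 'path={TARGET_DIRECTORY}' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file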
- restart_failed_task: Restart failed tasks in a DagRun, along with their downstream tasks.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task
- GET
- dag_id - string - DAG id
- run_id - string - DagRun id
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task?dag_id=test_dag&run_id=test_run'
{
"message": {
"failed_task_count": 1,
"clear_task_count": 7
},
"status": "success"
}
- kill_running_tasks: Kill running tasks whose state is in ['none', 'running'].
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks
- GET
- dag_id - string - DAG id
- run_id - string - DagRun id
- task_id - string - If task_id is empty, kill all running tasks in the DagRun; otherwise, kill only the specified task
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks?dag_id=test_dag&run_id=test_run&task_id=test_task'
{
"message": "tasks in test_run killed!!",
"status": "success"
}
- run_task_instance: Create a new DagRun, run the specified tasks, and skip the rest.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
- POST
- dag_id - string - DAG id
- run_id - string - DagRun id
- tasks - string - Task id(s) to run. Multiple task ids are separated by commas (,)
- conf (optional) - string - Optional configuration for creating the DagRun
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'dag_id=test_dag' \
-F 'run_id=test_run' \
-F 'tasks=test_task' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
{
"execution_date": "2021-06-21T05:50:19.740803+0000",
"status": "success"
}
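Multiple task ids and an optional DagRun configuration can be passed in a single call. A hypothetical request where the task ids and the conf payload are placeholders:
curl -X POST -H 'Content-Type: multipart/form-data' \
--user "username:password" \
-F 'dag_id=test_dag' \
-F 'run_id=test_run' \
-F 'tasks=test_task0,test_task1' \
-F 'conf={"key": "value"}' \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance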
- skip_task_instance: Skip a single task instance.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance
- GET
- dag_id - string - DAG id
- run_id - string - DagRun id
- task_id - string - task id
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance?dag_id=test_dag&run_id=test_run&task_id=test_task'
{
"message": "<TaskInstance: test_dag.test_task 2021-06-21 19:59:34.638794+00:00 [skipped]> skipped!!",
"status": "success"
}
A huge shout-out to these awesome plugins, which have contributed to the growth of the Airflow ecosystem and also inspired this plugin.