Applying Custom Resources

Airflow can be used to apply custom resources from within a cluster. A typical example is a SparkApplication job that is triggered and monitored by Airflow. The steps below describe how this can be done.

Define an in-cluster Kubernetes connection

An in-cluster connection can be created from within the Webserver UI (note that the "in cluster configuration" box is ticked):

Airflow Connections

Alternatively, the connection can be defined by an environment variable in URI format:

[source,yaml]
----
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
----
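For reference, the URL-encoded `__extra__` query parameter in the URI above decodes to the following JSON, i.e. in-cluster configuration is enabled and the kubeconfig and namespace fields are left empty:

[source,json]
----
{
  "extra__kubernetes__in_cluster": true,
  "extra__kubernetes__kube_config": "",
  "extra__kubernetes__kube_config_path": "",
  "extra__kubernetes__namespace": ""
}
----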

This environment variable can be supplied directly in the Airflow custom resource for all roles (Airflow expects the connection configuration to be common across all components):

link:example$example-airflow-incluster.yaml[role=include]
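As a rough sketch of what such a definition might look like, the environment variable could be set for every role via `envOverrides`; the role names and exact field layout below are assumptions about the operator CRD, and the linked example above is authoritative:

[source,yaml]
----
# Sketch only: role names and field placement are assumptions, see the linked example above.
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  # ... image, executor and other cluster settings omitted ...
  webservers:
    envOverrides: &in-cluster-connection
      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
    roleGroups:
      default:
        replicas: 1
  workers:
    envOverrides: *in-cluster-connection
    roleGroups:
      default:
        replicas: 1
  schedulers:
    envOverrides: *in-cluster-connection
    roleGroups:
      default:
        replicas: 1
----

The same connection is deliberately repeated for each role (here via a YAML anchor) because, as noted above, Airflow expects the configuration to be common across all components.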

Define a cluster role for Airflow to create SparkApplication resources

Airflow cannot create or access SparkApplication resources by default, so a cluster role is required for this:

link:example$example-airflow-spark-clusterrole.yaml[role=include]

and a corresponding cluster role binding:

link:example$example-airflow-spark-clusterrolebinding.yaml[role=include]
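For orientation, a sketch of what these two resources might contain is shown below; the API group of the SparkApplication CRD, the verbs, and the name and namespace of the Airflow ServiceAccount are assumptions that should be adjusted to match the linked examples and your environment:

[source,yaml]
----
# Sketch only: the API group, verbs, ServiceAccount name and namespace are assumptions.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: airflow-spark-clusterrole
rules:
  - apiGroups: ["spark.stackable.tech"]       # API group of the SparkApplication CRD
    resources: ["sparkapplications"]
    verbs: ["create", "get", "list", "watch", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-clusterrole-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: airflow-spark-clusterrole
subjects:
  - kind: ServiceAccount
    name: airflow-serviceaccount              # ServiceAccount used by the Airflow pods
    namespace: default
----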

DAG code

Now for the DAG itself. The job to be started is a simple Spark job that calculates the value of pi:

link:example$example-pyspark-pi.yaml[role=include]
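As an illustration only, such a job might look roughly like the following; the apiVersion, image settings and application file path are assumptions, and the linked example above is the authoritative definition:

[source,yaml]
----
# Sketch only: apiVersion, image settings and the path to the pi example are assumptions.
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: default
spec:
  mode: cluster
  mainApplicationFile: local:///stackable/spark/examples/src/main/python/pi.py
  sparkImage:
    productVersion: 3.5.1
  executor:
    replicas: 1
----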

This will be called from within a DAG using the connection that was defined earlier; the call is wrapped by the KubernetesHook that the Airflow Kubernetes provider makes available https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py[here]. There are two classes that are used to:

  • start the job

  • monitor the status of the job

These are written inline in the Python code below, though this is only to make it clear how the code is used: the classes SparkKubernetesOperator and SparkKubernetesSensor can be used for all custom resources and are therefore best defined in separate Python files that the DAG would reference (a condensed sketch using the provider classes directly follows the callout list below).

link:example$example-spark-dag.py[role=include]
  1. the wrapper class used for calling the job via KubernetesHook

  2. the connection that was created for in-cluster usage

  3. the wrapper class used for monitoring the job via KubernetesHook

  4. the start of the DAG code

  5. the initial task to invoke the job

  6. the subsequent task to monitor the job

  7. the tasks are chained together in the correct order
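For orientation, a condensed sketch of such a DAG is shown below. It imports the operator and sensor classes directly from the provider package instead of defining them inline; the DAG id, namespace, schedule and application file name are illustrative assumptions, and the constructor arguments differ between provider versions (depending on the version, api_group/api_version arguments may also need to point at the CRD group of the SparkApplication in use):

[source,python]
----
# Sketch only: ids, namespace, schedule and file names are illustrative, and the exact
# arguments depend on the installed apache-airflow-providers-cncf-kubernetes version.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

with DAG(
    dag_id="sparkapp_dag",
    schedule_interval=None,            # triggered manually from the UI or CLI
    start_date=datetime(2022, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    # apply the SparkApplication custom resource via the in-cluster connection
    submit = SparkKubernetesOperator(
        task_id="spark_pi_submit",
        namespace="default",
        application_file="example-pyspark-pi.yaml",   # the job definition shown above
        kubernetes_conn_id="kubernetes_in_cluster",    # the connection defined earlier
        do_xcom_push=True,
    )

    # poll the status of the custom resource until the job finishes
    monitor = SparkKubernetesSensor(
        task_id="spark_pi_monitor",
        namespace="default",
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_in_cluster",
    )

    # chain the tasks in the correct order
    submit >> monitor
----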

Once this DAG is mounted in the DAG folder, it can be triggered and its progress viewed from within the Webserver UI:

Airflow Connections

Clicking on the "spark_pi_monitor" task and selecting the logs shows that the status of the job has been tracked by Airflow:

Airflow Connections