

Merge pull request #51 from zbrookle/pod_namespace
Pod namespace
zbrookle authored Oct 21, 2020
2 parents 7f0e38a + c6e6310 commit a5c9db0
Showing 27 changed files with 557 additions and 392 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/azure-pipelines.yml
@@ -23,8 +23,9 @@ jobs:
with:
path: ~/conda_pkgs_dir
key: ${{ runner.os }}-conda-${{ env.CACHE_NUMBER }}-${{ hashFiles('environment.yml') }}
- uses: goanpeca/setup-miniconda@v1
- uses: conda-incubator/setup-miniconda@v1.7.0
with:
auto-update-conda: true
activate-environment: avionix_airflow
python-version: 3.8
channel-priority: strict
@@ -75,7 +76,7 @@ jobs:
if: always()

- name: Setup Minikube
uses: manusa/actions-setup-minikube@v2.0.0
uses: manusa/actions-setup-minikube@v2.0.1
with:
minikube version: 'v1.11.0'
kubernetes version: 'v1.18.3'
11 changes: 8 additions & 3 deletions avionix_airflow/build_airflow.py
@@ -15,7 +15,7 @@
)
from avionix_airflow.kubernetes.postgres import PostgresOrchestrator, SqlOptions
from avionix_airflow.kubernetes.redis import RedisOptions, RedisOrchestrator
from avionix_airflow.kubernetes.value_handler import ValueOrchestrator
from avionix_airflow.kubernetes.services import ServiceFactory


class AvionixChartInfo(ChartInfo):
@@ -45,13 +45,18 @@ def get_chart_builder(
:param cloud_options: A CloudOptions object
:return: Avionix ChartBuilder object that can be used to install airflow
"""
service_factory = ServiceFactory(
sql_options=sql_options,
cloud_options=cloud_options,
airflow_options=airflow_options,
)
orchestrator = AirflowOrchestrator(
sql_options,
redis_options,
ValueOrchestrator(),
airflow_options,
monitoring_options,
cloud_options,
service_factory,
)
dependencies = cloud_options.get_cloud_dependencies()
if monitoring_options.enabled:
@@ -67,7 +72,7 @@
if airflow_options.in_celery_mode:
orchestrator += RedisOrchestrator(redis_options)
if sql_options.create_database_in_cluster:
orchestrator += PostgresOrchestrator(sql_options)
orchestrator += PostgresOrchestrator(sql_options, service_factory)
builder = ChartBuilder(
AvionixChartInfo("airflow", dependencies),
orchestrator.get_kube_parts(),
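The net effect of this file's changes: get_chart_builder now constructs a single ServiceFactory and injects it into both AirflowOrchestrator and PostgresOrchestrator, so service definitions come from one shared source instead of each orchestrator building its own. A minimal sketch of that shared-factory wiring, using hypothetical stand-in classes rather than the real avionix_airflow signatures:

from dataclasses import dataclass


@dataclass
class ServiceFactory:
    # Stand-in for avionix_airflow.kubernetes.services.ServiceFactory;
    # the real class also takes sql, cloud, and airflow options.
    namespace: str

    def service_host(self, name: str) -> str:
        # In-cluster DNS name for a namespaced Kubernetes Service.
        return f"{name}.{self.namespace}.svc.cluster.local"


class AirflowOrchestrator:
    def __init__(self, factory: ServiceFactory):
        self.webserver_host = factory.service_host("airflow-webserver")


class PostgresOrchestrator:
    def __init__(self, factory: ServiceFactory):
        self.database_host = factory.service_host("airflow-database")


# Built once, injected into both consumers, mirroring the diff above.
factory = ServiceFactory(namespace="airflow")
print(AirflowOrchestrator(factory).webserver_host)
print(PostgresOrchestrator(factory).database_host)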
88 changes: 55 additions & 33 deletions avionix_airflow/kubernetes/airflow/airflow_containers.py
@@ -11,11 +11,8 @@

from avionix_airflow.kubernetes.airflow.airflow_options import AirflowOptions
from avionix_airflow.kubernetes.airflow.airflow_storage import (
AirflowDagVolumeGroup,
AirflowLogVolumeGroup,
AirflowSSHSecretsVolumeGroup,
AirflowWorkerPodTemplateStorageGroup,
ExternalStorageVolumeGroup,
StorageGroupFactory,
)
from avionix_airflow.kubernetes.cloud.cloud_options import CloudOptions
from avionix_airflow.kubernetes.monitoring.monitoring_options import MonitoringOptions
@@ -48,9 +45,9 @@ def __init__(self, name: str, value):
super().__init__("ELASTICSEARCH__" + name, value)


class KubernetesWorkerPodEnvVar(AirflowEnvVar):
class SchedulerEnvVar(AirflowEnvVar):
def __init__(self, name: str, value):
super().__init__("KUBERNETES_ENVIRONMENT_VARIABLES__" + name, value)
super().__init__("SCHEDULER__" + name, value)


class AirflowContainer(Container):
@@ -67,13 +64,16 @@ def __init__(
ports: Optional[List[ContainerPort]] = None,
readiness_probe: Optional[Probe] = None,
):
values = ValueOrchestrator()
self._values = ValueOrchestrator()
self._sql_options = sql_options
self._redis_options = redis_options
self._airflow_options = airflow_options
self._monitoring_options = monitoring_options
self._cloud_options = cloud_options
self._name = name
self._volume_group_factory = StorageGroupFactory(
self._airflow_options, self._cloud_options, self._airflow_options.namespace
)
super().__init__(
name=name,
args=self._args,
@@ -83,7 +83,7 @@ def __init__(
env=self._get_environment(),
env_from=[
EnvFromSource(
secret_ref=SecretEnvSource(values.secret_name, optional=False)
secret_ref=SecretEnvSource(self._values.secret_name, optional=False)
),
],
ports=ports,
@@ -94,18 +94,12 @@

def _get_volume_mounts(self):
mounts = [
AirflowLogVolumeGroup(
self._airflow_options, self._cloud_options
).volume_mount,
AirflowDagVolumeGroup(
self._airflow_options, self._cloud_options
).volume_mount,
ExternalStorageVolumeGroup(
self._airflow_options, self._cloud_options
).volume_mount,
self._volume_group_factory.log_volume_group.volume_mount,
self._volume_group_factory.dag_volume_group.volume_mount,
self._volume_group_factory.external_storage_volume_group.volume_mount,
]
if self._airflow_options.git_ssh_key:
mounts.append(AirflowSSHSecretsVolumeGroup().volume_mount)
mounts.append(self._volume_group_factory.ssh_volume_group.volume_mount)
return mounts

def _get_environment(self):
@@ -121,11 +115,11 @@ def _args(self) -> List[str]:
return [self._name]

@property
def _executor(self):
def _executor(self) -> str:
return self._airflow_options.core_executor

@property
def _airflow_env(self):
def _airflow_env(self) -> List[CoreEnvVar]:
return [
CoreEnvVar("EXECUTOR", self._executor),
CoreEnvVar("DEFAULT_TIMEZONE", self._airflow_options.default_timezone,),
@@ -139,7 +133,7 @@ def _airflow_env(self):
]

@property
def _elastic_search_env(self):
def _elastic_search_env(self) -> List[ElasticSearchEnvVar]:
return [
ElasticSearchEnvVar(
"HOST", self._monitoring_options.elastic_search_proxy_uri
@@ -149,15 +143,11 @@
]

@property
def _kubernetes_env(self):
def _kubernetes_env(self) -> List[KubernetesEnvVar]:
dag_volume_group = self._volume_group_factory.dag_volume_group
kube_settings = [
KubernetesEnvVar("NAMESPACE", self._airflow_options.namespace),
KubernetesEnvVar(
"DAGS_VOLUME_CLAIM",
AirflowDagVolumeGroup(
self._airflow_options, self._cloud_options
).persistent_volume_claim.metadata.name,
),
KubernetesEnvVar("NAMESPACE", self._airflow_options.pods_namespace),
KubernetesEnvVar("DAGS_VOLUME_CLAIM", dag_volume_group.pvc.metadata.name,),
KubernetesEnvVar(
"WORKER_CONTAINER_REPOSITORY", self._airflow_options.worker_image,
),
@@ -175,12 +165,10 @@ def _kubernetes_env(self):
),
]
if not self._monitoring_options.enabled:
log_volume_group = self._volume_group_factory.log_volume_group
kube_settings.append(
KubernetesEnvVar(
"LOGS_VOLUME_CLAIM",
AirflowLogVolumeGroup(
self._airflow_options, self._cloud_options
).persistent_volume_claim.metadata.name,
"LOGS_VOLUME_CLAIM", log_volume_group.pvc.metadata.name,
)
)
return kube_settings
@@ -189,6 +177,28 @@ def _kubernetes_env(self):
class AirflowWorker(AirflowContainer):
_command_entry_point = None

def __init__(
self,
name: str,
sql_options: SqlOptions,
redis_options: RedisOptions,
airflow_options: AirflowOptions,
monitoring_options: MonitoringOptions,
cloud_options: CloudOptions,
ports: Optional[List[ContainerPort]] = None,
readiness_probe: Optional[Probe] = None,
):
super().__init__(
name,
sql_options,
redis_options,
airflow_options,
monitoring_options,
cloud_options,
ports,
readiness_probe,
)

@property
def _args(self):
return []
@@ -201,6 +211,18 @@ def _executor(self):
"""
return "LocalExecutor"

@property
def _scheduler_env(self) -> List[SchedulerEnvVar]:
return [
SchedulerEnvVar(
"STATSD_HOST",
f"airflow-telegraf.{self._airflow_options.namespace}.svc.cluster.local",
)
]

def _get_environment(self):
return super()._get_environment() + self._scheduler_env


class AirflowMasterContainer(AirflowContainer):
def _get_volume_mounts(self):
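Two things change in this file: volume mounts are now resolved through a single StorageGroupFactory, and the worker's StatsD target moves into Airflow's scheduler configuration via the new SchedulerEnvVar. Since AirflowEnvVar is defined outside these hunks, here is a rough reconstruction of the env-var chain, assuming it prepends AIRFLOW__ in line with Airflow's AIRFLOW__<SECTION>__<KEY> convention:

class AirflowEnvVar:
    # Assumed base class: prepends AIRFLOW__ per Airflow's convention.
    def __init__(self, name: str, value: str):
        self.name = "AIRFLOW__" + name
        self.value = value


class SchedulerEnvVar(AirflowEnvVar):
    def __init__(self, name: str, value: str):
        super().__init__("SCHEDULER__" + name, value)


# The host follows Kubernetes Service DNS: <service>.<namespace>.svc.cluster.local
var = SchedulerEnvVar("STATSD_HOST", "airflow-telegraf.airflow.svc.cluster.local")
print(var.name)  # AIRFLOW__SCHEDULER__STATSD_HOST

Airflow reads this variable as the statsd_host option of its [scheduler] config section, which is why the Telegraf service's fully qualified in-cluster name, including the chart namespace, is used.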
5 changes: 5 additions & 0 deletions avionix_airflow/kubernetes/airflow/airflow_options.py
@@ -85,6 +85,7 @@ class AirflowOptions:
:param master_image_tag: The docker tag to use for the master image
:param delete_pods_on_failure: Whether or not to terminate pods if a
KubernetesExecutor task fails
:param pods_namespace: The namespace in which pods will be launched
"""

dag_sync_image: str
@@ -114,6 +115,7 @@ class AirflowOptions:
master_image_tag: str = ""
delete_pods_on_failure: bool = False
default_image: ClassVar[str] = "zachb1996/avionix_airflow"
pods_namespace: str = ""

def __post_init__(
self, access_modes, additional_vars, fernet_key: str,
@@ -124,6 +126,9 @@ def __post_init__(
self.master_image = self._get_image_behavior(self.master_image)
self.worker_image_tag = self._get_tag(self.worker_image_tag)
self.master_image_tag = self._get_tag(self.master_image_tag)
self.pods_namespace = (
self.namespace if not self.pods_namespace else self.pods_namespace
)

self.__additional_vars = additional_vars if additional_vars is not None else {}
if self.smtp_notification_options:
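The new pods_namespace field is an opt-in override: when left empty, __post_init__ falls back to the chart's own namespace, preserving the old single-namespace behavior. A condensed, runnable sketch of just that defaulting logic:

from dataclasses import dataclass


@dataclass
class Options:
    # Trimmed-down stand-in for AirflowOptions.
    namespace: str = "airflow"
    pods_namespace: str = ""

    def __post_init__(self):
        # Empty means "launch worker pods next to the scheduler".
        self.pods_namespace = self.pods_namespace or self.namespace


print(Options().pods_namespace)                          # airflow
print(Options(pods_namespace="workers").pods_namespace)  # workers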
87 changes: 56 additions & 31 deletions avionix_airflow/kubernetes/airflow/airflow_orchestrator.py
@@ -1,20 +1,14 @@
from avionix.kube.core import Namespace
from avionix.kube.meta import ObjectMeta

from avionix_airflow.kubernetes.airflow.airflow_options import AirflowOptions
from avionix_airflow.kubernetes.airflow.airflow_pods import AirflowDeployment
from avionix_airflow.kubernetes.airflow.airflow_roles import AirflowPodRoleGroup
from avionix_airflow.kubernetes.airflow.airflow_secrets import AirflowSecret
from avionix_airflow.kubernetes.airflow.airflow_service import (
FlowerService,
StatsDService,
WebserverService,
)
from avionix_airflow.kubernetes.airflow.airflow_service_accounts import (
AirflowPodServiceAccount,
)
from avionix_airflow.kubernetes.airflow.airflow_storage import (
AirflowDagVolumeGroup,
AirflowLogVolumeGroup,
ExternalStorageVolumeGroup,
)
from avionix_airflow.kubernetes.airflow.airflow_storage import StorageGroupFactory
from avionix_airflow.kubernetes.airflow.airflow_worker_pod_template import (
PodTemplateWorkerConfig,
)
@@ -25,24 +19,25 @@
from avionix_airflow.kubernetes.orchestration import Orchestrator
from avionix_airflow.kubernetes.postgres.sql_options import SqlOptions
from avionix_airflow.kubernetes.redis.redis_options import RedisOptions
from avionix_airflow.kubernetes.value_handler import ValueOrchestrator
from avionix_airflow.kubernetes.services import ServiceFactory


class AirflowOrchestrator(Orchestrator):
def __init__(
self,
sql_options: SqlOptions,
redis_options: RedisOptions,
values: ValueOrchestrator,
airflow_options: AirflowOptions,
monitoring_options: MonitoringOptions,
cloud_options: CloudOptions,
service_factory: ServiceFactory,
):
dag_group = AirflowDagVolumeGroup(airflow_options, cloud_options)
log_group = AirflowLogVolumeGroup(airflow_options, cloud_options)
external_volume_group = ExternalStorageVolumeGroup(
airflow_options, cloud_options
storage_group_factory = StorageGroupFactory(
airflow_options, cloud_options, airflow_options.namespace
)
dag_group = storage_group_factory.dag_volume_group
log_group = storage_group_factory.log_volume_group
external_volume_group = storage_group_factory.external_storage_volume_group
components = [
AirflowDeployment(
sql_options,
@@ -51,16 +46,22 @@ def __init__(
monitoring_options,
cloud_options,
),
WebserverService(values, airflow_options.open_node_ports, cloud_options),
dag_group.persistent_volume,
log_group.persistent_volume,
dag_group.persistent_volume_claim,
log_group.persistent_volume_claim,
external_volume_group.persistent_volume,
external_volume_group.persistent_volume_claim,
service_factory.webserver_service,
dag_group.pv,
log_group.pv,
dag_group.pvc,
log_group.pvc,
external_volume_group.pv,
external_volume_group.pvc,
DagRetrievalJob(airflow_options, cloud_options),
AirflowIngress(airflow_options, cloud_options),
AirflowSecret(sql_options, airflow_options, redis_options),
AirflowIngress(airflow_options, cloud_options, service_factory),
AirflowSecret(
sql_options,
airflow_options,
redis_options,
airflow_options.namespace,
service_factory,
),
PodTemplateWorkerConfig(
sql_options,
redis_options,
@@ -70,15 +71,39 @@
),
]
if monitoring_options.enabled:
components.append(StatsDService(values, airflow_options.open_node_ports))
components.append(service_factory.statsd_service)
if airflow_options.in_celery_mode:
components.append(
FlowerService(values, airflow_options.open_node_ports, cloud_options)
)
components.append(service_factory.flower_service)
if airflow_options.in_kube_mode:
airflow_pod_service_account = AirflowPodServiceAccount()
role_group = AirflowPodRoleGroup(airflow_pod_service_account)
airflow_pod_service_account = AirflowPodServiceAccount(airflow_options)
role_group = AirflowPodRoleGroup(
airflow_pod_service_account, airflow_options
)
components.extend(
[airflow_pod_service_account, role_group.role, role_group.role_binding]
)
if (
airflow_options.pods_namespace != airflow_options.namespace
) and airflow_options.in_kube_mode:
worker_storage_groups = StorageGroupFactory(
airflow_options, cloud_options, airflow_options.pods_namespace
)
components.extend(
[
Namespace(ObjectMeta(name=airflow_options.pods_namespace)),
worker_storage_groups.dag_volume_group.pvc,
worker_storage_groups.dag_volume_group.pv,
worker_storage_groups.log_volume_group.pvc,
worker_storage_groups.log_volume_group.pv,
worker_storage_groups.external_storage_volume_group.pvc,
worker_storage_groups.external_storage_volume_group.pv,
AirflowSecret(
sql_options,
airflow_options,
redis_options,
airflow_options.pods_namespace,
service_factory,
),
]
)
super().__init__(components)
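This closing hunk is the core of the pod-namespace feature. PersistentVolumeClaims and Secrets are namespaced objects, so when pods_namespace differs from the chart namespace under the KubernetesExecutor, the orchestrator materializes a second set of storage groups and a second AirflowSecret inside a newly created Namespace; otherwise nothing extra is built. A compressed sketch of that guard and duplication, with hypothetical stand-ins for the avionix objects:

from dataclasses import dataclass
from typing import List


@dataclass
class VolumeGroup:
    # Stand-in pairing a PersistentVolume with its namespaced claim.
    pv: str
    pvc: str


class StorageGroupFactory:
    # Stand-in for the factory that replaced the per-group volume classes.
    def __init__(self, namespace: str):
        self.dag_volume_group = VolumeGroup("dags-pv", f"{namespace}/dags-pvc")
        self.log_volume_group = VolumeGroup("logs-pv", f"{namespace}/logs-pvc")
        self.external_storage_volume_group = VolumeGroup(
            "external-pv", f"{namespace}/external-pvc"
        )


def components_for(namespace: str, pods_namespace: str, in_kube_mode: bool) -> List[str]:
    chart = StorageGroupFactory(namespace)
    components = [chart.dag_volume_group.pvc, chart.log_volume_group.pvc]
    if in_kube_mode and pods_namespace != namespace:
        # Worker pods cannot see claims or secrets in the chart namespace,
        # so copies are created there alongside a new Namespace object.
        workers = StorageGroupFactory(pods_namespace)
        components += [
            f"Namespace/{pods_namespace}",
            workers.dag_volume_group.pvc,
            workers.log_volume_group.pvc,
            workers.external_storage_volume_group.pvc,
            f"{pods_namespace}/airflow-secret",  # stand-in for AirflowSecret
        ]
    return components


print(components_for("airflow", "airflow-pods", in_kube_mode=True))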