Skip to content

Commit

Permalink
Merge pull request #45 from zbrookle/add_py_typed
Browse files Browse the repository at this point in the history
Add py typed
  • Loading branch information
zbrookle authored Sep 24, 2020
2 parents aff7132 + ca10f2a commit e36657e
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 46 deletions.
5 changes: 2 additions & 3 deletions avionix_airflow/kubernetes/airflow/airflow_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ def __init__(
super().__init__(
name=name,
args=self._args,
image="airflow-image"
if airflow_options.local_mode
else f"zachb1996/avionix_airflow:{self._airflow_options.master_image_tag}",
image=f"{self._airflow_options.master_image}:"
f"{self._airflow_options.master_image_tag}",
image_pull_policy=airflow_options.image_pull_policy,
env=self._get_environment(),
env_from=[
Expand Down
29 changes: 23 additions & 6 deletions avionix_airflow/kubernetes/airflow/airflow_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,45 @@ class AirflowOptions:
additional_vars: InitVar[Optional[Dict[str, str]]] = None
fernet_key: InitVar[str] = ""
dags_paused_at_creation: bool = True
worker_image: InitVar[str] = "airflow-image"
worker_image_tag: str = "latest"
worker_image: str = ""
worker_image_tag: str = ""
open_node_ports: bool = False
local_mode: bool = False
smtp_notification_options: Optional[SmtpNotificationOptions] = None
git_ssh_key: Optional[str] = None
image_pull_policy: str = "IfNotPresent"
master_image_tag: str = "latest"
master_image: str = ""
master_image_tag: str = ""
delete_pods_on_failure: bool = False
default_image: ClassVar[str] = "zachb1996/avionix_airflow"

def __post_init__(
self, access_modes, additional_vars, fernet_key: str, worker_image,
self, access_modes, additional_vars, fernet_key: str,
):
self.access_modes = self.__get_access_modes(access_modes)
self.fernet_key = fernet_key if fernet_key else _create_fernet_key()
if worker_image == "airflow-image" and not self.local_mode:
self.worker_image = "zachb1996/avionix_airflow"
self.worker_image = self._get_image_behavior(self.worker_image)
self.master_image = self._get_image_behavior(self.master_image)
self.worker_image_tag = self._get_tag(self.worker_image_tag)
self.master_image_tag = self._get_tag(self.master_image_tag)

self.__additional_vars = additional_vars if additional_vars is not None else {}
if self.smtp_notification_options:
self.__additional_vars.update(self.smtp_notification_options.to_dict())

def _get_image_behavior(self, image: str):
if not image and not self.local_mode:
return self.default_image
if not image and self.local_mode:
return "airflow-image"
return image

@staticmethod
def _get_tag(tag: str):
if tag:
return tag
return "latest"

@staticmethod
def __get_access_modes(access_modes: Optional[List[str]]):
if access_modes is None:
Expand Down
5 changes: 2 additions & 3 deletions avionix_airflow/kubernetes/airflow/airflow_pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
DeploymentStrategy,
RollingUpdateDeployment,
)
from avionix.kube.core import PodSpec, PodTemplateSpec
from avionix.kube.core import Container, PodSpec, PodTemplateSpec
from avionix.kube.meta import LabelSelector

from avionix_airflow.kubernetes.airflow.airflow_containers import (
AirflowContainer,
FlowerUI,
Scheduler,
WebserverUI,
Expand Down Expand Up @@ -77,7 +76,7 @@ def _volumes(self):
return volumes

@abstractmethod
def _get_containers(self) -> List[AirflowContainer]:
def _get_containers(self) -> List[Container]:
pass


Expand Down
2 changes: 2 additions & 0 deletions avionix_airflow/kubernetes/airflow/airflow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def __init__(
self.__persistent_volume = AirflowPersistentVolume(
name, storage, host_path, access_modes, cloud_options
)
if self.__volume.persistentVolumeClaim is None:
raise Exception("Need persistent volume claim!")
self.__persistent_volume_claim = AirflowPersistentVolumeClaim(
self.__volume.persistentVolumeClaim.claimName,
access_modes,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from typing import List

from avionix.kube.core import ConfigMap
from avionix.kube.core import ConfigMap, Container
from yaml import dump

from avionix_airflow.kubernetes.airflow.airflow_containers import (
AirflowContainer,
AirflowWorker,
)
from avionix_airflow.kubernetes.airflow.airflow_containers import AirflowWorker
from avionix_airflow.kubernetes.airflow.airflow_options import AirflowOptions
from avionix_airflow.kubernetes.airflow.airflow_pods import AirflowPodTemplate
from avionix_airflow.kubernetes.cloud.cloud_options import CloudOptions
Expand All @@ -18,7 +15,7 @@


class AirflowWorkerPodTemplate(AirflowPodTemplate):
def _get_containers(self) -> List[AirflowContainer]:
def _get_containers(self) -> List[Container]:
return [
AirflowWorker(
"base",
Expand Down
5 changes: 4 additions & 1 deletion avionix_airflow/kubernetes/airflow/ingress_controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import List

from avionix.kube.extensions import (
HTTPIngressPath,
HTTPIngressRuleValue,
Ingress,
IngressRule,
Expand All @@ -15,7 +18,7 @@
class AirflowIngress(Ingress):
def __init__(self, airflow_options: AirflowOptions, cloud_options: CloudOptions):
values = ValueOrchestrator()
ingress_paths = cloud_options.extra_ingress_paths + [
ingress_paths: List[HTTPIngressPath] = cloud_options.extra_ingress_paths + [
AirflowIngressPath(
values.webserver_service_name,
values.webserver_port_name,
Expand Down
14 changes: 7 additions & 7 deletions avionix_airflow/kubernetes/cloud/aws/aws_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from avionix import ChartDependency, ObjectMeta
from avionix.kube.base_objects import KubernetesBaseObject
from avionix.kube.core import CSIPersistentVolumeSource
from avionix.kube.extensions import IngressBackend
from avionix.kube.extensions import HTTPIngressPath, IngressBackend
from avionix.kube.storage import StorageClass

from avionix_airflow.kubernetes.base_ingress_path import AirflowIngressPath
Expand Down Expand Up @@ -201,8 +201,8 @@ def ingress_annotations(self) -> Dict[str, str]:
return annotations

@property
def extra_ingress_paths(self) -> List[AirflowIngressPath]:
ingress_paths = [
def extra_ingress_paths(self) -> List[HTTPIngressPath]:
ingress_paths: List[HTTPIngressPath] = [
AirflowIngressPath(
self._grafana_redirect, self._use_annotation, path="/grafana"
),
Expand All @@ -211,13 +211,13 @@ def extra_ingress_paths(self) -> List[AirflowIngressPath]:
),
]
if self.__use_ssl:
return [
AirflowIngressPath("ssl-redirect", self._use_annotation, path="/*"),
] + ingress_paths
ingress_paths.insert(
0, AirflowIngressPath("ssl-redirect", self._use_annotation, path="/*"),
)
return ingress_paths

@property
def default_backend(self) -> IngressBackend:
def default_backend(self) -> Optional[IngressBackend]:
return None

@property
Expand Down
14 changes: 7 additions & 7 deletions avionix_airflow/kubernetes/cloud/cloud_options.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from abc import ABC, abstractmethod
from typing import Dict, List
from typing import Dict, List, Optional

from avionix import ChartDependency
from avionix.kube.base_objects import KubernetesBaseObject
from avionix.kube.core import CSIPersistentVolumeSource
from avionix.kube.extensions import IngressBackend
from avionix.kube.extensions import HTTPIngressPath, IngressBackend
from avionix.kube.storage import StorageClass

from avionix_airflow.kubernetes.base_ingress_path import AirflowIngressPath


class CloudOptions(ABC):
service_type = "LoadBalancer"
Expand All @@ -20,7 +18,9 @@ def __init__(self, storage_class: StorageClass, volume_mode: str):
self.volume_mode = volume_mode

@abstractmethod
def get_csi_persistent_volume_source(self, name: str) -> CSIPersistentVolumeSource:
def get_csi_persistent_volume_source(
self, name: str
) -> Optional[CSIPersistentVolumeSource]:
pass

@abstractmethod
Expand All @@ -42,12 +42,12 @@ def ingress_annotations(self) -> Dict[str, str]:

@property
@abstractmethod
def extra_ingress_paths(self) -> List[AirflowIngressPath]:
def extra_ingress_paths(self) -> List[HTTPIngressPath]:
pass

@property
@abstractmethod
def default_backend(self) -> IngressBackend:
def default_backend(self) -> Optional[IngressBackend]:
pass

@property
Expand Down
15 changes: 7 additions & 8 deletions avionix_airflow/kubernetes/cloud/local/local_options.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from typing import Dict, List
from typing import Dict, List, Optional

from avionix import ChartDependency, ObjectMeta
from avionix.kube.base_objects import KubernetesBaseObject
from avionix.kube.core import CSIPersistentVolumeSource, HostPathVolumeSource
from avionix.kube.extensions import IngressBackend
from avionix.kube.extensions import HTTPIngressPath, IngressBackend
from avionix.kube.storage import StorageClass

from avionix_airflow.kubernetes.base_ingress_path import AirflowIngressPath
from avionix_airflow.kubernetes.cloud.cloud_options import CloudOptions


Expand All @@ -15,13 +14,13 @@ class LocalOptions(CloudOptions):

def __init__(self):
super().__init__(
storage_class=StorageClass(
ObjectMeta(name="standard"), None, None, None, "efs.csi.aws.com", None
),
storage_class=StorageClass(ObjectMeta(name="standard"), "efs.csi.aws.com"),
volume_mode="Filesystem",
)

def get_csi_persistent_volume_source(self, name: str) -> CSIPersistentVolumeSource:
def get_csi_persistent_volume_source(
self, name: str
) -> Optional[CSIPersistentVolumeSource]:
return None

def get_host_path_volume_source(self, host_path: str):
Expand Down Expand Up @@ -55,7 +54,7 @@ def webserver_service_annotations(self) -> Dict[str, str]:
return {}

@property
def extra_ingress_paths(self) -> List[AirflowIngressPath]:
def extra_ingress_paths(self) -> List[HTTPIngressPath]:
return []

@property
Expand Down
11 changes: 8 additions & 3 deletions avionix_airflow/kubernetes/namespace_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@


class AirflowMeta(ObjectMeta):
def __init__(self, name: str, labels: Optional[dict] = None, *args, **kwargs):
def __init__(
self,
name: str,
labels: Optional[dict] = None,
annotations: Optional[dict] = None,
):
if labels is None:
labels = {}
super().__init__(name=name, labels=labels, *args, **kwargs)
self.labels["app"] = "airflow"
labels["app"] = "airflow"
super().__init__(name=name, labels=labels, annotations=annotations)
Empty file added avionix_airflow/py.typed
Empty file.
1 change: 1 addition & 0 deletions avionix_airflow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
class AvionixAirflowChartInstallationContext(ChartInstallationContext):
def get_status_resources(self):
resources = super().get_status_resources()
print(resources)
new_resources = resources.filter(regex=".*deployment.*")
return new_resources

Expand Down
43 changes: 43 additions & 0 deletions avionix_airflow/tests/test_airflow_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from avionix_airflow import AirflowOptions


@pytest.mark.parametrize(
"local_mode,worker_image,worker_image_tag,master_image,master_image_tag",
[
(True, "", "", "", ""),
(False, "", "", "", ""),
(True, "test-image", "oldest", "", ""),
(False, "test-image", "oldest", "", ""),
],
)
def test_airflow_options_images(
local_mode: bool,
worker_image: str,
worker_image_tag: str,
master_image: str,
master_image_tag: str,
):
options = AirflowOptions(
"busybox",
["test"],
"* * * * *",
local_mode=local_mode,
worker_image=worker_image,
worker_image_tag=worker_image_tag,
master_image=master_image,
master_image_tag=master_image_tag,
)

# Assert expected master image behavior
if master_image:
assert options.master_image == (master_image if master_image else "")
if not master_image and local_mode:
assert options.master_image == "airflow-image"
assert options.master_image_tag == (
master_image_tag if master_image_tag else "latest"
)
assert options.worker_image_tag == (
worker_image_tag if worker_image_tag else "latest"
)
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ dependencies:
- pytest-rerunfailures
- pip:
# Main requirements
- avionix==0.4.2
- avionix==0.4.3
- python-hosts==1.0.0
- cryptography==3.0
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
avionix>=0.4.2
avionix>=0.4.3
python-hosts==1.0.0
cryptography==3.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def read_file(file_path: Path):
"Typing :: Typed",
"Operating System :: OS Independent",
],
package_data={"avionix_airflow": ["py.typed"]},
long_description_content_type="text/x-rst",
include_package_data=True,
)

0 comments on commit e36657e

Please sign in to comment.