Skip to content

Commit

Permalink
[AIRFLOW-5448] Handle istio-proxy for Kubernetes Pods (#62)
Browse files Browse the repository at this point in the history
Istio service mesh is not compatible by default with Kubernetes Jobs.
The normal behavior is that a Job will be started, get an istio-proxy
sidecar attached to it via the istio mutating webhook, run until
completion, then the 'main' container in the pod stops, but istio-proxy
hangs around indefinitely. This change handles cleanly exiting the
Istio sidecar 'istio-proxy' when a Kubernetes Executor task completes.

(cherry picked from commit 84fa48f)
(cherry picked from commit 6ed59bf)
(cherry picked from commit ba60ede)
  • Loading branch information
sjmiller609 authored and kaxil committed Dec 14, 2020
1 parent 85328b6 commit 80ac218
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 5 deletions.
3 changes: 3 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.istio import Istio
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kube_config import KubeConfig
from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(
self.watcher_queue = watcher_queue
self.resource_version = resource_version
self.kube_config = kube_config
self.istio = Istio(get_kube_client())

def run(self) -> None:
"""Performs watching"""
Expand Down Expand Up @@ -164,6 +166,7 @@ def _run(
event=event,
)
last_resource_version = task.metadata.resource_version
self.istio.handle_istio_proxy(task)

return last_resource_version

Expand Down
144 changes: 144 additions & 0 deletions airflow/kubernetes/istio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kubernetes.stream import stream
from packaging.version import parse as semantic_version

from airflow import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin


class SidecarNames:
"""Define strings that indicate container names"""

ISTIO_PROXY = 'istio-proxy'


class Istio(LoggingMixin):
"""Handle all Istio-related logic"""

def __init__(self, kube_client):
super().__init__()
self._client = kube_client

def handle_istio_proxy(self, pod) -> bool:
"""If an istio-proxy sidecar is detected, and all other containers
are terminated, then attempt to cleanly shutdown the sidecar.
If we detect a version of Istio before it's compatible with Kubernetes
Jobs, then raise an informative error message.
:param pod: The pod which we are checking for the sidecar
:returns: True if we detect and exit istio-proxy, False if we do not detect istio-proxy
:rtype: bool
Raises:
AirflowException: if we find an istio-proxy, and we can't shut it down.
"""
if self._should_shutdown_istio_proxy(pod):
self.log.info(
"Detected that a task finished and needs an istio-proxy sidecar to be cleaned up. "
"pod name: %s",
pod.metadata.name,
)
self._shutdown_istio_proxy(pod)
return True
return False

def _should_shutdown_istio_proxy(self, pod):
"""Look for an istio-proxy, and decide if it should be shutdown.
Args:
pod (V1Pod): The pod which we are checking for the sidecar
Returns:
(bool): True if we detect istio-proxy, and all other containers
are finished running, otherwise false
"""
if pod.status.phase != "Running":
return False
found_istio = False
for container_status in pod.status.container_statuses:
if container_status.name == SidecarNames.ISTIO_PROXY and container_status.state.running:
found_istio = True
continue
if not container_status.state.terminated:
# Any state besides 'terminated' should be
# considered still busy
return False
# If we didn't find istio at all, then we should
# not shut it down. Also we should only shut it down
# if it has state "running".
return found_istio

def _shutdown_istio_proxy(self, pod):
"""Shutdown the istio-proxy on the provided pod
Args:
pod (V1Pod): The pod which the container is in
Returns:
None
Raises:
AirflowException: if we find an istio-proxy, and we can't shut it down.
"""
for container in pod.spec.containers:

# Skip unless it's a sidecar named as SidecarNames.ISTIO_PROXY.
if container.name != SidecarNames.ISTIO_PROXY:
continue

# Check if supported version of istio-proxy.
# If we can't tell the version, proceed anyways.
if ":" in container.image:
_, tag = container.image.split(":")
if semantic_version(tag) < semantic_version("1.3.0-rc.0"):
raise AirflowException(
'Please use istio version 1.3.0+ for KubernetesExecutor compatibility.'
+ f' Detected version {tag}'
)

# Determine the istio-proxy statusPort,
# which is where /quitquitquit is implemented.
# Default to 15020.
status_port = "15020"
for i in range(len(container.args)):
arg = container.args[i]
if arg.strip() == "--statusPort":
status_port = container.args[i + 1].strip()
break
if arg.strip()[:13] == "--statusPort=":
status_port = arg.strip()[13:]
break

self.log.info("Shutting down istio-proxy in pod %s", pod.metadata.name)
self._post_quitquitquit(pod, container, status_port)

def _post_quitquitquit(self, pod, container, status_port):
"""Send the curl to shutdown the isto-proxy container"""
# Use exec to curl localhost inside of the sidecar.
_ = stream(
self._client.connect_get_namespaced_pod_exec,
pod.metadata.name,
pod.metadata.namespace,
tty=False,
stderr=True,
stdin=False,
stdout=True,
container=container.name,
command=['/bin/sh', '-c', f'curl -XPOST http://127.0.0.1:{status_port}/quitquitquit'],
)
28 changes: 23 additions & 5 deletions airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from requests.exceptions import BaseHTTPError

from airflow.exceptions import AirflowException
from airflow.kubernetes.istio import Istio
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.settings import pod_mutation_hook
Expand All @@ -46,6 +47,20 @@ class PodStatus:
SUCCEEDED = 'succeeded'


class SleepConfig:
"""Configure sleeps used for polling"""

# Only polls during the start of a pod
POD_STARTING_POLL = 1
# Used to detect all cleanup jobs are completed
# and the entire Pod is cleaned up
POD_RUNNING_POLL = 1
# Polls for the duration of the task execution
# to detect when the task is done. The difference
# between this and POD_RUNNING_POLL is sidecars.
BASE_CONTAINER_RUNNING_POLL = 2


class PodLauncher(LoggingMixin):
"""Launches PODS"""

Expand All @@ -68,6 +83,7 @@ def __init__(
self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context)
self._watch = watch.Watch()
self.extract_xcom = extract_xcom
self.istio = Istio(self._client)

def run_pod_async(self, pod: V1Pod, **kwargs):
"""Runs POD asynchronously"""
Expand Down Expand Up @@ -114,7 +130,8 @@ def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
delta = dt.now() - curr_time
if delta.total_seconds() >= startup_timeout:
raise AirflowException("Pod took too long to start")
time.sleep(1)
time.sleep(SleepConfig.POD_STARTING_POLL)
self.log.debug('Pod not yet started')

def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]:
"""
Expand Down Expand Up @@ -144,16 +161,17 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]
# Prefer logs duplication rather than loss
read_logs_since_sec = math.ceil(delta.total_seconds())
result = None
while self.base_container_is_running(pod):
self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
time.sleep(SleepConfig.BASE_CONTAINER_RUNNING_POLL)
if self.extract_xcom:
while self.base_container_is_running(pod):
self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
time.sleep(2)
result = self._extract_xcom(pod)
self.log.info(result)
result = json.loads(result)
self.istio.handle_istio_proxy(self.read_pod(pod))
while self.pod_is_running(pod):
self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING)
time.sleep(2)
time.sleep(SleepConfig.POD_RUNNING_POLL)
return self._task_status(self.read_pod(pod)), result

def parse_log_line(self, line: str) -> Tuple[str, str]:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0, <12.0.0',
'packaging>=19.1',
]
kylin = ['kylinpy>=2.6']
ldap = [
Expand Down
119 changes: 119 additions & 0 deletions tests/kubernetes/test_istio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import MagicMock, patch

from airflow import AirflowException
from airflow.kubernetes.istio import Istio


def mock_stream(func, *args, **kwargs):
print('calling func')
return func(*args, **kwargs)


class TestIstio(unittest.TestCase):
def setUp(self):
mock_kube_client = MagicMock()
self.istio = Istio(mock_kube_client)

def _mock_pod(self, image="istio/proxyv2:1.3.0", args=None): # noqa
sidecar = MagicMock()
sidecar.name = "istio-proxy"
sidecar.namespace = "fake-namespace"
sidecar.image = image
sidecar.args = args
pod = MagicMock()
pod.spec.containers = [sidecar]
pod.status.phase = "Running"
pod.metadata.name = "fake-pod-name"
pod.metadata.namespace = "fake-namespace"
container_status1 = MagicMock()
container_status1.name = "istio-proxy"
container_status1.state.running = True
container_status1.state.terminated = False
container_status2 = MagicMock()
container_status2.name = "base"
container_status2.state.running = False
container_status2.state.terminated = True
pod.status.container_statuses = [container_status1, container_status2]
return pod

def test_handle_istio_proxy_low_version(self):
pod = self._mock_pod(image="istio/proxyv2:1.2.9")
self.assertRaises(AirflowException, self.istio.handle_istio_proxy, pod)

def _handle_istio_proxy_with_sidecar_args(self, args):
pod = self._mock_pod(args=args)
self.istio.handle_istio_proxy(pod)

@patch("airflow.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy(self):
args = ["proxy", "sidecar", "--statusPort", "12345"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh', '-c', 'curl -XPOST http://127.0.0.1:12345/quitquitquit'],
)

@patch("airflow.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy_other_cli_format(self):
args = ["proxy", "sidecar", "--statusPort=12345"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh', '-c', 'curl -XPOST http://127.0.0.1:12345/quitquitquit'],
)

@patch("airflow.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy_no_cli_argument(self):
args = ["proxy", "sidecar"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh', '-c', 'curl -XPOST http://127.0.0.1:15020/quitquitquit'],
)

@patch("airflow.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_with_no_sidecar(self):
pod = MagicMock()
pod.spec.containers = []
self.istio.handle_istio_proxy(MagicMock())
self.istio._client.connect_get_namespaced_pod_exec.assert_not_called()


if __name__ == "__main__":
unittest.main()

0 comments on commit 80ac218

Please sign in to comment.