Skip to content

Commit

Permalink
Pod exec enhancements (#328)
Browse files Browse the repository at this point in the history
* rework pod_exec example

Rework this example to be closer to the one provided by the official kubernetes
python api. The busybox pod is now created if it does not exist, making the
example easier to use. Also, use contexts to ensure resources are properly
released.

This prepares the work for next commits that will add more cases in this
pod_exec example.

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>

* introduce helper to get command return code in ws_client

When the websocket API is used to execute a command on a pod, the status is sent
over the ERROR_CHANNEL on termination. Add a helper to parse this information
that returns the exit code of the command. This helper can only be used if
_preload_content=False.

The pod_exec example will be updated in next commit to make use of this new
helper.

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>

* add an interactive case in pod_exec example

Introduce an example that shows how the WsApiClient can be used to interactively
execute a command on a pod. The example is similar to what is done in the
official kubernetes python api.

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>

* remove extra await when calling ws_connect()

The ClientSession.ws_connect() method is synchronous and returns a
_RequestContextManager which takes a coroutine as parameter (here,
ClientSession._ws_connect()).

This context manager is in charge of closing the connection in its __aexit__()
method, so it has to be used with "async with".

However, this context manager can also be awaited as it has an __await__()
method. In this case, it will await the _ws_connect() coroutine. This is what is
done in the current code, but the connection will not be released.

Remove the "await" to return the context manager, so that the user can use it
with "async with", which will properly release resources.

This is the documented way of using ws_connect():
https://docs.aiohttp.org/en/stable/client_quickstart.html#websockets

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>

---------

Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
  • Loading branch information
olivier-matz-6wind committed Aug 9, 2024
1 parent 2126b1d commit a37f664
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 36 deletions.
165 changes: 130 additions & 35 deletions examples/pod_exec.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,147 @@
import asyncio

from kubernetes_asyncio import client, config
from aiohttp.http import WSMsgType

from kubernetes_asyncio import client, config, utils
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.stream import WsApiClient
from kubernetes_asyncio.stream.ws_client import (
ERROR_CHANNEL, STDERR_CHANNEL, STDOUT_CHANNEL,
)

BUSYBOX_POD = "busybox-test"

async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
await config.load_kube_config()

v1 = client.CoreV1Api()
async def find_busybox_pod():
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
ret = await v1.list_pod_for_all_namespaces()
for i in ret.items:
if i.metadata.namespace == 'default' and i.metadata.name == BUSYBOX_POD:
print(f"Found busybox pod: {i.metadata.name}")
return i.metadata.name
return None

print("Try to find a pod with busybox (name busybox*) ...")
ret = await v1.list_pod_for_all_namespaces()

for i in ret.items:
if i.metadata.name.startswith("busybox"):
pod = i.metadata.name
namespace = i.metadata.namespace
print("Buxy box", pod, "namespace", namespace)
break
else:
print("Busybox not found !")
return
async def create_busybox_pod():
print(f"Pod {BUSYBOX_POD} does not exist. Creating it...")
manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': BUSYBOX_POD,
},
'spec': {
'containers': [{
'image': 'busybox',
'name': 'sleep',
"args": [
"/bin/sh",
"-c",
"while true; do date; sleep 5; done"
]
}]
}
}
async with ApiClient() as api:
objects = await utils.create_from_dict(api, manifest, namespace="default")
pod = objects[0]
print(f"Created pod {pod.metadata.name}.")
return pod.metadata.name

v1_ws = client.CoreV1Api(api_client=WsApiClient())

exec_command = [
"/bin/sh",
"-c",
"echo This message goes to stderr >&2; echo This message goes to stdout",
]
async def wait_busybox_pod_ready():
print(f"Waiting pod {BUSYBOX_POD} to be ready.")
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
while True:
ret = await v1.read_namespaced_pod(name=BUSYBOX_POD, namespace="default")
if ret.status.phase != 'Pending':
break
await asyncio.sleep(1)

resp = v1_ws.connect_get_namespaced_pod_exec(
pod,
namespace,
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)

ret = await resp
async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
await config.load_kube_config()

pod = await find_busybox_pod()
if not pod:
pod = await create_busybox_pod()
await wait_busybox_pod_ready()

print("Response: ", ret)
# Execute a command in a pod non-interactively, and display its output
print("-------------")
async with WsApiClient() as ws_api:
v1_ws = client.CoreV1Api(api_client=ws_api)
exec_command = [
"/bin/sh",
"-c",
"echo This message goes to stderr >&2; echo This message goes to stdout",
]
ret = await v1_ws.connect_get_namespaced_pod_exec(
pod,
"default",
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
print(f"Response: {ret}")

# Execute a command interactively. If _preload_content=False is passed to
# connect_get_namespaced_pod_exec(), the returned object is an aiohttp ClientWebSocketResponse
# object, that can be manipulated directly.
print("-------------")
async with WsApiClient() as ws_api:
v1_ws = client.CoreV1Api(api_client=ws_api)
exec_command = ['/bin/sh']
websocket = await v1_ws.connect_get_namespaced_pod_exec(
BUSYBOX_POD,
"default",
command=exec_command,
stderr=True,
stdin=True,
stdout=True,
tty=False,
_preload_content=False,
)
commands = [
"echo 'This message goes to stdout'\n",
"echo 'This message goes to stderr' >&2\n",
"exit 1\n",
]
error_data = ""
closed = False
async with websocket as ws:
while commands and not closed:
command = commands.pop(0)
stdin_channel_prefix = chr(0)
await ws.send_bytes((stdin_channel_prefix + command).encode("utf-8"))
while True:
try:
msg = await ws.receive(timeout=1)
except asyncio.TimeoutError:
break
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
closed = True
break
channel = msg.data[0]
data = msg.data[1:].decode("utf-8")
if not data:
continue
if channel == STDOUT_CHANNEL:
print(f"stdout: {data}")
elif channel == STDERR_CHANNEL:
print(f"stderr: {data}")
elif channel == ERROR_CHANNEL:
error_data += data
if error_data:
returncode = ws_api.parse_error_data(error_data)
print(f"Exit code: {returncode}")

if __name__ == "__main__":
loop = asyncio.get_event_loop()
Expand Down
14 changes: 13 additions & 1 deletion kubernetes_asyncio/stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.

import json

from six.moves.urllib.parse import urlencode, urlparse, urlunparse

from kubernetes_asyncio.client import ApiClient
Expand Down Expand Up @@ -54,6 +56,16 @@ def __init__(self, configuration=None, header_name=None, header_value=None,
super().__init__(configuration, header_name, header_value, cookie, pool_threads)
self.heartbeat = heartbeat

@classmethod
def parse_error_data(cls, error_data):
"""
Parse data received on ERROR_CHANNEL and return the command exit code.
"""
error_data_json = json.loads(error_data)
if error_data_json.get("status") == "Success":
return 0
return int(error_data_json["details"]["causes"][0]['message'])

async def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
Expand Down Expand Up @@ -96,4 +108,4 @@ async def request(self, method, url, query_params=None, headers=None,

else:

return await self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)
return self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)
14 changes: 14 additions & 0 deletions kubernetes_asyncio/stream/ws_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,17 @@ async def test_exec_ws_with_heartbeat(self):
},
heartbeat=30
)

def test_parse_error_data_success(self):
error_data = '{"metadata":{},"status":"Success"}'
return_code = WsApiClient.parse_error_data(error_data)
self.assertEqual(return_code, 0)

def test_parse_error_data_failure(self):
error_data = (
'{"metadata":{},"status":"Failure",'
'"message":"command terminated with non-zero exit code",'
'"reason":"NonZeroExitCode",'
'"details":{"causes":[{"reason":"ExitCode","message":"1"}]}}')
return_code = WsApiClient.parse_error_data(error_data)
self.assertEqual(return_code, 1)

0 comments on commit a37f664

Please sign in to comment.