-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_kubeflow.py
208 lines (160 loc) · 5.67 KB
/
test_kubeflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import asyncio
import re
from subprocess import check_output
import pytest
import requests
from .logger import log_calls, log_calls_async
from .utils import asyncify
# Manifest passed to `kubectl_create` by validate_tf_dashboard.
TFJOB = "integration/k8s-jobs/mnist.yaml"
# Manifest passed to `kubectl_create` by validate_seldon.
SELDONJOB = "integration/k8s-jobs/serve-simple-v1alpha2.yml"
# Assigned to `Session.auth` in get_session; presumably a (username, password)
# pair for the deployment under test -- confirm against the bundle config.
AUTH = ("admin", "foobar")
def get_session():
    """Builds a requests session that sends AUTH and checks every response.

    A response hook calls ``raise_for_status()`` on each response received
    through the returned session, so callers do not have to check status
    codes themselves.
    """

    def _raise_on_error(response, *args, **kwargs):
        # Returns None on success, which leaves the response untouched.
        return response.raise_for_status()

    session = requests.Session()
    session.auth = AUTH
    session.hooks = {"response": _raise_on_error}
    return session
def get_ambassador_ip():
    """Returns the Ambassador address as an ``<ip>.xip.io`` hostname.

    The IP is read from ``../PUB_IP`` (relative to the current working
    directory) and stripped of surrounding whitespace before the suffix is
    appended.
    """
    with open("../PUB_IP") as ip_file:
        address = ip_file.read().strip()
    return f"{address}.xip.io"
def kubectl_create(path: str):
    """Creates a Kubernetes resource from the manifest at ``path``.

    Runs ``juju kubectl -- create -f <path>``; the juju kubectl plugin
    introspects the model and divines the proper kubeconfig.

    Returns:
        The command's standard output as bytes, with leading and trailing
        whitespace removed.
    """
    command = ["juju", "kubectl", "--", "create", "-f", path]
    raw_output = check_output(command)
    return raw_output.strip()
@pytest.mark.asyncio
async def test_validate(model, log_dir):
    """Validates a Kubeflow deployment.

    Unit statuses are checked first, synchronously; the endpoint validators
    are then awaited together. ``log_dir`` is not read in this body.
    """
    # Start with what juju reports about the units
    validate_statuses(model)

    # Then run every remaining check concurrently
    endpoint_checks = [
        validate_ambassador(),
        validate_jupyterhub_api(),
        validate_seldon(),
        validate_tf_dashboard(),
    ]
    await asyncio.gather(*endpoint_checks)
@log_calls
def validate_statuses(model):
    """Validates that a known set of units have booted up into the correct state.

    Args:
        model: Object whose ``units`` mapping goes from unit name to a unit
            exposing ``agent_status``, ``workload_status`` and
            ``workload_status_message``.

    Raises:
        AssertionError: If the set of unit names differs from the expected
            set, or any unit is not idle/active with an empty or "ready"
            workload status message. The message names the offending unit(s).
    """
    expected_units = {
        "ambassador-auth/0",
        "ambassador/0",
        "argo-controller/0",
        "argo-ui/0",
        "jupyter-controller/0",
        "jupyter-web/0",
        "jupyterhub/0",
        "katib-controller/0",
        "katib-manager/0",
        "katib-db/0",
        "katib-ui/0",
        "mariadb/0",
        "metacontroller/0",
        "minio/0",
        "modeldb-backend/0",
        "modeldb-store/0",
        "modeldb-ui/0",
        "pipelines-api/0",
        "pipelines-dashboard/0",
        "pipelines-persistence/0",
        "pipelines-scheduledworkflow/0",
        "pipelines-ui/0",
        "pipelines-viewer/0",
        "pytorch-operator/0",
        "redis/0",
        "seldon-api-frontend/0",
        "seldon-cluster-manager/0",
        "tensorboard/0",
        "tf-job-dashboard/0",
        "tf-job-operator/0",
    }

    actual_units = set(model.units.keys())
    # Spell out both directions of the difference so a failure is actionable.
    assert actual_units == expected_units, (
        f"missing units: {sorted(expected_units - actual_units)}; "
        f"unexpected units: {sorted(actual_units - expected_units)}"
    )

    for name, unit in model.units.items():
        # Each message carries the unit name; a bare comparison would only
        # show the mismatching value, not which unit produced it.
        assert (
            unit.agent_status == "idle"
        ), f"{name}: agent status is {unit.agent_status!r}"
        assert (
            unit.workload_status == "active"
        ), f"{name}: workload status is {unit.workload_status!r}"
        assert unit.workload_status_message in (
            "",
            "ready",
        ), f"{name}: workload status message is {unit.workload_status_message!r}"
@log_calls_async
async def validate_ambassador():
    """Validates that the ambassador is up and responding.

    Requests the readiness and liveness endpoints in turn and asserts that
    each response body starts with the expected message.
    """
    host = get_ambassador_ip()
    session = get_session()

    # (endpoint path, expected prefix of the response body)
    expectations = (
        ("/ambassador/v0/check_ready", b"ambassador readiness check OK"),
        ("/ambassador/v0/check_alive", b"ambassador liveness check OK"),
    )
    for path, expected_prefix in expectations:
        reply = await asyncify(session.get)(f"http://{host}{path}")
        assert reply.content.startswith(expected_prefix)
@log_calls_async
async def validate_jupyterhub_api():
    """Validates that JupyterHub is up and responding via Ambassador.

    Fetches ``/hub/api/`` and asserts that the JSON body has exactly one
    key, ``version``.
    """
    host = get_ambassador_ip()
    session = get_session()

    fetch = asyncify(session.get)
    reply = await fetch(f"http://{host}/hub/api/")
    payload = reply.json()
    assert list(payload.keys()) == ["version"]
@log_calls_async
async def validate_tf_dashboard():
    """Validates that TF Jobs dashboard is up and responding via Ambassador.

    Creates the TFJob described by ``TFJOB``, then polls the dashboard's job
    list through Ambassador until the first listed job reports the expected
    replica specs, conditions and replica statuses.

    Fails the test via ``pytest.fail`` if no poll matches within 60 attempts
    spaced 5 seconds apart.
    """
    ambassador_ip = get_ambassador_ip()
    sess = get_session()

    output = await asyncify(kubectl_create)(TFJOB)

    # Dots are escaped so they match only literal dots in the resource name.
    assert (
        re.match(rb"tfjob\.kubeflow\.org/mnist-test-[a-z0-9]{5} created$", output)
        is not None
    ), f"unexpected kubectl output: {output!r}"

    expected_jobs = [("PS", 1), ("Worker", 1)]
    expected_conditions = [
        ("Created", "True", "TFJobCreated"),
        ("Running", "False", "TFJobRunning"),
        ("Succeeded", "True", "TFJobSucceeded"),
    ]
    expected_statuses = {"PS": {"succeeded": 1}, "Worker": {"succeeded": 1}}

    # Wait for up to 5 minutes for the job to complete,
    # checking every 5 seconds
    for _ in range(60):
        resp = await asyncify(sess.get)(f"http://{ambassador_ip}/tfjobs/api/tfjob/")
        items = resp.json()["items"]

        try:
            # An empty list (e.g. the new job is not listed yet) is treated
            # as "not ready" and retried instead of raising IndexError.
            assert items, "no TFJobs listed yet"

            # NOTE(review): only the first listed job is inspected -- this
            # assumes no other TFJobs exist in the model; confirm.
            response = items[0]

            jobs = [
                (name, spec["replicas"])
                for name, spec in response["spec"]["tfReplicaSpecs"].items()
            ]

            # `conditions` may be null in the response, hence the `or []`.
            conditions = [
                (cond["type"], cond["status"], cond["reason"])
                for cond in response["status"]["conditions"] or []
            ]

            statuses = response["status"]["replicaStatuses"]

            assert jobs == expected_jobs
            assert conditions == expected_conditions
            assert expected_statuses == statuses
            break
        except AssertionError as err:
            print("Waiting for TFJob to complete...")
            print(err)
            await asyncio.sleep(5)
    else:
        pytest.fail("Waited too long for TFJob to succeed!")
@log_calls_async
async def validate_seldon():
    """Validates that a SeldonDeployment can be created and reached via Ambassador.

    Creates the resource described by ``SELDONJOB``, then polls the
    ``/seldon/mock-classifier/`` route up to 60 times, 5 seconds apart,
    until it answers with ``Hello World!!``. Fails the test via
    ``pytest.fail`` if it never does.
    """
    host = get_ambassador_ip()
    session = get_session()

    created = await asyncify(kubectl_create)(SELDONJOB)
    assert (
        created == b"seldondeployment.machinelearning.seldon.io/mock-classifier created"
    )

    url = f"http://{host}/seldon/mock-classifier/"
    for _ in range(60):
        try:
            reply = await asyncify(session.get)(url)
            reply.raise_for_status()
            assert reply.text == "Hello World!!"
            break
        except (AssertionError, requests.HTTPError) as problem:
            # Error statuses and wrong bodies both mean "not up yet"
            print("Waiting for SeldonDeployment to start...")
            print(problem)
            await asyncio.sleep(5)
    else:
        pytest.fail("Waited too long for SeldonDeployment to start!")