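"""Integration tests validating a Charmed Kubernetes (CDK) deployment.

These pytest/asyncio tests drive a live Juju model: they run commands on the
kubernetes-master and kubernetes-worker units to validate cluster behaviour,
including auth file propagation, RBAC, snap versions, the dashboard, network
policies, GPU support, extra service arguments, SANs, audit logging, and more.
"""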
import asyncio
import json
import os
import requests
import traceback
import yaml
import re
import random
import pytest
from asyncio_extras import async_contextmanager
from async_generator import yield_
from datetime import datetime
from logger import log
from pprint import pformat
from tempfile import NamedTemporaryFile
from utils import (
timeout_for_current_task,
retry_async_with_timeout,
scp_to,
scp_from,
disable_source_dest_check,
verify_deleted,
verify_ready,
)
class MicrobotError(Exception):
pass
class AuditTimestampError(Exception):
pass
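# Helpers: wait_for_process / wait_for_not_process poll the kube-apiserver
# command line on the master units (via ps) until a given argument appears or
# disappears.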
async def wait_for_process(model, arg):
""" Retry api_server_with_arg <checks> times with a 5 sec interval """
checks = 60
ready = False
while not ready:
checks -= 1
if await api_server_with_arg(model, arg):
return
else:
if checks <= 0:
                assert False, "timed out waiting for apiserver argument {}".format(arg)
await asyncio.sleep(0.5)
async def wait_for_not_process(model, arg):
""" Retry api_server_with_arg <checks> times with a 5 sec interval """
checks = 60
ready = False
while not ready:
checks -= 1
if await api_server_with_arg(model, arg):
if checks <= 0:
                assert False, "apiserver still running with argument {} after timeout".format(arg)
await asyncio.sleep(0.5)
else:
return
async def api_server_with_arg(model, argument):
master = model.applications["kubernetes-master"]
for unit in master.units:
search = "ps -ef | grep {} | grep apiserver".format(argument)
action = await unit.run(search)
assert action.status == "completed"
raw_output = action.data["results"]["Stdout"]
if len(raw_output.splitlines()) != 1:
return False
return True
async def run_until_success(unit, cmd, timeout_insec=None):
while True:
action = await unit.run(cmd, timeout=timeout_insec)
if (
action.status == "completed"
and "results" in action.data
and action.data["results"]["Code"] == "0"
):
return action.data["results"]["Stdout"]
else:
log(
"Action " + action.status + ". Command failed on unit " + unit.entity_id
)
log("cmd: " + cmd)
if "results" in action.data:
log("code: " + action.data["results"]["Code"])
log("stdout:\n" + action.data["results"]["Stdout"].strip())
log("stderr:\n" + action.data["results"]["Stderr"].strip())
log("Will retry...")
await asyncio.sleep(0.5)
async def get_last_audit_entry_date(unit):
cmd = "cat /root/cdk/audit/audit.log | tail -n 1"
raw = await run_until_success(unit, cmd)
data = json.loads(raw)
if "timestamp" in data:
timestamp = data["timestamp"]
time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
elif "requestReceivedTimestamp" in data:
timestamp = data["requestReceivedTimestamp"]
time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
else:
raise AuditTimestampError("Unable to find timestamp in {}".format(data))
return time
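# Context manager that watches unit agent status messages so callers can
# assert that a given hook (e.g. config-changed) runs to completion on every
# unit of an application.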
@async_contextmanager
async def assert_hook_occurs_on_all_units(app, hook):
started_units = set()
finished_units = set()
for unit in app.units:
@unit.on_change
async def on_change(delta, old, new, model):
unit_id = new.entity_id
if new.agent_status_message == "running " + hook + " hook":
started_units.add(unit_id)
if new.agent_status == "idle" and unit_id in started_units:
finished_units.add(unit_id)
await yield_()
log("assert_hook_occurs_on_all_units: waiting for " + hook + " hook")
while len(finished_units) < len(app.units):
await asyncio.sleep(0.5)
async def set_config_and_wait(app, config, tools):
current_config = await app.get_config()
if all(config[key] == current_config[key]["value"] for key in config):
log("set_config_and_wait: new config identical to current, skipping")
return
async with assert_hook_occurs_on_all_units(app, "config-changed"):
await app.set_config(config)
await tools.juju_wait()
async def reset_audit_config(master_app, tools):
config = await master_app.get_config()
await set_config_and_wait(
master_app,
{
"audit-policy": config["audit-policy"]["default"],
"audit-webhook-config": config["audit-webhook-config"]["default"],
"api-extra-args": config["api-extra-args"]["default"],
},
tools,
)
# START TESTS
@pytest.mark.asyncio
async def test_auth_file_propagation(model):
"""Validate that changes to /root/cdk/basic_auth.csv on the leader master
unit are propagated to the other master units.
"""
# Get a leader and non-leader unit to test with
masters = model.applications["kubernetes-master"]
for master in masters.units:
if await master.is_leader_from_status():
leader = master
else:
follower = master
# Change basic_auth.csv on the leader, and get its md5sum
leader_md5 = await run_until_success(
leader,
"echo test,test,test >> /root/cdk/basic_auth.csv && "
"md5sum /root/cdk/basic_auth.csv",
)
# Check that md5sum on non-leader matches
await run_until_success(
follower, 'md5sum /root/cdk/basic_auth.csv | grep "{}"'.format(leader_md5)
)
# Cleanup (remove the line we added)
await run_until_success(leader, "sed -i '$d' /root/cdk/basic_auth.csv")
@pytest.mark.asyncio
async def test_status_messages(model, tools):
""" Validate that the status messages are correct. """
await tools.juju_wait()
expected_messages = {
"kubernetes-master": "Kubernetes master running.",
"kubernetes-worker": "Kubernetes worker running.",
}
for app, message in expected_messages.items():
for unit in model.applications[app].units:
assert unit.workload_status_message == message
@pytest.mark.asyncio
async def test_snap_versions(model):
""" Validate that the installed snap versions are consistent with channel
config on the charms.
"""
snaps_to_validate = {
"kubernetes-master": [
"kubectl",
"kube-apiserver",
"kube-controller-manager",
"kube-scheduler",
"cdk-addons",
],
"kubernetes-worker": ["kubectl", "kubelet", "kube-proxy"],
}
for app_name, snaps in snaps_to_validate.items():
app = model.applications[app_name]
config = await app.get_config()
channel = config["channel"]["value"]
if "/" not in channel:
message = "validate_snap_versions: skipping %s, channel=%s"
message = message % (app_name, channel)
log(message)
continue
track = channel.split("/")[0]
for unit in app.units:
action = await unit.run("snap list")
assert action.status == "completed"
raw_output = action.data["results"]["Stdout"]
# Example of the `snap list` output format we're expecting:
# Name Version Rev Developer Notes
# conjure-up 2.1.5 352 canonical classic
# core 16-2 1689 canonical -
# kubectl 1.6.2 27 canonical classic
lines = raw_output.splitlines()[1:]
snap_versions = dict(line.split()[:2] for line in lines)
for snap in snaps:
snap_version = snap_versions[snap]
if not snap_version.startswith(track + "."):
log(
"Snap {} is version {} and not {}".format(
snap, snap_version, track + "."
)
)
assert snap_version.startswith(track + ".")
@pytest.mark.asyncio
async def test_rbac(model):
""" Validate RBAC is actually on """
app = model.applications["kubernetes-master"]
await app.set_config({"authorization-mode": "RBAC,Node"})
await wait_for_process(model, "RBAC")
cmd = "/snap/bin/kubectl --kubeconfig /root/cdk/kubeconfig get clusterroles"
worker = model.applications["kubernetes-worker"].units[0]
output = await worker.run(cmd)
assert output.status == "completed"
assert "forbidden" in output.data["results"]["Stderr"].lower()
await app.set_config({"authorization-mode": "AlwaysAllow"})
await wait_for_process(model, "AlwaysAllow")
output = await worker.run(cmd)
assert output.status == "completed"
assert "forbidden" not in output.data["results"]["Stderr"]
@pytest.mark.asyncio
async def test_rbac_flag(model):
""" Switch between auth modes and check the apiserver follows """
master = model.applications["kubernetes-master"]
await master.set_config({"authorization-mode": "RBAC"})
await wait_for_process(model, "RBAC")
await master.set_config({"authorization-mode": "AlwaysAllow"})
await wait_for_process(model, "AlwaysAllow")
@pytest.mark.asyncio
@pytest.mark.skip_arch(["s390x", "arm64", "aarch64"])
@pytest.mark.skip("Will retry with different dns resolver")
async def test_microbot(model, tools):
""" Validate the microbot action """
unit = model.applications["kubernetes-worker"].units[0]
action = await unit.run_action("microbot", delete=True)
await action.wait()
action = await unit.run_action("microbot", replicas=3)
await action.wait()
assert action.status == "completed"
for i in range(60):
try:
resp = await tools.requests.get(
"http://" + action.data["results"]["address"]
)
if resp.status_code == 200:
return
except requests.exceptions.ConnectionError:
log(
"Caught connection error attempting to hit xip.io, "
"retrying. Error follows:"
)
            log(traceback.format_exc())
await asyncio.sleep(0.5)
raise MicrobotError("Microbot failed to start.")
@pytest.mark.asyncio
@pytest.mark.skip_arch(["s390x", "arm64", "aarch64"])
async def test_dashboard(model, log_dir, tools):
""" Validate that the dashboard is operational """
unit = model.applications["kubernetes-master"].units[0]
with NamedTemporaryFile() as f:
await scp_from(unit, "config", f.name, tools.controller_name, tools.connection)
with open(f.name, "r") as stream:
config = yaml.safe_load(stream)
url = config["clusters"][0]["cluster"]["server"]
user = config["users"][0]["user"]["username"]
password = config["users"][0]["user"]["password"]
auth = tools.requests.auth.HTTPBasicAuth(user, password)
resp = await tools.requests.get(url, auth=auth, verify=False)
assert resp.status_code == 200
# get k8s version
app_config = await model.applications["kubernetes-master"].get_config()
channel = app_config["channel"]["value"]
    # If we cannot detect the version from the channel (e.g. edge, stable),
    # default to the latest dashboard URL format.
k8s_version = (2, 0)
if "/" in channel:
version_string = channel.split("/")[0]
k8s_version = tuple(int(q) for q in re.findall("[0-9]+", version_string)[:2])
    # The dashboard will present a login form at this URL.
if k8s_version < (1, 8):
url = "%s/api/v1/namespaces/kube-system/services/kubernetes-dashboard/proxy/#!/login"
else:
url = "%s/api/v1/namespaces/kube-system/services/https:kubernetes-dashboard:/proxy/#!/login"
url %= config["clusters"][0]["cluster"]["server"]
log("Waiting for dashboard to stabilize...")
async def dashboard_present(url):
resp = await tools.requests.get(url, auth=auth, verify=False)
if resp.status_code == 200 and "Dashboard" in resp.text:
return True
return False
await retry_async_with_timeout(
verify_ready,
(unit, "po", ["kubernetes-dashboard"], "-n kube-system"),
timeout_msg="Unable to find kubernetes dashboard before timeout",
)
await retry_async_with_timeout(
dashboard_present, (url,), timeout_msg="Unable to reach dashboard"
)
@pytest.mark.asyncio
async def test_kubelet_anonymous_auth_disabled(model, tools):
""" Validate that kubelet has anonymous auth disabled """
async def validate_unit(unit):
await unit.run("open-port 10250")
address = unit.public_address
url = "https://%s:10250/pods/" % address
response = await tools.requests.get(url, verify=False)
assert response.status_code == 401 # Unauthorized
units = model.applications["kubernetes-worker"].units
await asyncio.gather(*(validate_unit(unit) for unit in units))
@pytest.mark.asyncio
@pytest.mark.skip_apps(["canal", "calico", "tigera-secure-ee"])
async def test_network_policies(model, tools):
""" Apply network policy and use two busyboxes to validate it. """
here = os.path.dirname(os.path.abspath(__file__))
unit = model.applications["kubernetes-master"].units[0]
# Clean-up namespace from any previous runs.
cmd = await unit.run("/snap/bin/kubectl delete ns netpolicy")
assert cmd.status == "completed"
log("Waiting for pods to finish terminating...")
await retry_async_with_timeout(
verify_deleted,
(unit, "ns", "netpolicy"),
timeout_msg="Unable to remove the namespace netpolicy",
)
# Move manifests to the master
await scp_to(
os.path.join(here, "templates", "netpolicy-test.yaml"),
unit,
"netpolicy-test.yaml",
tools.controller_name,
tools.connection,
)
await scp_to(
os.path.join(here, "templates", "restrict.yaml"),
unit,
"restrict.yaml",
tools.controller_name,
tools.connection,
)
cmd = await unit.run("/snap/bin/kubectl create -f /home/ubuntu/netpolicy-test.yaml")
    if cmd.results["Code"] != "0":
log("Failed to create netpolicy test!")
log(cmd.results)
assert cmd.status == "completed" and cmd.results["Code"] == "0"
log("Waiting for pods to show up...")
await retry_async_with_timeout(
verify_ready,
(unit, "po", ["bboxgood", "bboxbad"], "-n netpolicy"),
timeout_msg="Unable to create pods for network policy test",
)
# Try to get to nginx from both busyboxes.
# We expect no failures since we have not applied the policy yet.
async def get_to_networkpolicy_service():
log("Reaching out to nginx.netpolicy with no restrictions")
query_from_bad = "/snap/bin/kubectl exec bboxbad -n netpolicy -- wget --timeout=30 nginx.netpolicy"
query_from_good = "/snap/bin/kubectl exec bboxgood -n netpolicy -- wget --timeout=30 nginx.netpolicy"
cmd_good = await unit.run(query_from_good)
cmd_bad = await unit.run(query_from_bad)
if (
cmd_good.status == "completed"
and cmd_bad.status == "completed"
and "index.html" in cmd_good.data["results"]["Stderr"]
and "index.html" in cmd_bad.data["results"]["Stderr"]
):
return True
return False
await retry_async_with_timeout(
get_to_networkpolicy_service,
(),
timeout_msg="Failed to query nginx.netpolicy even before applying restrictions",
)
# Apply network policy and retry getting to nginx.
# This time the policy should block us.
cmd = await unit.run("/snap/bin/kubectl create -f /home/ubuntu/restrict.yaml")
assert cmd.status == "completed"
await asyncio.sleep(10)
async def get_to_restricted_networkpolicy_service():
log("Reaching out to nginx.netpolicy with restrictions")
query_from_bad = "/snap/bin/kubectl exec bboxbad -n netpolicy -- wget --timeout=30 nginx.netpolicy -O foo.html"
query_from_good = "/snap/bin/kubectl exec bboxgood -n netpolicy -- wget --timeout=30 nginx.netpolicy -O foo.html"
cmd_good = await unit.run(query_from_good)
cmd_bad = await unit.run(query_from_bad)
if (
cmd_good.status == "completed"
and cmd_bad.status == "completed"
and "foo.html" in cmd_good.data["results"]["Stderr"]
and "timed out" in cmd_bad.data["results"]["Stderr"]
):
return True
return False
await retry_async_with_timeout(
get_to_restricted_networkpolicy_service,
(),
timeout_msg="Failed query restricted nginx.netpolicy",
)
    # Clean up the namespace for future runs.
cmd = await unit.run("/snap/bin/kubectl delete ns netpolicy")
assert cmd.status == "completed"
@pytest.mark.asyncio
@pytest.mark.slow
async def test_worker_master_removal(model, tools):
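    """Remove a worker and the leader master from a multi-unit cluster, wait
    for the model to settle, then add units back to restore the cluster."""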
# Add a second master
masters = model.applications["kubernetes-master"]
unit_count = len(masters.units)
if unit_count < 2:
await masters.add_unit(1)
await disable_source_dest_check(tools.model_name)
# Add a second worker
workers = model.applications["kubernetes-worker"]
unit_count = len(workers.units)
if unit_count < 2:
await workers.add_unit(1)
await disable_source_dest_check(tools.model_name)
await tools.juju_wait()
# Remove a worker to see how the masters handle it
unit_count = len(workers.units)
await workers.units[0].remove()
await tools.juju_wait()
while len(workers.units) == unit_count:
await asyncio.sleep(15)
log("Waiting for worker removal. (%d/%d)" % (len(workers.units), unit_count))
# Remove the master leader
unit_count = len(masters.units)
for master in masters.units:
if await master.is_leader_from_status():
await master.remove()
await tools.juju_wait()
while len(masters.units) == unit_count:
await asyncio.sleep(15)
log("Waiting for master removal. (%d/%d)" % (len(masters.units), unit_count))
# Try and restore the cluster state
# Tests following this were passing, but they actually
# would fail in a multi-master situation
await workers.add_unit(1)
await masters.add_unit(1)
await disable_source_dest_check(tools.model_name)
log("Waiting for new master and worker.")
await tools.juju_wait()
@pytest.mark.asyncio
async def test_gpu_support(model, tools):
""" Test gpu support. Should be disabled if hardware
is not detected and functional if hardware is fine"""
# See if the workers have nvidia
workers = model.applications["kubernetes-worker"]
action = await workers.units[0].run("lspci -nnk")
    nvidia = "nvidia" in action.results["Stdout"].lower()
master_unit = model.applications["kubernetes-master"].units[0]
if not nvidia:
# nvidia should not be running
await retry_async_with_timeout(
verify_deleted,
(master_unit, "ds", "nvidia-device-plugin-daemonset", "-n kube-system"),
timeout_msg="nvidia-device-plugin-daemonset is setup without nvidia hardware",
)
else:
# nvidia should be running
await retry_async_with_timeout(
verify_ready,
(master_unit, "ds", ["nvidia-device-plugin-daemonset"], "-n kube-system"),
timeout_msg="nvidia-device-plugin-daemonset not running",
)
        # Do an addition on the GPU just to be sure.
# First clean any previous runs
here = os.path.dirname(os.path.abspath(__file__))
await scp_to(
os.path.join(here, "templates", "cuda-add.yaml"),
master_unit,
"cuda-add.yaml",
tools.controller_name,
tools.connection,
)
await master_unit.run("/snap/bin/kubectl delete -f /home/ubuntu/cuda-add.yaml")
await retry_async_with_timeout(
verify_deleted,
(master_unit, "po", "cuda-vector-add", "-n default"),
timeout_msg="Cleaning of cuda-vector-add pod failed",
)
# Run the cuda addition
cmd = await master_unit.run(
"/snap/bin/kubectl create -f /home/ubuntu/cuda-add.yaml"
)
        if cmd.results["Code"] != "0":
log("Failed to create cuda-add pod test!")
log(cmd.results)
assert False
async def cuda_test(master):
            action = await master.run("/snap/bin/kubectl logs cuda-vector-add")
log(action.results["Stdout"])
return action.results["Stdout"].count("Test PASSED") > 0
await retry_async_with_timeout(
cuda_test,
(master_unit,),
timeout_msg="Cuda test did not pass",
timeout_insec=1200,
)
@pytest.mark.asyncio
async def test_extra_args(model, tools):
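    """Set the *-extra-args config options on master and worker, verify the
    extra arguments appear on the running service processes, then restore the
    original configuration."""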
async def get_filtered_service_args(app, service):
results = []
for unit in app.units:
while True:
action = await unit.run("pgrep -a " + service)
assert action.status == "completed"
if action.data["results"]["Code"] == "0":
raw_output = action.data["results"]["Stdout"]
arg_string = raw_output.partition(" ")[2].partition(" ")[2]
args = {arg.strip() for arg in arg_string.split("--")[1:]}
results.append(args)
break
await asyncio.sleep(0.5)
# charms sometimes choose the master randomly, filter out the master
# arg so we can do comparisons reliably
results = [
{arg for arg in args if not arg.startswith("master=")} for args in results
]
return results
async def run_extra_args_test(app_name, new_config, expected_args):
app = model.applications[app_name]
original_config = await app.get_config()
original_args = {}
for service in expected_args:
original_args[service] = await get_filtered_service_args(app, service)
await app.set_config(new_config)
await tools.juju_wait()
with timeout_for_current_task(600):
try:
for service, expected_service_args in expected_args.items():
while True:
args_per_unit = await get_filtered_service_args(app, service)
if all(expected_service_args <= args for args in args_per_unit):
break
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log("Dumping locals:\n" + pformat(locals()))
raise
filtered_original_config = {
key: original_config[key]["value"] for key in new_config
}
await app.set_config(filtered_original_config)
await tools.juju_wait()
with timeout_for_current_task(600):
try:
for service, original_service_args in original_args.items():
while True:
new_args = await get_filtered_service_args(app, service)
if new_args == original_service_args:
break
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log("Dumping locals:\n" + pformat(locals()))
raise
master_task = run_extra_args_test(
app_name="kubernetes-master",
new_config={
"api-extra-args": " ".join(
[
"min-request-timeout=314", # int arg, overrides a charm default
"watch-cache", # bool arg, implied true
"profiling=false", # bool arg, explicit false
]
),
"controller-manager-extra-args": " ".join(
[
"v=3", # int arg, overrides a charm default
"profiling", # bool arg, implied true
"contention-profiling=false", # bool arg, explicit false
]
),
"scheduler-extra-args": " ".join(
[
"v=3", # int arg, overrides a charm default
"profiling", # bool arg, implied true
"contention-profiling=false", # bool arg, explicit false
]
),
},
expected_args={
"kube-apiserver": {
"min-request-timeout=314",
"watch-cache",
"profiling=false",
},
"kube-controller": {"v=3", "profiling", "contention-profiling=false"},
"kube-scheduler": {"v=3", "profiling", "contention-profiling=false"},
},
)
worker_task = run_extra_args_test(
app_name="kubernetes-worker",
new_config={
"kubelet-extra-args": " ".join(
[
"v=1", # int arg, overrides a charm default
"enable-server", # bool arg, implied true
"alsologtostderr=false", # bool arg, explicit false
]
),
"proxy-extra-args": " ".join(
[
"v=1", # int arg, overrides a charm default
"profiling", # bool arg, implied true
"alsologtostderr=false", # bool arg, explicit false
]
),
},
expected_args={
"kubelet": {"v=1", "enable-server", "alsologtostderr=false"},
"kube-proxy": {"v=1", "profiling", "alsologtostderr=false"},
},
)
await asyncio.gather(master_task, worker_task)
@pytest.mark.asyncio
async def test_kubelet_extra_config(model, tools):
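    """Set kubelet-extra-config and verify the values are merged into the
    generated kubelet config.yaml and reflected in the Node objects."""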
worker_app = model.applications["kubernetes-worker"]
k8s_version_str = worker_app.data["workload-version"]
k8s_minor_version = tuple(int(i) for i in k8s_version_str.split(".")[:2])
if k8s_minor_version < (1, 10):
log("skipping, k8s version v" + k8s_version_str)
return
config = await worker_app.get_config()
old_extra_config = config["kubelet-extra-config"]["value"]
# set the new config
new_extra_config = yaml.dump(
{
# maxPods, because it can be observed in the Node object
"maxPods": 111,
# evictionHard/memory.available, because it has a nested element
"evictionHard": {"memory.available": "200Mi"},
# authentication/webhook/enabled, so we can confirm that other
# items in the authentication section are preserved
"authentication": {"webhook": {"enabled": False}},
}
)
await set_config_and_wait(
worker_app, {"kubelet-extra-config": new_extra_config}, tools
)
# wait for and validate new maxPods value
log("waiting for nodes to show new pod capacity")
master_unit = model.applications["kubernetes-master"].units[0]
while True:
cmd = "/snap/bin/kubectl -o yaml get node"
action = await master_unit.run(str(cmd))
if action.status == "completed" and action.results["Code"] == "0":
nodes = yaml.safe_load(action.results["Stdout"])
all_nodes_updated = all(
[node["status"]["capacity"]["pods"] == "111" for node in nodes["items"]]
)
if all_nodes_updated:
break
await asyncio.sleep(0.5)
# validate config.yaml on each worker
log("validating generated config.yaml files")
for worker_unit in worker_app.units:
cmd = "cat /root/cdk/kubelet/config.yaml"
action = await worker_unit.run(cmd)
if action.status == "completed" and action.results["Code"] == "0":
config = yaml.safe_load(action.results["Stdout"])
assert config["evictionHard"]["memory.available"] == "200Mi"
assert config["authentication"]["webhook"]["enabled"] is False
assert "anonymous" in config["authentication"]
assert "x509" in config["authentication"]
# clean up
await set_config_and_wait(
worker_app, {"kubelet-extra-config": old_extra_config}, tools
)
@pytest.mark.asyncio
async def test_sans(model):
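    """Add a domain to extra_sans on the master (and load balancer, if
    present), verify it appears in the server certificates, then remove it
    and restore the original configuration."""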
example_domain = "santest.example.com"
app = model.applications["kubernetes-master"]
original_config = await app.get_config()
lb = None
original_lb_config = None
if "kubeapi-load-balancer" in model.applications:
lb = model.applications["kubeapi-load-balancer"]
original_lb_config = await lb.get_config()
async def get_server_certs():
results = []
for unit in app.units:
action = await unit.run(
"openssl s_client -connect 127.0.0.1:6443 </dev/null 2>/dev/null | openssl x509 -text"
)
assert action.status == "completed"
raw_output = action.data["results"]["Stdout"]
results.append(raw_output)
# if there is a load balancer, ask it as well
if lb is not None:
for unit in lb.units:
action = await unit.run(
"openssl s_client -connect 127.0.0.1:443 </dev/null 2>/dev/null | openssl x509 -text"
)
assert action.status == "completed"
raw_output = action.data["results"]["Stdout"]
results.append(raw_output)
return results
async def all_certs_removed():
certs = await get_server_certs()
if any(example_domain in cert for cert in certs):
return False
return True
async def all_certs_in_place():
certs = await get_server_certs()
if not all(example_domain in cert for cert in certs):
return False
return True
# add san to extra san list
await app.set_config({"extra_sans": example_domain})
if lb is not None:
await lb.set_config({"extra_sans": example_domain})
# wait for server certs to update
await retry_async_with_timeout(
all_certs_in_place,
(),
timeout_msg="extra sans config did not propagate to server certs",
)
# now remove it
await app.set_config({"extra_sans": ""})
if lb is not None:
await lb.set_config({"extra_sans": ""})
# verify it went away
await retry_async_with_timeout(
all_certs_removed,
(),
timeout_msg="extra sans config did not propagate to server certs",
)
# reset back to what they had before
await app.set_config({"extra_sans": original_config["extra_sans"]["value"]})
if lb is not None and original_lb_config is not None:
await lb.set_config({"extra_sans": original_lb_config["extra_sans"]["value"]})
@pytest.mark.asyncio
async def test_audit_default_config(model, tools):
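    """With the default audit configuration, verify API requests are logged
    and the audit log directory stays under roughly 1 GB."""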
app = model.applications["kubernetes-master"]
# Ensure we're using default configuration
await reset_audit_config(app, tools)
# Verify new entries are being logged
unit = app.units[0]
before_date = await get_last_audit_entry_date(unit)
await asyncio.sleep(0.5)
await run_until_success(unit, "/snap/bin/kubectl get po")
after_date = await get_last_audit_entry_date(unit)
assert after_date > before_date
# Verify total log size is less than 1 GB
raw = await run_until_success(unit, "du -bs /root/cdk/audit")
size_in_bytes = int(raw.split()[0])
log("Audit log size in bytes: %d" % size_in_bytes)
max_size_in_bytes = 1000 * 1000 * 1000 * 1.01 # 1 GB, plus some tolerance
assert size_in_bytes <= max_size_in_bytes
# Clean up
await reset_audit_config(app, tools)
@pytest.mark.asyncio
async def test_audit_empty_policy(model, tools):
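    """With an empty audit-policy, verify no new audit entries are logged."""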
app = model.applications["kubernetes-master"]
# Set audit-policy to blank
await reset_audit_config(app, tools)
await set_config_and_wait(app, {"audit-policy": ""}, tools)
# Verify no entries are being logged
unit = app.units[0]
before_date = await get_last_audit_entry_date(unit)
await asyncio.sleep(0.5)
await run_until_success(unit, "/snap/bin/kubectl get po")
after_date = await get_last_audit_entry_date(unit)
assert after_date == before_date
# Clean up
await reset_audit_config(app, tools)
@pytest.mark.asyncio
async def test_audit_custom_policy(model, tools):
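    """With a custom audit-policy, verify that only requests to the configured
    namespace are logged."""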
app = model.applications["kubernetes-master"]
# Set a custom policy that only logs requests to a special namespace
namespace = "validate-audit-custom-policy"
policy = {
"apiVersion": "audit.k8s.io/v1beta1",
"kind": "Policy",
"rules": [{"level": "Metadata", "namespaces": [namespace]}, {"level": "None"}],
}
await reset_audit_config(app, tools)
await set_config_and_wait(app, {"audit-policy": yaml.dump(policy)}, tools)
# Verify no entries are being logged
unit = app.units[0]
before_date = await get_last_audit_entry_date(unit)
await asyncio.sleep(0.5)
await run_until_success(unit, "/snap/bin/kubectl get po")
after_date = await get_last_audit_entry_date(unit)
assert after_date == before_date
# Create our special namespace
namespace_definition = {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {"name": namespace},
}
path = "/tmp/validate_audit_custom_policy-namespace.yaml"
with NamedTemporaryFile("w") as f:
json.dump(namespace_definition, f)
f.flush()
await scp_to(f.name, unit, path, tools.controller_name, tools.connection)
await run_until_success(unit, "/snap/bin/kubectl create -f " + path)
# Verify our very special request gets logged
before_date = await get_last_audit_entry_date(unit)
await asyncio.sleep(0.5)
await run_until_success(unit, "/snap/bin/kubectl get po -n " + namespace)
after_date = await get_last_audit_entry_date(unit)
assert after_date > before_date
# Clean up
await run_until_success(unit, "/snap/bin/kubectl delete ns " + namespace)
await reset_audit_config(app, tools)
@pytest.mark.asyncio
async def test_audit_webhook(model, tools):
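    """Point audit-webhook-config at an nginx pod and verify audit events are
    delivered to it."""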
app = model.applications["kubernetes-master"]
unit = app.units[0]
async def get_webhook_server_entry_count():
cmd = "/snap/bin/kubectl logs test-audit-webhook"
raw = await run_until_success(unit, cmd)
lines = raw.splitlines()
count = len(lines)
return count
# Deploy an nginx target for webhook
cmd = "/snap/bin/kubectl delete --ignore-not-found po test-audit-webhook"
await run_until_success(unit, cmd)
cmd = "/snap/bin/kubectl run test-audit-webhook --image nginx:1.15.0-alpine --restart Never"
await run_until_success(unit, cmd)
nginx_ip = None
while nginx_ip is None:
cmd = "/snap/bin/kubectl get po -o json test-audit-webhook"
raw = await run_until_success(unit, cmd)
pod = json.loads(raw)
nginx_ip = pod["status"].get("podIP", None)
# Set audit config with webhook enabled
audit_webhook_config = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [
{"name": "test-audit-webhook", "cluster": {"server": "http://" + nginx_ip}}
],
"contexts": [
{"name": "test-audit-webhook", "context": {"cluster": "test-audit-webhook"}}
],
"current-context": "test-audit-webhook",
}
await reset_audit_config(app, tools)
await set_config_and_wait(
app,
{
"audit-webhook-config": yaml.dump(audit_webhook_config),
"api-extra-args": "audit-webhook-mode=blocking",
},
tools,
)
# Ensure webhook log is growing
before_count = await get_webhook_server_entry_count()
await run_until_success(unit, "/snap/bin/kubectl get po")
after_count = await get_webhook_server_entry_count()
assert after_count > before_count
# Clean up
await reset_audit_config(app, tools)