# util.py: 3083 lines (2610 loc), 91.2 KB
# Copyright (C) 2012 Canonical Ltd.
# Copyright (C) 2012, 2013 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2012 Yahoo! Inc.
#
# Author: Scott Moser <scott.moser@canonical.com>
# Author: Juerg Haefliger <juerg.haefliger@hp.com>
# Author: Joshua Harlow <harlowja@yahoo-inc.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import binascii
import contextlib
import copy as obj_copy
import email
import glob
import grp
import gzip
import hashlib
import io
import json
import logging
import os
import os.path
import platform
import pwd
import random
import re
import shlex
import shutil
import socket
import stat
import string
import subprocess
import sys
import time
from base64 import b64decode
from collections import deque
from contextlib import contextmanager, suppress
from errno import ENOENT
from functools import lru_cache
from pathlib import Path
from types import ModuleType
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Deque,
Dict,
Generator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Union,
cast,
)
from urllib import parse
import yaml
from cloudinit import (
features,
importer,
mergers,
net,
performance,
settings,
subp,
temp_utils,
type_utils,
url_helper,
version,
)
from cloudinit.log.log_util import logexc
from cloudinit.settings import CFG_BUILTIN, PER_ONCE
if TYPE_CHECKING:
# Avoid circular import
from cloudinit.helpers import Paths
_DNS_REDIRECT_IP = None
LOG = logging.getLogger(__name__)
# Helps clean up filenames to ensure they aren't FS incompatible
FN_REPLACEMENTS = {
os.sep: "_",
}
FN_ALLOWED = "_-.()" + string.digits + string.ascii_letters
TRUE_STRINGS = ("true", "1", "on", "yes")
FALSE_STRINGS = ("off", "0", "no", "false")
def kernel_version():
return tuple(map(int, os.uname().release.split(".")[:2]))
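# Illustrative example (not part of upstream util.py): with a kernel release
# string such as "5.15.0-91-generic", kernel_version() would return (5, 15).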
@lru_cache()
def get_dpkg_architecture():
"""Return the sanitized string output by `dpkg --print-architecture`.
N.B. This function is wrapped in functools.lru_cache, so repeated calls
won't shell out every time.
"""
out = subp.subp(["dpkg", "--print-architecture"], capture=True)
return out.stdout.strip()
@lru_cache()
def lsb_release():
fmap = {
"Codename": "codename",
"Description": "description",
"Distributor ID": "id",
"Release": "release",
}
data = {}
try:
out = subp.subp(["lsb_release", "--all"], capture=True)
for line in out.stdout.splitlines():
fname, _, val = line.partition(":")
if fname in fmap:
data[fmap[fname]] = val.strip()
missing = [k for k in fmap.values() if k not in data]
if len(missing):
LOG.warning(
"Missing fields in lsb_release --all output: %s",
",".join(missing),
)
except subp.ProcessExecutionError as err:
LOG.warning("Unable to get lsb_release --all: %s", err)
data = dict((v, "UNAVAILABLE") for v in fmap.values())
return data
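# Illustrative mapping (hypothetical host): on an Ubuntu 22.04 system,
# `lsb_release --all` would yield roughly
#   {"id": "Ubuntu", "release": "22.04", "codename": "jammy",
#    "description": "Ubuntu 22.04 LTS"}
# and every value falls back to "UNAVAILABLE" if the command fails.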
def decode_binary(blob: Union[str, bytes], encoding="utf-8") -> str:
# Converts a binary type into a text type using given encoding.
return blob if isinstance(blob, str) else blob.decode(encoding=encoding)
def encode_text(text: Union[str, bytes], encoding="utf-8") -> bytes:
# Converts a text string into a binary type using given encoding.
return text if isinstance(text, bytes) else text.encode(encoding=encoding)
@performance.timed("Base64 decoding")
def maybe_b64decode(data: bytes) -> bytes:
"""base64 decode data
If data is base64 encoded bytes, return b64decode(data).
If not, return data unmodified.
@param data: data as bytes. TypeError is raised if not bytes.
"""
if not isinstance(data, bytes):
raise TypeError("data is '%s', expected bytes" % type(data))
try:
return b64decode(data, validate=True)
except binascii.Error:
return data
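# Illustrative behaviour (not part of upstream util.py):
#   maybe_b64decode(b"aGVsbG8=")    # -> b"hello" (valid base64, decoded)
#   maybe_b64decode(b"not-base64")  # -> b"not-base64" (returned unchanged)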
def fully_decoded_payload(part):
# In Python 3, decoding the payload will ironically hand us a bytes object.
# 'decode' means to decode according to Content-Transfer-Encoding, not
# according to any charset in the Content-Type. So, if we end up with
# bytes, first try to decode to str via CT charset, and failing that, try
# utf-8 using surrogate escapes.
cte_payload = part.get_payload(decode=True)
if part.get_content_maintype() == "text" and isinstance(
cte_payload, bytes
):
charset = part.get_charset()
if charset and charset.input_codec:
encoding = charset.input_codec
else:
encoding = "utf-8"
return cte_payload.decode(encoding, "surrogateescape")
return cte_payload
class SeLinuxGuard:
def __init__(self, path, recursive=False):
# Late import since it might not always
# be possible to use this
self.selinux: Optional[ModuleType]
try:
self.selinux = importer.import_module("selinux")
except ImportError:
self.selinux = None
self.path = path
self.recursive = recursive
def __enter__(self):
if self.selinux and self.selinux.is_selinux_enabled():
return True
else:
return False
def __exit__(self, excp_type, excp_value, excp_traceback):
if not self.selinux or not self.selinux.is_selinux_enabled():
return
if not os.path.lexists(self.path):
return
path = os.path.realpath(self.path)
try:
stats = os.lstat(path)
self.selinux.matchpathcon(path, stats[stat.ST_MODE])
except OSError:
return
LOG.debug(
"Restoring selinux mode for %s (recursive=%s)",
path,
self.recursive,
)
try:
self.selinux.restorecon(path, recursive=self.recursive)
except OSError as e:
LOG.warning(
"restorecon failed on %s,%s maybe badness? %s",
path,
self.recursive,
e,
)
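# Typical usage (illustrative sketch): wrap a file write so its SELinux
# context is restored on exit when SELinux is present and enabled; otherwise
# the guard is a no-op.
#   with SeLinuxGuard("/etc/hostname"):
#       ...  # write or modify /etc/hostname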
class MountFailedError(Exception):
pass
class DecompressionError(Exception):
pass
def fork_cb(child_cb, *args, **kwargs):
fid = os.fork()
if fid == 0:
try:
child_cb(*args, **kwargs)
os._exit(0)
except Exception:
logexc(
LOG,
"Failed forking and calling callback %s",
type_utils.obj_name(child_cb),
)
os._exit(1)
else:
LOG.debug(
"Forked child %s who will run callback %s",
fid,
type_utils.obj_name(child_cb),
)
def is_true(val, addons=None):
if isinstance(val, (bool)):
return val is True
check_set = TRUE_STRINGS
if addons:
check_set = list(check_set) + addons
if str(val).lower().strip() in check_set:
return True
return False
def is_false(val, addons=None):
if isinstance(val, (bool)):
return val is False
check_set = FALSE_STRINGS
if addons:
check_set = list(check_set) + addons
if str(val).lower().strip() in check_set:
return True
return False
def translate_bool(val, addons=None):
if not val:
# This handles empty lists and false and
# other things that python believes are false
return False
# If its already a boolean skip
if isinstance(val, (bool)):
return val
return is_true(val, addons)
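# Illustrative truth-value handling (not part of upstream util.py):
#   is_true("Yes")        # -> True  ("yes" is in TRUE_STRINGS)
#   is_false(0)           # -> True  (str(0) == "0" is in FALSE_STRINGS)
#   translate_bool("on")  # -> True
#   translate_bool("")    # -> False (empty/falsy values short-circuit)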
def rand_str(strlen=32, select_from=None):
r = random.SystemRandom()
if not select_from:
select_from = string.ascii_letters + string.digits
return "".join([r.choice(select_from) for _x in range(strlen)])
def rand_dict_key(dictionary, postfix=None):
if not postfix:
postfix = ""
while True:
newkey = rand_str(strlen=8) + "_" + postfix
if newkey not in dictionary:
break
return newkey
def read_conf(fname, *, instance_data_file=None) -> Dict:
"""Read a yaml config with optional template, and convert to dict"""
# Avoid circular import
from cloudinit.handlers.jinja_template import (
JinjaLoadError,
JinjaSyntaxParsingException,
NotJinjaError,
render_jinja_payload_from_file,
)
try:
config_file = load_text_file(fname)
except FileNotFoundError:
return {}
if instance_data_file and os.path.exists(instance_data_file):
try:
config_file = render_jinja_payload_from_file(
config_file,
fname,
instance_data_file,
)
LOG.debug(
"Applied instance data in '%s' to "
"configuration loaded from '%s'",
instance_data_file,
fname,
)
except JinjaSyntaxParsingException as e:
LOG.warning(
"Failed to render templated yaml config file '%s'. %s",
fname,
e,
)
except NotJinjaError:
# A log isn't appropriate here as we generally expect most
# cloud.cfgs to not be templated. The other path is logged
pass
except JinjaLoadError as e:
LOG.warning(
"Could not apply Jinja template '%s' to '%s'. "
"Exception: %s",
instance_data_file,
config_file,
repr(e),
)
return load_yaml(config_file, default={}) # pyright: ignore
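# Illustrative example (hypothetical paths and keys): a cloud.cfg whose first
# line is "## template: jinja" may reference runtime instance data, e.g.
#   hostname: {{ v1.local_hostname }}
# and a call like
#   read_conf("/etc/cloud/cloud.cfg",
#             instance_data_file="/run/cloud-init/instance-data.json")
# would render the template before parsing the result as YAML.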
# Merges X lists, and then keeps the
# unique ones, but orders by sort order
# instead of by the original order
def uniq_merge_sorted(*lists):
return sorted(uniq_merge(*lists))
# Merges X lists, iterates over the merged result, and keeps only the
# unique items (order preserving), returning that merged and uniqued list
# as the final result.
#
# Note: if any entry is a string it will be split on commas, and empty
# entries will be discarded before merging.
def uniq_merge(*lists):
combined_list = []
for a_list in lists:
if isinstance(a_list, str):
a_list = a_list.strip().split(",")
# Kickout the empty ones
a_list = [a for a in a_list if a]
combined_list.extend(a_list)
return uniq_list(combined_list)
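# Illustrative example (not part of upstream util.py): strings are split on
# commas, empties are dropped, and the first occurrence wins while order is
# preserved.
#   uniq_merge("a,b,,c", ["b", "d"])  # -> ["a", "b", "c", "d"]
#   uniq_merge_sorted("b,a", "c")     # -> ["a", "b", "c"]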
def clean_filename(fn):
for k, v in FN_REPLACEMENTS.items():
fn = fn.replace(k, v)
removals = []
for k in fn:
if k not in FN_ALLOWED:
removals.append(k)
for k in removals:
fn = fn.replace(k, "")
fn = fn.strip()
return fn
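# Illustrative example (hypothetical filename): path separators become "_"
# and any character outside FN_ALLOWED is dropped.
#   clean_filename("boot hook/part-001?")  # -> "boothook_part-001"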
def decomp_gzip(data, quiet=True, decode=True):
try:
with io.BytesIO(encode_text(data)) as buf, gzip.GzipFile(
None, "rb", 1, buf
) as gh:
if decode:
return decode_binary(gh.read())
else:
return gh.read()
except Exception as e:
if quiet:
return data
else:
raise DecompressionError(str(e)) from e
def extract_usergroup(ug_pair):
if not ug_pair:
return (None, None)
ug_parted = ug_pair.split(":", 1)
u = ug_parted[0].strip()
if len(ug_parted) == 2:
g = ug_parted[1].strip()
else:
g = None
if not u or u == "-1" or u.lower() == "none":
u = None
if not g or g == "-1" or g.lower() == "none":
g = None
return (u, g)
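# Illustrative examples (not part of upstream util.py):
#   extract_usergroup("ubuntu:adm")  # -> ("ubuntu", "adm")
#   extract_usergroup("root")        # -> ("root", None)
#   extract_usergroup("-1:none")     # -> (None, None)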
def get_modules_from_dir(root_dir: str) -> dict:
entries = dict()
for fname in glob.glob(os.path.join(root_dir, "*.py")):
if not os.path.isfile(fname):
continue
modname = os.path.basename(fname)[0:-3]
modname = modname.strip()
if modname and modname.find(".") == -1:
entries[fname] = modname
return entries
@lru_cache()
def is_Linux():
"""deprecated: prefer Distro object's `is_linux` property
Multiple sources of truth is bad, and already know whether we are
working with Linux from the Distro class. Using Distro offers greater code
reusablity, cleaner code, and easier maintenance.
"""
return "Linux" in platform.system()
@lru_cache()
def is_BSD():
if "BSD" in platform.system():
return True
if platform.system() == "DragonFly":
return True
return False
@lru_cache()
def is_FreeBSD():
return system_info()["variant"] == "freebsd"
@lru_cache()
def is_DragonFlyBSD():
return system_info()["variant"] == "dragonfly"
@lru_cache()
def is_NetBSD():
return system_info()["variant"] == "netbsd"
@lru_cache()
def is_OpenBSD():
return system_info()["variant"] == "openbsd"
def get_cfg_option_bool(yobj, key, default=False):
if key not in yobj:
return default
return translate_bool(yobj[key])
def get_cfg_option_str(yobj, key, default=None):
if key not in yobj:
return default
val = yobj[key]
if not isinstance(val, str):
val = str(val)
return val
def get_cfg_option_int(yobj, key, default=0):
return int(get_cfg_option_str(yobj, key, default=default))
def _parse_redhat_release(release_file=None):
"""Return a dictionary of distro info fields from /etc/redhat-release.
Dict keys will align with /etc/os-release keys:
ID, VERSION_ID, VERSION_CODENAME
"""
if not release_file:
release_file = "/etc/redhat-release"
if not os.path.exists(release_file):
return {}
redhat_release = load_text_file(release_file)
redhat_regex = (
r"(?P<name>.+) release (?P<version>[\d\.]+) "
r"\((?P<codename>[^)]+)\)"
)
# Virtuozzo deviates here
if "Virtuozzo" in redhat_release:
redhat_regex = r"(?P<name>.+) release (?P<version>[\d\.]+)"
match = re.match(redhat_regex, redhat_release)
if match:
group = match.groupdict()
# Virtuozzo has no codename in this file
if "Virtuozzo" in group["name"]:
group["codename"] = group["name"]
group["name"] = group["name"].lower().partition(" linux")[0]
if group["name"] == "red hat enterprise":
group["name"] = "redhat"
return {
"ID": group["name"],
"VERSION_ID": group["version"],
"VERSION_CODENAME": group["codename"],
}
return {}
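# Illustrative example (hypothetical file contents): a release line such as
#   "Red Hat Enterprise Linux release 8.4 (Ootpa)"
# would parse to
#   {"ID": "redhat", "VERSION_ID": "8.4", "VERSION_CODENAME": "Ootpa"}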
@lru_cache()
def get_linux_distro():
distro_name = ""
distro_version = ""
flavor = ""
os_release = {}
os_release_rhel = False
if os.path.exists("/etc/os-release"):
os_release = load_shell_content(load_text_file("/etc/os-release"))
if not os_release:
os_release_rhel = True
os_release = _parse_redhat_release()
if os_release:
distro_name = os_release.get("ID", "")
distro_version = os_release.get("VERSION_ID", "")
if "sles" in distro_name or "suse" in distro_name:
# RELEASE_BLOCKER: We will drop this sles divergent behavior in
# the future so that get_linux_distro returns a named tuple
# which will include both version codename and architecture
# on all distributions.
flavor = platform.machine()
elif distro_name == "alpine" or distro_name == "photon":
flavor = os_release.get("PRETTY_NAME", "")
elif distro_name == "virtuozzo" and not os_release_rhel:
# Only use this if the redhat file is not parsed
flavor = os_release.get("PRETTY_NAME", "")
else:
flavor = os_release.get("VERSION_CODENAME", "")
if not flavor:
match = re.match(
r"[^ ]+ \((?P<codename>[^)]+)\)",
os_release.get("VERSION", ""),
)
if match:
flavor = match.groupdict()["codename"]
if distro_name == "rhel":
distro_name = "redhat"
elif is_BSD():
distro_name = platform.system().lower()
distro_version = platform.release()
else:
dist = ("", "", "")
try:
            # platform.dist() was removed in Python 3.8
dist = platform.dist() # type: ignore # pylint: disable=W1505,E1101
except Exception:
pass
finally:
found = None
for entry in dist:
if entry:
found = 1
if not found:
LOG.warning(
"Unable to determine distribution, template "
"expansion may have unexpected results"
)
return dist
return (distro_name, distro_version, flavor)
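# Illustrative return values (hypothetical hosts): on Ubuntu 22.04 this would
# be ("ubuntu", "22.04", "jammy"); on FreeBSD the tuple comes from
# platform.system()/platform.release(), e.g. ("freebsd", "13.2-RELEASE", "").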
def _get_variant(info):
system = info["system"].lower()
variant = "unknown"
if system == "linux":
linux_dist = info["dist"][0].lower()
if linux_dist in (
"almalinux",
"alpine",
"aosc",
"arch",
"azurelinux",
"centos",
"cloudlinux",
"debian",
"eurolinux",
"fedora",
"mariner",
"miraclelinux",
"openeuler",
"opencloudos",
"openmandriva",
"photon",
"rhel",
"rocky",
"suse",
"tencentos",
"virtuozzo",
):
variant = linux_dist
elif linux_dist in ("ubuntu", "linuxmint", "mint"):
variant = "ubuntu"
elif linux_dist == "redhat":
variant = "rhel"
elif linux_dist in (
"opensuse",
"opensuse-leap",
"opensuse-microos",
"opensuse-tumbleweed",
"sle_hpc",
"sle-micro",
"sles",
):
variant = "suse"
else:
variant = "linux"
elif system in (
"windows",
"darwin",
"freebsd",
"netbsd",
"openbsd",
"dragonfly",
):
variant = system
return variant
@lru_cache()
def system_info():
info = {
"platform": platform.platform(),
"system": platform.system(),
"release": platform.release(),
"python": platform.python_version(),
"uname": list(platform.uname()),
"dist": get_linux_distro(),
}
info["variant"] = _get_variant(info)
return info
def get_cfg_option_list(yobj, key, default=None):
"""
Gets the C{key} config option from C{yobj} as a list of strings. If the
key is present as a single string it will be returned as a list with one
string arg.
@param yobj: The configuration object.
@param key: The configuration key to get.
@param default: The default to return if key is not found.
@return: The configuration option as a list of strings or default if key
is not found.
"""
if key not in yobj:
return default
if yobj[key] is None:
return []
val = yobj[key]
if isinstance(val, (list)):
cval = [v for v in val]
return cval
if not isinstance(val, str):
val = str(val)
return [val]
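# Illustrative examples (not part of upstream util.py):
#   get_cfg_option_list({"groups": "adm"}, "groups")            # -> ["adm"]
#   get_cfg_option_list({"groups": ["adm", "sudo"]}, "groups")  # -> ["adm", "sudo"]
#   get_cfg_option_list({"groups": None}, "groups")             # -> []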
# get a cfg entry by its path array
# for f['a']['b']: get_cfg_by_path(mycfg,('a','b'))
def get_cfg_by_path(yobj, keyp, default=None):
"""Return the value of the item at path C{keyp} in C{yobj}.
example:
get_cfg_by_path({'a': {'b': {'num': 4}}}, 'a/b/num') == 4
get_cfg_by_path({'a': {'b': {'num': 4}}}, 'c/d') == None
@param yobj: A dictionary.
@param keyp: A path inside yobj. it can be a '/' delimited string,
or an iterable.
@param default: The default to return if the path does not exist.
    @return: The value of the item at keyp, or default if the path
        is not found."""
if isinstance(keyp, str):
keyp = keyp.split("/")
cur = yobj
for tok in keyp:
if tok not in cur:
return default
cur = cur[tok]
return cur
def fixup_output(cfg, mode):
(outfmt, errfmt) = get_output_cfg(cfg, mode)
redirect_output(outfmt, errfmt)
return (outfmt, errfmt)
# redirect_output(outfmt, errfmt, orig_out, orig_err)
# replace orig_out and orig_err with filehandles specified in outfmt or errfmt
# fmt can be:
# > FILEPATH
# >> FILEPATH
# | program [ arg1 [ arg2 [ ... ] ] ]
#
# with a '|', arguments are passed to shell, so one level of
# shell escape is required.
#
# if _CLOUD_INIT_SAVE_STDOUT is set in the environment to a non-empty, true
# value, then output will not be redirected (useful for debugging).
#
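# For example (illustrative values), outfmt/errfmt strings commonly look like:
#   "> /var/log/my-output.log"                   truncate and write to a file
#   ">> /var/log/my-output.log"                  append to a file
#   "| tee -a /var/log/cloud-init-output.log"    pipe through a program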
def redirect_output(outfmt, errfmt, o_out=None, o_err=None):
if is_true(os.environ.get("_CLOUD_INIT_SAVE_STDOUT")):
LOG.debug("Not redirecting output due to _CLOUD_INIT_SAVE_STDOUT")
return
if not o_out:
o_out = sys.stdout
if not o_err:
o_err = sys.stderr
# pylint: disable=subprocess-popen-preexec-fn
def set_subprocess_umask_and_gid():
"""Reconfigure umask and group ID to create output files securely.
This is passed to subprocess.Popen as preexec_fn, so it is executed in
the context of the newly-created process. It:
* sets the umask of the process so created files aren't world-readable
* if an adm group exists in the system, sets that as the process' GID
(so that the created file(s) are owned by root:adm)
"""
os.umask(0o037)
try:
group_id = grp.getgrnam("adm").gr_gid
except KeyError:
# No adm group, don't set a group
pass
else:
os.setgid(group_id)
if outfmt:
LOG.debug("Redirecting %s to %s", o_out, outfmt)
(mode, arg) = outfmt.split(" ", 1)
if mode == ">" or mode == ">>":
owith = "ab"
if mode == ">":
owith = "wb"
new_fp = open(arg, owith)
elif mode == "|":
proc = subprocess.Popen(
arg,
shell=True,
stdin=subprocess.PIPE,
preexec_fn=set_subprocess_umask_and_gid,
)
# As stdin is PIPE, then proc.stdin is IO[bytes]
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stdin
new_fp = cast(IO[Any], proc.stdin)
else:
raise TypeError("Invalid type for output format: %s" % outfmt)
if o_out:
os.dup2(new_fp.fileno(), o_out.fileno())
if errfmt == outfmt:
LOG.debug("Redirecting %s to %s", o_err, outfmt)
os.dup2(new_fp.fileno(), o_err.fileno())
return
if errfmt:
LOG.debug("Redirecting %s to %s", o_err, errfmt)
(mode, arg) = errfmt.split(" ", 1)
if mode == ">" or mode == ">>":
owith = "ab"
if mode == ">":
owith = "wb"
new_fp = open(arg, owith)
elif mode == "|":
proc = subprocess.Popen(
arg,
shell=True,
stdin=subprocess.PIPE,
preexec_fn=set_subprocess_umask_and_gid,
)
# As stdin is PIPE, then proc.stdin is IO[bytes]
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stdin
new_fp = cast(IO[Any], proc.stdin)
else:
raise TypeError("Invalid type for error format: %s" % errfmt)
if o_err:
os.dup2(new_fp.fileno(), o_err.fileno())
def mergemanydict(sources: Sequence[Mapping], reverse=False) -> dict:
"""Merge multiple dicts according to the dict merger rules.
Dict merger rules can be found in cloud-init documentation. If no mergers
have been specified, entries will be recursively added, but no values
get replaced if they already exist. Functionally, this means that the
highest priority keys must be specified first.
Example:
a = {
"a": 1,
"b": 2,
"c": [1, 2, 3],
"d": {
"a": 1,
"b": 2,
},
}
b = {
"a": 10,
"c": [4],
"d": {
"a": 3,
"f": 10,
},
"e": 20,
}
mergemanydict([a, b]) results in:
{
'a': 1,
'b': 2,
'c': [1, 2, 3],
'd': {
'a': 1,
'b': 2,
'f': 10,
},
'e': 20,
}
"""
if reverse:
sources = list(reversed(sources))
merged_cfg: dict = {}
for cfg in sources:
if cfg:
# Figure out which mergers to apply...
mergers_to_apply = mergers.dict_extract_mergers(cfg)
if not mergers_to_apply:
mergers_to_apply = mergers.default_mergers()
merger = mergers.construct(mergers_to_apply)
merged_cfg = merger.merge(merged_cfg, cfg)
return merged_cfg
@contextlib.contextmanager
def chdir(ndir):
curr = os.getcwd()
try:
os.chdir(ndir)
yield ndir
finally:
os.chdir(curr)
@contextlib.contextmanager
def umask(n_msk):
old = os.umask(n_msk)
try:
yield old
finally:
os.umask(old)
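# Typical usage (illustrative): both context managers restore the previous
# state on exit.
#   with chdir("/tmp"):
#       ...  # cwd is /tmp here, restored afterwards
#   with umask(0o077):
#       ...  # files created here are readable only by their owner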
def center(text, fill, max_len):
return "{0:{fill}{align}{size}}".format(
text, fill=fill, align="^", size=max_len
)
def del_dir(path):
LOG.debug("Recursively deleting %s", path)
shutil.rmtree(path)
def read_optional_seed(fill, base="", ext="", timeout=5):
"""
returns boolean indicating success or failure (presense of files)
if files are present, populates 'fill' dictionary with 'user-data' and
'meta-data' entries
"""
try:
md, ud, vd, network = read_seeded(base=base, ext=ext, timeout=timeout)
fill["user-data"] = ud
fill["vendor-data"] = vd
fill["meta-data"] = md
fill["network-config"] = network
return True
except url_helper.UrlError as e:
if e.code == url_helper.NOT_FOUND:
return False
raise
def fetch_ssl_details(paths=None):
ssl_details = {}
# Lookup in these locations for ssl key/cert files
if not paths:
ssl_cert_paths = [
"/var/lib/cloud/data/ssl",
"/var/lib/cloud/instance/data/ssl",
]
else:
ssl_cert_paths = [
os.path.join(paths.get_ipath_cur("data"), "ssl"),
os.path.join(paths.get_cpath("data"), "ssl"),
]
ssl_cert_paths = uniq_merge(ssl_cert_paths)
ssl_cert_paths = [d for d in ssl_cert_paths if d and os.path.isdir(d)]
cert_file = None
for d in ssl_cert_paths:
if os.path.isfile(os.path.join(d, "cert.pem")):
cert_file = os.path.join(d, "cert.pem")
break
key_file = None
for d in ssl_cert_paths:
if os.path.isfile(os.path.join(d, "key.pem")):
key_file = os.path.join(d, "key.pem")
break
if cert_file and key_file:
ssl_details["cert_file"] = cert_file
ssl_details["key_file"] = key_file
elif cert_file:
ssl_details["cert_file"] = cert_file
return ssl_details
def load_yaml(blob, default=None, allowed=(dict,)):
loaded = default
blob = decode_binary(blob)
try:
LOG.debug(
"Attempting to load yaml from string "
"of length %s with allowed root types %s",
len(blob),
allowed,
)
converted = yaml.safe_load(blob)
if converted is None:
LOG.debug("loaded blob returned None, returning default.")
converted = default
elif not isinstance(converted, allowed):
            # Yes, this will just be caught, but that's ok for now...
raise TypeError(
"Yaml load allows %s root types, but got %s instead"
% (allowed, type_utils.obj_name(converted))
)
loaded = converted
except (yaml.YAMLError, TypeError, ValueError) as e:
msg = "Failed loading yaml blob"
mark = None
if hasattr(e, "context_mark") and getattr(e, "context_mark"):
mark = getattr(e, "context_mark")
elif hasattr(e, "problem_mark") and getattr(e, "problem_mark"):
mark = getattr(e, "problem_mark")
if mark:
msg += (
'. Invalid format at line {line} column {col}: "{err}"'.format(