-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
__init__.py
200 lines (170 loc) · 7.84 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import json
import logging
import os
import re
import subprocess
import shutil
import tempfile
import zipfile
import boto3
# Root logger, with its level set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# helm and the AWS CLI are coming from the kubectl layer; their directories
# are prepended to PATH so the subprocess calls below resolve them.
os.environ['PATH'] = f"/opt/helm:/opt/awscli:{os.environ['PATH']}"

# Working directory for downloaded charts, values files and the kubeconfig.
# TEST_OUTDIR overrides the default of /tmp.
outdir = os.getenv('TEST_OUTDIR', '/tmp')
kubeconfig = os.path.join(outdir, 'kubeconfig')
def get_chart_asset_from_url(chart_asset_url):
    """Download a zipped chart with ``aws s3 cp`` and unpack it under ``outdir``.

    Args:
        chart_asset_url: Location of the chart archive, handed to
            ``aws s3 cp`` as the source.

    Returns:
        Path of the directory (``<outdir>/chart``) holding the extracted files.

    Raises:
        subprocess.CalledProcessError: if ``aws s3 cp`` exits non-zero.
        zipfile.BadZipFile: if the downloaded file is not a valid zip archive.
    """
    chart_zip = os.path.join(outdir, 'chart.zip')

    # Clear whatever is already at the archive path. It is normally a regular
    # file, which shutil.rmtree cannot delete, so remove it as a file and keep
    # rmtree only for the case where a directory sits there. Best effort:
    # failures are ignored, as before.
    if os.path.isdir(chart_zip):
        shutil.rmtree(chart_zip, ignore_errors=True)
    else:
        try:
            os.remove(chart_zip)
        except OSError:
            pass

    subprocess.check_call(['aws', 's3', 'cp', chart_asset_url, chart_zip])

    # Always extract into a fresh, empty directory.
    chart_dir = os.path.join(outdir, 'chart')
    shutil.rmtree(chart_dir, ignore_errors=True)
    os.mkdir(chart_dir)

    with zipfile.ZipFile(chart_zip, 'r') as zip_ref:
        zip_ref.extractall(chart_dir)

    return chart_dir
def is_ecr_public_available(region):
    """Return True when ``region`` belongs to the ``aws`` partition.

    The partition is resolved through a default boto3 session.
    """
    partition = boto3.Session().get_partition_for_region(region)
    return partition == 'aws'
def helm_handler(event, context):
    """Install, upgrade or uninstall a Helm release for a custom-resource event.

    ``Create`` and ``Update`` requests run ``helm upgrade --install``;
    ``Delete`` requests run ``helm uninstall`` on a best-effort basis.

    Args:
        event: Request dict. ``RequestType`` and ``ResourceProperties`` are
            required; the properties must contain ``ClusterName``, ``RoleArn``
            and ``Release``. Optional properties: ``Chart``, ``ChartAssetURL``,
            ``Version``, ``Wait``, ``Timeout``, ``Namespace``,
            ``CreateNamespace``, ``Repository``, ``Values`` (a JSON string)
            and ``SkipCrds``.
        context: Unused.

    Raises:
        KeyError: if a required key is missing from the event.
        RuntimeError: if neither ``Chart`` nor ``ChartAssetURL`` is given, if
            ``ChartAssetURL`` is combined with ``Chart``, ``Repository`` or
            ``Version``, or if ``ChartAssetURL`` is not an ``s3://`` URL.
        subprocess.CalledProcessError: if ``aws eks update-kubeconfig`` fails.
        Exception: whatever the helm invocation raises on Create/Update.
    """
    # ResponseURL is replaced with a placeholder so it is not written to the logs.
    logger.info(json.dumps(dict(event, ResponseURL='...')))

    request_type = event['RequestType']
    props = event['ResourceProperties']

    # resource properties
    cluster_name = props['ClusterName']
    role_arn = props['RoleArn']
    release = props['Release']
    chart = props.get('Chart', None)
    chart_asset_url = props.get('ChartAssetURL', None)
    version = props.get('Version', None)
    wait = props.get('Wait', False)
    timeout = props.get('Timeout', None)
    namespace = props.get('Namespace', None)
    create_namespace = props.get('CreateNamespace', None)
    repository = props.get('Repository', None)
    values_text = props.get('Values', None)
    skip_crds = props.get('SkipCrds', False)

    # "log in" to the cluster
    subprocess.check_call([
        'aws', 'eks', 'update-kubeconfig',
        '--role-arn', role_arn,
        '--name', cluster_name,
        '--kubeconfig', kubeconfig,
    ])

    # Restrict the kubeconfig to owner read/write.
    if os.path.isfile(kubeconfig):
        os.chmod(kubeconfig, 0o600)

    # Write out the values to a file and include them with the install and upgrade
    values_file = None
    if request_type != "Delete" and values_text is not None:
        values = json.loads(values_text)
        values_file = os.path.join(outdir, 'values.yaml')
        with open(values_file, "w") as f:
            f.write(json.dumps(values, indent=2))

    if request_type == 'Create' or request_type == 'Update':
        # Ensure chart or chart_asset_url are set
        if chart is None and chart_asset_url is None:
            raise RuntimeError(f'chart or chartAsset must be specified')

        if chart_asset_url is not None:
            # Explicit checks instead of assert: asserts vanish under
            # ``python -O`` and fail without any message.
            if chart is not None or repository is not None or version is not None:
                raise RuntimeError('Chart, Repository and Version cannot be combined with ChartAssetURL')
            if not chart_asset_url.startswith('s3://'):
                raise RuntimeError(f'ChartAssetURL must point to as s3 location but is {chart_asset_url}')
            # future work: support versions from s3 assets
            chart = get_chart_asset_from_url(chart_asset_url)

        oci_tmpdir = None
        try:
            if repository is not None and repository.startswith('oci://'):
                # The pulled chart must stay on disk until helm has run, so
                # the directory is cleaned up in the ``finally`` below.
                oci_tmpdir = tempfile.TemporaryDirectory()
                chart = get_chart_from_oci(oci_tmpdir.name, repository, version)

            # skip_crds was previously read from the properties but never
            # forwarded, so SkipCrds had no effect.
            helm('upgrade', release, chart, repository, values_file, namespace, version, wait, timeout, create_namespace, skip_crds=skip_crds)
        finally:
            if oci_tmpdir is not None:
                oci_tmpdir.cleanup()
    elif request_type == "Delete":
        # Deliberately best effort: a failed uninstall is logged, not raised.
        try:
            helm('uninstall', release, namespace=namespace, timeout=timeout)
        except Exception as e:
            logger.info("delete error: %s" % e)
def get_oci_cmd(repository, version):
    """Build the shell command that pulls a chart from an OCI repository.

    Public ECR vs Private ECR are treated differently: both get a
    ``helm registry login`` step before the pull (public ECR only when
    ``is_ecr_public_available`` says so); any other repository is pulled
    without logging in.

    Args:
        repository: Chart location, e.g. ``oci://<registry>/<path>``.
        version: Chart version to pull; None omits the ``--version`` flag.

    Returns:
        A one-element list holding the command line. It contains shell syntax
        (a pipe and ``;``), so it has to be executed through a shell.
    """
    # Function-scope import keeps the new dependency local to this block.
    import shlex

    # Raw strings so the regex escapes are not treated as (invalid) string escapes.
    private_ecr_pattern = r'oci://(?P<registry>\d+\.dkr\.ecr\.(?P<region>[a-z0-9\-]+)\.amazonaws\.com)*'
    public_ecr_pattern = r'oci://(?P<registry>public\.ecr\.aws)*'

    # A repository that does not start with oci:// yields no match at all;
    # treat that as "not recognized" rather than failing on None.
    private_match = re.match(private_ecr_pattern, repository)
    public_match = re.match(public_ecr_pattern, repository)
    private_registry = private_match.groupdict() if private_match else {}
    public_registry = public_match.groupdict() if public_match else {}

    # The command is run by a shell, so quote the caller-supplied pieces.
    # Ordinary repository URLs and versions pass through shlex.quote unchanged.
    pull_cmd = f"helm pull {shlex.quote(repository)}"
    if version is not None:
        pull_cmd += f" --version {shlex.quote(str(version))}"
    pull_cmd += " --untar"

    if private_registry.get('registry') is not None:
        logger.info("Found AWS private repository")
        cmnd = [
            f"aws ecr get-login-password --region {private_registry['region']} | "
            f"helm registry login --username AWS --password-stdin {private_registry['registry']}; {pull_cmd}"
        ]
    elif public_registry.get('registry') is not None:
        logger.info("Found AWS public repository, will use default region as deployment")
        region = os.environ.get('AWS_REGION', 'us-east-1')

        if is_ecr_public_available(region):
            cmnd = [
                f"aws ecr-public get-login-password --region us-east-1 | "
                f"helm registry login --username AWS --password-stdin {public_registry['registry']}; {pull_cmd}"
            ]
        else:
            # `aws ecr-public get-login-password` and `helm registry login` not required as ecr public is not available in current region
            # see https://helm.sh/docs/helm/helm_registry_login/
            cmnd = [pull_cmd]
    else:
        logger.error("OCI repository format not recognized, falling back to helm pull")
        cmnd = [pull_cmd]

    return cmnd
def get_chart_from_oci(tmpdir, repository = None, version = None):
    """Pull a chart from an OCI repository into ``tmpdir`` and return its path.

    The command from ``get_oci_cmd`` is run through a shell with ``tmpdir`` as
    the working directory. A failure whose output contains ``Broken pipe`` is
    retried, up to three attempts in total; any other failure is raised
    immediately.

    Returns:
        ``<tmpdir>/<last path segment of repository>``.

    Raises:
        Exception: carrying the command output, on a non-retryable failure or
            once all attempts are used up.
    """
    pull_command = get_oci_cmd(repository, version)
    total_attempts = 3

    for attempts_left in range(total_attempts - 1, -1, -1):
        try:
            logger.info(pull_command)
            output = subprocess.check_output(pull_command, stderr=subprocess.STDOUT, cwd=tmpdir, shell=True)
        except subprocess.CalledProcessError as exc:
            output = exc.output
            if b'Broken pipe' not in output:
                raise Exception(output)
            logger.info("Broken pipe, retries left: %s" % attempts_left)
        else:
            logger.info(output)
            # effectively returns "$tmpDir/$lastPartOfOCIUrl", because this is how helm pull saves OCI artifact.
            # Eg. if we have oci://9999999999.dkr.ecr.us-east-1.amazonaws.com/foo/bar/pet-service repository, helm saves artifact under $tmpDir/pet-service
            chart_name = repository.rpartition('/')[-1]
            return os.path.join(tmpdir, chart_name)

    raise Exception(f'Operation failed after {total_attempts} attempts: {output}')
def helm(verb, release, chart = None, repo = None, file = None, namespace = None, version = None, wait = False, timeout = None, create_namespace = None, skip_crds = False):
    """Run ``helm <verb> <release>`` against the module's kubeconfig.

    Optional arguments are turned into the matching command-line flags. The
    command runs with ``outdir`` as working directory. A failure whose output
    contains ``Broken pipe`` is retried, up to three attempts in total; any
    other failure is raised immediately.

    Args:
        verb: helm sub-command; ``upgrade`` additionally gets ``--install``.
        release: Release name.
        chart: Chart reference, appended as a positional argument when given.
        repo: Value for ``--repo``.
        file: Values file path for ``--values``.
        namespace: Value for ``--namespace``.
        version: Value for ``--version``.
        wait: When truthy, adds ``--wait``.
        timeout: Value for ``--timeout``.
        create_namespace: When truthy, adds ``--create-namespace``.
        skip_crds: When truthy, adds ``--skip-crds``.

    Raises:
        Exception: carrying the command output, on a non-retryable failure or
            once all attempts are used up.
    """
    import subprocess

    argv = ['helm', verb, release]
    if chart is not None:
        argv.append(chart)

    # (condition, flags) pairs, listed in the order the flags must appear.
    optional_flags = (
        (verb == 'upgrade', ['--install']),
        (create_namespace, ['--create-namespace']),
        (repo is not None, ['--repo', repo]),
        (file is not None, ['--values', file]),
        (version is not None, ['--version', version]),
        (namespace is not None, ['--namespace', namespace]),
        (wait, ['--wait']),
        (skip_crds, ['--skip-crds']),
        (timeout is not None, ['--timeout', timeout]),
    )
    for enabled, flags in optional_flags:
        if enabled:
            argv.extend(flags)
    argv.extend(['--kubeconfig', kubeconfig])

    total_attempts = 3
    for attempts_left in range(total_attempts - 1, -1, -1):
        try:
            output = subprocess.check_output(argv, stderr=subprocess.STDOUT, cwd=outdir)
        except subprocess.CalledProcessError as exc:
            output = exc.output
            if b'Broken pipe' not in output:
                raise Exception(output)
            logger.info("Broken pipe, retries left: %s" % attempts_left)
        else:
            logger.info(output)
            return

    raise Exception(f'Operation failed after {total_attempts} attempts: {output}')