import boto3
from loguru import logger
#import pem
from config import (
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN,
ALB_LISTENER_ARN,
CERT_PRIVATE_KEY,
CERT_PUBLIC_KEY,
CERT_CHAIN,
)
def _read_pem_file(pem_file):
    """Return the raw bytes of a PEM file.

    The ACM/ELB handlers only need a byte string (or a file handle), and the
    config names each PEM file explicitly, so the file is not split into its
    individual certificates (that is what the unused ``pem`` package would
    do).

    Parameters:
        pem_file (str): Path to the PEM-encoded file.

    Returns:
        bytes: The file contents, unmodified.
    """
    with open(pem_file, 'rb') as handle:
        return handle.read()
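class ElbHandler:
    """API for attaching a certificate to an ELBv2 (ALB) listener.

    Attributes:
        _client (obj): boto3 client for the 'elbv2' service.
        listener (str): ARN of the listener the certificate is applied to,
            taken from config.ALB_LISTENER_ARN.
    """
    def __init__(self, client):
        """Bind the boto3 client and the configured listener ARN.

        Raises:
            ValueError: if config.ALB_LISTENER_ARN is not set.
        """
        self._client = client
        if ALB_LISTENER_ARN is None:
            raise ValueError("No ALB Listener ARN defined.")
        self.listener = ALB_LISTENER_ARN

    @staticmethod
    def _certificates(cert_arn):
        """Build the Certificates parameter shared by both listener calls.

        Raises:
            ValueError: if cert_arn is empty or None, so a failed ACM import
                (no CertificateArn in the response) is reported here rather
                than as an opaque API parameter-validation error.
        """
        if not cert_arn:
            raise ValueError("No certificate ARN given.")
        # Only CertificateArn is sent; IsDefault is deliberately not set (see
        # the `default` parameter notes on the public methods).
        return [{"CertificateArn": cert_arn}]

    def add_listener_certificate(self, cert_arn, default=True):
        """Add a certificate to the listener's certificate list.

        Parameters:
            cert_arn (str): ARN of the certificate in AWS Certificate Manager.
            default (bool): Accepted for backward compatibility but NOT sent
                to the API. add_listener_certificates never changes the
                listener's default certificate; use modify_listener() for that.

        Returns:
            dict: boto3 response from add_listener_certificates.

        Raises:
            ValueError: if cert_arn is empty.
        """
        return self._client.add_listener_certificates(
            ListenerArn=self.listener,
            Certificates=self._certificates(cert_arn),
        )

    def modify_listener(self, cert_arn, default=True):
        """Make cert_arn the listener's default certificate.

        Parameters:
            cert_arn (str): ARN of the certificate in AWS Certificate Manager.
            default (bool): Accepted for backward compatibility but NOT sent
                to the API; modify_listener always installs the given
                certificate as the default, so the flag has no effect.

        Returns:
            dict: boto3 response from modify_listener.

        Raises:
            ValueError: if cert_arn is empty.
        """
        return self._client.modify_listener(
            ListenerArn=self.listener,
            Certificates=self._certificates(cert_arn),
        )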
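class AcmHandler:
    """Stage PEM material and import it into AWS Certificate Manager.

    Attributes:
        _client (obj): boto3 client for the 'acm' service.
        _private_key (bytes | None): PEM private key. Sensitive: it is kept
            in memory unencrypted and must never be logged or printed.
        _public_key (bytes | None): PEM certificate (this file calls it the
            "public key").
        _chain (bytes | None): PEM certificate chain, if staged.
        arn (str | None): CertificateArn returned by import_certificate().
    """
    def __init__(self, client):
        self._client = client
        self._private_key = None
        self._public_key = None
        self._chain = None
        self.arn = None

    def __repr__(self):
        # Report only whether each piece is staged so that an accidental
        # repr() in a log line or traceback cannot leak the private key.
        def _state(value):
            return 'staged' if value is not None else 'unset'
        return (
            f"{type(self).__name__}("
            f"private_key={_state(self._private_key)}, "
            f"public_key={_state(self._public_key)}, "
            f"chain={_state(self._chain)}, "
            f"arn={self.arn!r})"
        )

    @property
    def private_key(self):
        """Raw PEM bytes of the private key (None until staged)."""
        return self._private_key

    @private_key.setter
    def private_key(self, pem_file):
        """Read the private key from pem_file; log the path, never the key."""
        self._private_key = _read_pem_file(pem_file)
        logger.info(f"Private key staged from {pem_file}.")

    @property
    def public_key(self):
        """Raw PEM bytes of the certificate (None until staged)."""
        return self._public_key

    @public_key.setter
    def public_key(self, pem_file):
        """Read the certificate from pem_file."""
        self._public_key = _read_pem_file(pem_file)
        logger.info(f"Public key staged from {pem_file}")

    @property
    def chain(self):
        """Raw PEM bytes of the certificate chain (None until staged)."""
        return self._chain

    @chain.setter
    def chain(self, pem_file):
        """Read the certificate chain from pem_file."""
        self._chain = _read_pem_file(pem_file)
        logger.info(f"Chain file staged from {pem_file}")

    def import_certificate(self):
        """Import the staged certificate into ACM and record its ARN.

        https://boto3.amazonaws.com/v1/documentation/api/latest/
        reference/services/acm.html#ACM.Client.import_certificate

        The certificate and private key are required and are checked up
        front instead of letting boto3 reject a None parameter. The chain is
        sent only when one has been staged (ACM treats CertificateChain as
        optional).

        Raises:
            ValueError: if the certificate or private key is not staged.
        """
        if self._public_key is None or self._private_key is None:
            raise ValueError(
                "Certificate and private key must be staged before import."
            )
        params = {
            "Certificate": self._public_key,
            "PrivateKey": self._private_key,
        }
        if self._chain is not None:
            params["CertificateChain"] = self._chain
        response = self._client.import_certificate(**params)
        # The ARN (not the key material) is the only thing worth logging.
        self.arn = response.get('CertificateArn')
        logger.info(
            f"Imported certificate and received CertificateArn of {self.arn}"
        )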
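def main():
    """Import the configured certificate into ACM and make it the listener default.

    Credentials are taken verbatim from config. boto3 treats a None value for
    aws_access_key_id / aws_secret_access_key / aws_session_token as "not
    supplied", so leaving them unset in config lets the default provider
    chain (environment, shared profile, instance or task role) apply instead
    of a long-lived static key checked into config -- prefer that.
    """
    credentials = {
        "aws_access_key_id": AWS_ACCESS_KEY_ID,
        "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
        "aws_session_token": AWS_SESSION_TOKEN,
    }
    elb_client = boto3.client('elbv2', **credentials)
    acm_client = boto3.client('acm', **credentials)

    elb = ElbHandler(elb_client)
    acm = AcmHandler(acm_client)

    acm.private_key = CERT_PRIVATE_KEY
    acm.public_key = CERT_PUBLIC_KEY
    acm.chain = CERT_CHAIN
    acm.import_certificate()

    # Do not touch the listener unless ACM actually handed back an ARN.
    if not acm.arn:
        raise RuntimeError(
            "ACM import returned no CertificateArn; listener left unchanged."
        )

    response = elb.modify_listener(acm.arn)
    logger.info(f"Cert added to listener {elb.listener}:\n{response}")
    return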
# Script entry point: main() returns None, so the process exits with status 0
# unless an exception escapes.
if __name__ == "__main__":
    raise SystemExit(main())