This repository has been archived by the owner on Jan 2, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
botomfa.py
185 lines (159 loc) · 6.98 KB
/
botomfa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import datetime
import logging
import os
import sys
import boto
import boto.s3
import boto.exception
from boto.sts import STSConnection
# Module-wide logger. Everything at DEBUG and above is emitted, and the
# output goes to stdout rather than logging's default of stderr.
logger = logging.getLogger('botomfa')
logger.setLevel(logging.DEBUG)

# Single stdout handler carrying a timestamped "name - level - message" layout.
stdout_handler = logging.StreamHandler(stream=sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stdout_handler)
def get_sts(duration, mfa_serial, mfa_device_name,
            long_term, short_term, assume_role_arn=None):
    """Prompt for an MFA code and store fresh temporary STS credentials.

    Reads the long-term key pair from the ``long_term`` section of
    ``boto.config``, asks on stdin for the current MFA code, then either
    requests a session token or, when ``assume_role_arn`` is given, assumes
    that role. The resulting access key, secret key, session token,
    expiration and an ``assumed_role`` flag are saved under the
    ``short_term`` section with ``boto.config.save_user_option``.

    Args:
        duration: Requested credential lifetime, in seconds.
        mfa_serial: Serial number (ARN) of the MFA device.
        mfa_device_name: Device/user name; only shown in the prompt.
        long_term: Config section holding the long-term key pair.
        short_term: Config section that receives the temporary credentials.
        assume_role_arn: Optional role ARN. Its last ``/``-separated
            component is used as the role session name.

    Exits the process with status 1 when a long-term key is missing or when
    STS answers with a ``BotoServerError``.
    """
    long_term_id = boto.config.get(long_term, 'aws_access_key_id')
    if long_term_id is None:
        logger.error('aws_access_key_id is missing from section %s '
                     'or config file is missing.' % (long_term,))
        sys.exit(1)

    long_term_secret = boto.config.get(long_term, 'aws_secret_access_key')
    if long_term_secret is None:
        # The original message had no %s placeholder, so formatting it
        # raised TypeError instead of reporting the missing secret.
        logger.error('aws_secret_access_key is missing from section %s '
                     'or config file is missing.' % (long_term,))
        sys.exit(1)

    # Drop any previously stored session token before requesting a new one.
    if boto.config.has_section(short_term):
        boto.config.remove_option(short_term, 'aws_security_token')

    mfa_TOTP = raw_input('Enter AWS MFA code for user %s '
                         '(renewing for %s seconds):' %
                         (mfa_device_name, duration))
    try:
        sts_connection = STSConnection(
            aws_access_key_id=long_term_id,
            aws_secret_access_key=long_term_secret)
        if assume_role_arn is None:
            tempCredentials = sts_connection.get_session_token(
                duration=duration,
                mfa_serial_number=mfa_serial,
                mfa_token=mfa_TOTP)
            assumed_role = 'False'
        else:
            role_session_name = assume_role_arn.split('/')[-1]
            assumedRole = sts_connection.assume_role(
                assume_role_arn, role_session_name,
                duration_seconds=duration,
                mfa_serial_number=mfa_serial,
                mfa_token=mfa_TOTP)
            tempCredentials = assumedRole.credentials
            assumed_role = 'True'

        # 'assumed_role' is stored as a string; test_creds reads it back
        # with boto.config.getbool.
        default_options = [
            ('aws_access_key_id', tempCredentials.access_key),
            ('aws_secret_access_key', tempCredentials.secret_key),
            ('aws_security_token', tempCredentials.session_token),
            ('expiration', tempCredentials.expiration),
            ('assumed_role', assumed_role),
        ]
        for option, value in default_options:
            boto.config.save_user_option(short_term, option, value)
    except boto.exception.BotoServerError as e:
        message = '%s - Please try again.' % (e.message)
        logger.error(message)
        sys.exit(1)
def test_creds(profile_name):
    """Check whether the temporary credentials in ``profile_name`` are usable.

    The check passes only when the stored ``expiration`` timestamp lies in
    the future (compared against ``datetime.utcnow()``) and an S3
    ``get_all_buckets`` call succeeds.

    Args:
        profile_name: ``boto.config`` section holding the temporary
            credentials.

    Returns:
        True when the credentials validated, False when they are missing an
        expiration, have expired, or any error occurred during validation.
    """
    try:
        logger.info('Validating temporary credentials..')
        if boto.config.getbool(profile_name, 'assumed_role'):
            logger.info('You are currently using the credentials of an '
                        'AssumedRole. Use the --clear flag to clear these '
                        'credentials')

        expiration_string = boto.config.get(profile_name, 'expiration')
        if expiration_string is None:
            logger.error('Expiration timestamp missing from temporary '
                         'credentials.')
            return False

        exp_dt = datetime.datetime.strptime(
            expiration_string, '%Y-%m-%dT%H:%M:%SZ'
        )
        t_diff = exp_dt - datetime.datetime.utcnow()
        if t_diff.total_seconds() <= 0:
            logger.warning('Your temporary credentials have expired. '
                           'Attempting to renew...')
            return False

        # Validate against a real service. This may not be the best solution
        # for everyone, as the person attempting to fetch an STS token may
        # not have access to S3. This might need to be more flexible or we
        # could potentially ditch this altogether?
        s3 = boto.connect_s3()
        s3.get_all_buckets()

        # total_seconds() rather than .seconds: the latter drops whole days
        # and under-reports lifetimes longer than 24 hours.
        logger.info(
            'Temporary credentials validation successful! '
            'Token expires in %s seconds at %s' %
            (int(t_diff.total_seconds()), expiration_string)
        )
        return True
    except Exception:
        # Deliberately broad: any failure means "not valid". Unlike a bare
        # except, this lets KeyboardInterrupt and SystemExit propagate.
        logger.warning('Temporary credentials are invalid.')
        return False
def run(duration, aws_account_num, mfa_device_name, profile,
        assume_role_arn=None):
    """Ensure valid temporary credentials exist for ``profile``.

    Resolves the long-term and short-term config section names, builds the
    MFA serial ARN, fetches temporary credentials with ``get_sts`` when any
    of them is missing, and validates the result with ``test_creds``. If
    validation fails, the credentials are requested and checked once more.

    Args:
        duration: Requested credential lifetime, passed through to
            ``get_sts``.
        aws_account_num: AWS account number used to build the MFA serial.
        mfa_device_name: MFA device name; falls back to the ``USER``
            environment variable when empty or None.
        profile: Profile name, or None for the default sections
            (``long-term`` / ``Credentials``).
        assume_role_arn: Optional role ARN passed through to ``get_sts``.

    Exits the process with status 1 when the account number or the MFA
    device name cannot be determined.
    """
    # If no profile specified, use default
    if profile is None:
        logger.debug('Using default profile.')
        long_term_profile = 'long-term'
        short_term_profile = 'Credentials'
    else:
        logger.debug('Using profile: %s' % profile)
        long_term_profile = '%s-%s' % (profile, 'long-term')
        short_term_profile = profile

    # Get AWS account number. Needed to build MFA serial
    if aws_account_num is None:
        logger.error('AWS Account number must be set either via '
                     'AWS_ACT_NUM environment variable '
                     'or --aws-acct-num.')
        sys.exit(1)

    # If no MFA device name was supplied, fall back to the shell's username.
    mfa_device_name = mfa_device_name or os.environ.get('USER')
    # Falsy check (not just None): an empty name would otherwise produce a
    # malformed MFA ARN ending in "mfa/".
    if not mfa_device_name:
        logger.error('Could not retrieve MFA device name from environment '
                     'variables MFA_DEVICE_NAME or USER.')
        sys.exit(1)

    mfa_serial = 'arn:aws:iam::%s:mfa/%s' % (aws_account_num, mfa_device_name)

    logger.debug('Your AWS account number is: %s' % aws_account_num)
    logger.debug('Your MFA device name is: %s' % mfa_device_name)
    if assume_role_arn:
        logger.debug('You are assuming the role: %s' % assume_role_arn)

    # if any of the section named fields are missing, prompt for token
    required_options = ('aws_access_key_id', 'aws_secret_access_key',
                        'aws_security_token')
    if any(boto.config.get(short_term_profile, option) is None
           for option in required_options):
        logger.info(
            'Temporary credentials are missing, obtaining them.')
        get_sts(duration, mfa_serial,
                mfa_device_name, long_term_profile,
                short_term_profile, assume_role_arn)

    # Expired or otherwise invalid credentials get one renewal attempt.
    if not test_creds(short_term_profile):
        get_sts(duration, mfa_serial,
                mfa_device_name, long_term_profile,
                short_term_profile, assume_role_arn)
        test_creds(short_term_profile)
def reset_credentials(profile=None):
    """Blank out the stored temporary credentials for a profile.

    Args:
        profile: Config section to clear; the ``Credentials`` section is
            used when None.

    Does nothing when the section does not exist. Otherwise every temporary
    credential option in it is overwritten with an empty string.
    """
    section = 'Credentials' if profile is None else profile
    if not boto.config.has_section(section):
        return

    temporary_keys = (
        'aws_access_key_id',
        'aws_secret_access_key',
        'aws_security_token',
        'expiration',
        'assumed_role',
    )
    for key in temporary_keys:
        boto.config.save_user_option(section, key, '')