Skip to content

Commit

Permalink
Add test coverage to credentials file
Browse files Browse the repository at this point in the history
Cleaned up the code a bit before making feature changes.
  • Loading branch information
jamesls committed Sep 19, 2014
1 parent 0f0000a commit 08db225
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 33 deletions.
4 changes: 4 additions & 0 deletions awscli/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import locale
import urllib.parse as urlparse

raw_input = input

def get_stdout_text_writer():
return sys.stdout

Expand All @@ -41,6 +43,8 @@ def compat_open(filename, mode='r', encoding=None):
import io
import urlparse

raw_input = raw_input

def get_stdout_text_writer():
# In python3, all the sys.stdout/sys.stderr streams are in text
# mode. This means they expect unicode, and will encode the
Expand Down
32 changes: 14 additions & 18 deletions awscli/customizations/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@
from botocore.exceptions import ProfileNotFound

from awscli.customizations.commands import BasicCommand


try:
raw_input = raw_input
except NameError:
raw_input = input
from awscli.compat import raw_input


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,7 +50,7 @@ def _mask_value(current_value):
if current_value is None:
return 'None'
else:
return ('*' * 16) + current_value[-4:]
return ('*' * 16) + current_value[-4:]


class InteractivePrompter(object):
Expand Down Expand Up @@ -98,11 +93,11 @@ def update_config(self, new_values, config_filename):

def _create_file(self, config_filename):
# Create the file as well as the parent dir if needed.
dirname, basename = os.path.split(config_filename)
dirname = os.path.split(config_filename)[0]
if not os.path.isdir(dirname):
os.makedirs(dirname)
with os.fdopen(os.open(config_filename,
os.O_WRONLY|os.O_CREAT, 0o600), 'w'):
os.O_WRONLY | os.O_CREAT, 0o600), 'w'):
pass

def _write_new_section(self, section_name, new_values, config_filename):
Expand All @@ -124,15 +119,15 @@ def _find_section_start(self, contents, section_name):
if match is not None and self._matches_section(match,
section_name):
return i
else:
raise SectionNotFoundError(section_name)
raise SectionNotFoundError(section_name)

def _update_section_contents(self, contents, section_name, new_values):
# First, find the line where the section_name is defined.
# This will be the value of i.
new_values = new_values.copy()
# ``contents`` is a list of file line contents.
section_start_line_num = self._find_section_start(contents, section_name)
section_start_line_num = self._find_section_start(contents,
section_name)
# If we get here, then we've found the section. We now need
# to figure out if we're updating a value or adding a new value.
# There's 2 cases. Either we're setting a normal scalar value
Expand Down Expand Up @@ -182,7 +177,8 @@ def _update_subattributes(self, index, contents, values, starting_indent):
line = contents[i]
match = self.OPTION_REGEX.search(line)
if match is not None:
current_indent = len(match.group(1)) - len(match.group(1).lstrip())
current_indent = len(
match.group(1)) - len(match.group(1).lstrip())
key_name = match.group(1).strip()
if key_name in values:
option_value = values[key_name]
Expand All @@ -205,7 +201,8 @@ def _insert_new_values(self, line_number, contents, new_values, indent=''):
subindent = indent + ' '
new_contents.append('%s%s =\n' % (indent, key))
for subkey, subval in list(value.items()):
new_contents.append('%s%s = %s\n' % (subindent, subkey, subval))
new_contents.append('%s%s = %s\n' % (subindent, subkey,
subval))
else:
new_contents.append('%s%s = %s\n' % (indent, key, value))
del new_values[key]
Expand Down Expand Up @@ -302,9 +299,9 @@ def _lookup_credentials(self):
# the credentials.method is sufficient to show
# where the credentials are coming from.
access_key = ConfigValue(credentials.access_key,
credentials.method, '')
credentials.method, '')
secret_key = ConfigValue(credentials.secret_key,
credentials.method, '')
credentials.method, '')
access_key.mask_value()
secret_key.mask_value()
return access_key, secret_key
Expand All @@ -322,6 +319,7 @@ def _lookup_config(self, name):
else:
return ConfigValue(NOT_SET, None, None)


class ConfigureSetCommand(BasicCommand):
NAME = 'set'
DESCRIPTION = BasicCommand.FROM_FILE('configure', 'set',
Expand Down Expand Up @@ -362,7 +360,6 @@ def _run_main(self, args, parsed_globals):
section = 'profile %s' % self._session.profile
else:
# First figure out if it's been scoped to a profile.
# This will happen if
parts = varname.split('.')
if parts[0] in ('default', 'profile'):
# Then we know we're scoped to a profile.
Expand Down Expand Up @@ -456,7 +453,6 @@ def _get_dotted_config_value(self, varname):
return value



class ConfigureCommand(BasicCommand):
NAME = 'configure'
DESCRIPTION = BasicCommand.FROM_FILE()
Expand Down
76 changes: 61 additions & 15 deletions tests/unit/customizations/test_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,55 +182,70 @@ def test_session_says_profile_does_not_exist(self):


class TestInteractivePrompter(unittest.TestCase):
@mock.patch('awscli.customizations.configure.raw_input')
def test_access_key_is_masked(self, mock_raw_input):
mock_raw_input.return_value = 'foo'
def setUp(self):
self.patch = mock.patch('awscli.customizations.configure.raw_input')
self.mock_raw_input = self.patch.start()

def tearDown(self):
self.patch.stop()

def test_access_key_is_masked(self):
self.mock_raw_input.return_value = 'foo'
prompter = configure.InteractivePrompter()
response = prompter.get_value(
current_value='myaccesskey', config_name='aws_access_key_id',
prompt_text='Access key')
# First we should return the value from raw_input.
self.assertEqual(response, 'foo')
# We should also not display the entire access key.
prompt_text = mock_raw_input.call_args[0][0]
prompt_text = self.mock_raw_input.call_args[0][0]
self.assertNotIn('myaccesskey', prompt_text)
self.assertRegexpMatches(prompt_text, r'\[\*\*\*\*.*\]')

@mock.patch('awscli.customizations.configure.raw_input')
def test_access_key_not_masked_when_none(self, mock_raw_input):
mock_raw_input.return_value = 'foo'
def test_access_key_not_masked_when_none(self):
self.mock_raw_input.return_value = 'foo'
prompter = configure.InteractivePrompter()
response = prompter.get_value(
current_value=None, config_name='aws_access_key_id',
prompt_text='Access key')
# First we should return the value from raw_input.
self.assertEqual(response, 'foo')
prompt_text = mock_raw_input.call_args[0][0]
prompt_text = self.mock_raw_input.call_args[0][0]
self.assertIn('[None]', prompt_text)

@mock.patch('awscli.customizations.configure.raw_input')
def test_secret_key_is_masked(self, mock_raw_input):
def test_secret_key_is_masked(self):
prompter = configure.InteractivePrompter()
prompter.get_value(
current_value='mysupersecretkey',
config_name='aws_secret_access_key',
prompt_text='Secret Key')
# We should also not display the entire secret key.
prompt_text = mock_raw_input.call_args[0][0]
prompt_text = self.mock_raw_input.call_args[0][0]
self.assertNotIn('mysupersecretkey', prompt_text)
self.assertRegexpMatches(prompt_text, r'\[\*\*\*\*.*\]')

@mock.patch('awscli.customizations.configure.raw_input')
def test_non_secret_keys_are_not_masked(self, mock_raw_input):
def test_non_secret_keys_are_not_masked(self):
prompter = configure.InteractivePrompter()
prompter.get_value(
current_value='mycurrentvalue', config_name='not_a_secret_key',
prompt_text='Enter value')
# We should also not display the entire secret key.
prompt_text = mock_raw_input.call_args[0][0]
prompt_text = self.mock_raw_input.call_args[0][0]
self.assertIn('mycurrentvalue', prompt_text)
self.assertRegexpMatches(prompt_text, r'\[mycurrentvalue\]')

def test_user_hits_enter_returns_none(self):
# If a user hits enter, then raw_input returns the empty string.
self.mock_raw_input.return_value = ''

prompter = configure.InteractivePrompter()
response = prompter.get_value(
current_value=None, config_name='aws_access_key_id',
prompt_text='Access key')
# We convert the empty string to None to indicate that there
# was no input.
self.assertIsNone(response)


class TestConfigFileWriter(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -395,6 +410,22 @@ def test_update_config_with_comments(self):
'foo = newvalue\n'
)

def test_update_config_with_commented_section(self):
original = (
'#[default]\n'
'[default]\n'
'#foo = 1\n'
'bar = 1\n'
)
self.assert_update_config(
original, {'foo': 'newvalue'},
'#[default]\n'
'[default]\n'
'#foo = 1\n'
'bar = 1\n'
'foo = newvalue\n'
)

def test_spaces_around_key_names(self):
original = (
'[default]\n'
Expand Down Expand Up @@ -720,11 +751,26 @@ def test_configure_set_triple_dotted(self):
{'__section__': 'default', 's3': {'signature_version': 's3v4'}},
'myconfigfile')

def test_configure_set_with_profile(self):
def test_configure_set_with_profile_nested(self):
# aws configure set default.s3.signature_version s3v4
set_command = configure.ConfigureSetCommand(self.session, self.config_writer)
set_command(args=['profile.foo.s3.signature_version', 's3v4'],
parsed_globals=None)
self.config_writer.update_config.assert_called_with(
{'__section__': 'profile foo',
's3': {'signature_version': 's3v4'}}, 'myconfigfile')


class TestConfigValueMasking(unittest.TestCase):
def test_config_value_is_masked(self):
config_value = configure.ConfigValue(
'fake_access_key', 'config_file', 'aws_access_key_id')
self.assertEqual(config_value.value, 'fake_access_key')
config_value.mask_value()
self.assertEqual(config_value.value, '****************_key')

def test_dont_mask_unset_value(self):
no_config = configure.ConfigValue(configure.NOT_SET, None, None)
self.assertEqual(no_config.value, configure.NOT_SET)
no_config.mask_value()
self.assertEqual(no_config.value, configure.NOT_SET)

0 comments on commit 08db225

Please sign in to comment.