Skip to content

Commit

Permalink
SOPS 3.9.0: use encrypt/decrypt subcommands, use --filename-override …
Browse files Browse the repository at this point in the history
…option for encryption, use filestatus to check for encrypted files (#190)

* SOPS 3.9.0: use encrypt/decrypt subcommands, use --filename-override option for encryption.

* Add regression test for #153.

* Use filestatus for sOPS 3.9.0+ to determine whether a file that cannot be decrypted is unencrypted.

* Refactoring tests a bit.

* Allow tests to have min/max SOPS version; add tests for handle_unencrypted_files.
  • Loading branch information
felixfontein authored Jul 7, 2024
1 parent ca9c8a9 commit ed0318a
Show file tree
Hide file tree
Showing 29 changed files with 483 additions and 77 deletions.
5 changes: 5 additions & 0 deletions changelogs/fragments/190-sops-3.9.0.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
minor_changes:
- "Detect SOPS 3.9.0 and use new ``decrypt`` and ``encrypt`` subcommands (https://github.com/ansible-collections/community.sops/pull/190)."
- "sops vars plugin - new option ``handle_unencrypted_files`` allows to control behavior when encountering unencrypted files with SOPS 3.9.0+ (https://github.com/ansible-collections/community.sops/pull/190)."
bugfixes:
- "sops_encrypt - properly support ``path_regex`` in ``.sops.yaml`` when SOPS 3.9.0 or later is used (https://github.com/ansible-collections/community.sops/issues/153, https://github.com/ansible-collections/community.sops/pull/190)."
208 changes: 162 additions & 46 deletions plugins/module_utils/sops.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
__metaclass__ = type


import collections
import json
import os
import re

from ansible.module_utils.common.text.converters import to_text, to_native

Expand Down Expand Up @@ -44,39 +47,41 @@
203: "FileAlreadyEncrypted"
}

_SOPS_VERSION = re.compile(r'^sops ([0-9]+)\.([0-9]+)\.([0-9]+)')


def _create_single_arg(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
arguments.extend([argument_name, to_native(value)])

return f


def _create_comma_separated(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
arguments.extend([argument_name, ','.join([to_native(v) for v in value])])

return f


def _create_repeated(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
for v in value:
arguments.extend([argument_name, to_native(v)])

return f


def _create_boolean(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
if value:
arguments.append(argument_name)

return f


def _create_env_variable(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
env[argument_name] = value

return f
Expand Down Expand Up @@ -114,57 +119,90 @@ def f(value, arguments, env):
class SopsError(Exception):
''' Extend Exception class with sops specific information '''

def __init__(self, filename, exit_code, message, decryption=True):
def __init__(self, filename, exit_code, message, decryption=True, operation=None):
if operation is None:
operation = 'decrypt' if decryption else 'encrypt'
if exit_code in SOPS_ERROR_CODES:
exception_name = SOPS_ERROR_CODES[exit_code]
message = "error with file %s: %s exited with code %d: %s" % (
filename, exception_name, exit_code, to_native(message))
else:
message = "could not %s file %s; Unknown sops error code: %s; message: %s" % (
'decrypt' if decryption else 'encrypt', filename, exit_code, to_native(message))
operation, filename, exit_code, to_native(message))
super(SopsError, self).__init__(message)


class Sops():
''' Utility class to perform sops CLI actions '''
SopsFileStatus = collections.namedtuple('SopsFileStatus', ['encrypted'])

@staticmethod
def _add_options(command, env, get_option_value, options):

class SopsRunner(object):
def _add_options(self, command, env, get_option_value, options):
if get_option_value is None:
return
for option, f in options.items():
v = get_option_value(option)
if v is not None:
f(v, command, env)
f(v, command, env, self.version)

def _debug(self, message):
if self.display:
self.display.vvvv(message)
elif self.module:
self.module.debug(message)

def _warn(self, message):
if self.display:
self.display.warning(message)
elif self.module:
self.module.warn(message)

def __init__(self, binary, module=None, display=None):
self.binary = binary
self.module = module
self.display = display

self.version = (3, 7, 3) # if --disable-version-check is not supported, this is version 3.7.3 or older
self.version_string = '(before 3.8.0)'

exit_code, output, err = self._run_command([self.binary, '--version', '--disable-version-check'])
if exit_code == 0:
m = _SOPS_VERSION.match(output.decode('utf-8'))
if m:
self.version = int(m.group(1)), int(m.group(2)), int(m.group(3))
self.version_string = '%d.%d.%d' % self.version
self._debug('SOPS version detected as %s' % (self.version, ))
else:
self._warn('Cannot extract SOPS version from: %s' % repr(output))
else:
self._debug('Cannot detect SOPS version efficiently, likely a version before 3.8.0')

@staticmethod
def get_sops_binary(get_option_value):
cmd = get_option_value('sops_binary') if get_option_value else None
if cmd is None:
cmd = 'sops'
return cmd
def _run_command(self, command, env=None, data=None, cwd=None):
if self.module:
return self.module.run_command(command, environ_update=env, cwd=cwd, encoding=None, data=data, binary_data=True)

@staticmethod
def decrypt(encrypted_file, content=None,
display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
process = Popen(command, stdin=None if data is None else PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
output, err = process.communicate(input=data)
return process.returncode, output, err

def decrypt(self, encrypted_file, content=None,
decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None):
# Run sops directly, python module is deprecated
command = [Sops.get_sops_binary(get_option_value)]
command = [self.binary]
if self.version >= (3, 9, 0):
command.append("decrypt")
env = os.environ.copy()
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
self._add_options(command, env, get_option_value, GENERAL_OPTIONS)
if input_type is not None:
command.extend(["--input-type", input_type])
if output_type is not None:
command.extend(["--output-type", output_type])
if content is not None:
encrypted_file = '/dev/stdin'
command.extend(["--decrypt", encrypted_file])
if self.version < (3, 9, 0):
command.append("--decrypt")
command.append(encrypted_file)

if module:
exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True)
else:
process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env)
(output, err) = process.communicate(input=content)
exit_code = process.returncode
exit_code, output, err = self._run_command(command, env=env, data=content)

if decode_output:
# output is binary, we want UTF-8 string
Expand All @@ -173,8 +211,8 @@ def decrypt(encrypted_file, content=None,

# sops logs always to stderr, as stdout is used for
# file content
if err and display:
display.vvvv(to_text(err, errors='surrogate_or_strict'))
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError(encrypted_file, exit_code, err, decryption=True)
Expand All @@ -184,36 +222,114 @@ def decrypt(encrypted_file, content=None,

return output

@staticmethod
def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None):
def encrypt(self, data, cwd=None, input_type=None, output_type=None, filename=None, get_option_value=None):
# Run sops directly, python module is deprecated
command = [Sops.get_sops_binary(get_option_value)]
command = [self.binary]
if self.version >= (3, 9, 0):
command.append("encrypt")
env = os.environ.copy()
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
self._add_options(command, env, get_option_value, GENERAL_OPTIONS)
self._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
if input_type is not None:
command.extend(["--input-type", input_type])
if output_type is not None:
command.extend(["--output-type", output_type])
command.extend(["--encrypt", "/dev/stdin"])
if self.version < (3, 9, 0):
command.append("--encrypt")
elif filename:
command.extend(["--filename-override", filename])
command.append("/dev/stdin")

if module:
exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None)
else:
process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
(output, err) = process.communicate(input=data)
exit_code = process.returncode
exit_code, output, err = self._run_command(command, env=env, data=data, cwd=cwd)

# sops logs always to stderr, as stdout is used for
# file content
if err and display:
display.vvvv(to_text(err, errors='surrogate_or_strict'))
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError('to stdout', exit_code, err, decryption=False)

return output

def has_filestatus(self):
return self.version >= (3, 9, 0)

def get_filestatus(self, path):
command = [self.binary, 'filestatus', path]

exit_code, output, err = self._run_command(command)

# sops logs always to stderr, as stdout is used for
# file content
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError(path, exit_code, err, operation='inspect')

try:
result = json.loads(output)
return SopsFileStatus(result['encrypted'])
except Exception as exc:
self._debug(u'Unexpected stdout:\n' + to_text(output, errors='surrogate_or_strict'))
raise SopsError(path, 0, 'Cannot decode filestatus result: %s' % exc, operation='inspect')


_SOPS_RUNNER_CACHE = dict()


class Sops():
''' Utility class to perform sops CLI actions '''

@staticmethod
def get_sops_binary(get_option_value):
cmd = get_option_value('sops_binary') if get_option_value else None
if cmd is None:
cmd = 'sops'
return cmd

@staticmethod
def get_sops_runner_from_binary(sops_binary, module=None, display=None):
candidates = _SOPS_RUNNER_CACHE.get(sops_binary, [])
for cand_module, cand_runner in candidates:
if cand_runner is module:
return cand_runner
runner = SopsRunner(sops_binary, module=module, display=display)
candidates.append((module, runner))
_SOPS_RUNNER_CACHE[sops_binary] = candidates
return runner

@staticmethod
def get_sops_runner_from_options(get_option_value, module=None, display=None):
return Sops.get_sops_runner_from_binary(Sops.get_sops_binary(get_option_value), module=module, display=display)

@staticmethod
def decrypt(encrypted_file, content=None,
display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
runner = Sops.get_sops_runner_from_options(get_option_value, module=module, display=display)
return runner.decrypt(
encrypted_file,
content=content,
decode_output=decode_output,
rstrip=rstrip,
input_type=input_type,
output_type=output_type,
get_option_value=get_option_value,
)

@staticmethod
def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None, filename=None):
runner = Sops.get_sops_runner_from_options(get_option_value, module=module, display=display)
return runner.encrypt(
data,
cwd=cwd,
input_type=input_type,
output_type=output_type,
get_option_value=get_option_value,
filename=filename,
)


def get_sops_argument_spec(add_encrypt_specific=False):
argument_spec = {
Expand Down
3 changes: 2 additions & 1 deletion plugins/modules/sops_encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,11 @@ def get_option_value(argument_name):
output_type = None
if path.endswith('.json'):
output_type = 'json'
elif path.endswith('.yaml') or path.endswith('.yml'):
elif path.endswith(('.yml', '.yaml')):
output_type = 'yaml'
data = Sops.encrypt(
data=input_data, cwd=directory, input_type=input_type, output_type=output_type,
filename=os.path.relpath(path, directory) if directory is not None else path,
get_option_value=get_option_value, module=module,
)
write_file(module, data)
Expand Down
Loading

0 comments on commit ed0318a

Please sign in to comment.