diff --git a/plugins/action/k8s_info.py b/plugins/action/k8s_info.py index 7798a6ac..f2cc4774 100644 --- a/plugins/action/k8s_info.py +++ b/plugins/action/k8s_info.py @@ -8,6 +8,8 @@ import copy import traceback +from contextlib import contextmanager + from ansible.config.manager import ensure_type from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleAction, AnsibleActionFail @@ -34,6 +36,122 @@ def _ensure_invocation(self, result): return result + @contextmanager + def get_template_data(self, template_path): + try: + source = self._find_needle('templates', template_path) + except AnsibleError as e: + raise AnsibleActionFail(to_text(e)) + + # Get vault decrypted tmp file + try: + tmp_source = self._loader.get_real_file(source) + except AnsibleFileNotFound as e: + raise AnsibleActionFail("could not find template=%s, %s" % (source, to_text(e))) + b_tmp_source = to_bytes(tmp_source, errors='surrogate_or_strict') + + try: + with open(b_tmp_source, 'rb') as f: + try: + template_data = to_text(f.read(), errors='surrogate_or_strict') + except UnicodeError: + raise AnsibleActionFail("Template source files must be utf-8 encoded") + yield template_data + except AnsibleAction: + raise + except Exception as e: + raise AnsibleActionFail("%s: %s" % (type(e).__name__, to_text(e))) + finally: + self._loader.cleanup_tmp_file(b_tmp_source) + + def load_template(self, template, new_module_args, task_vars): + # template is only supported by k8s module. + if self._task.action not in ('k8s', 'community.kubernetes.k8s', 'community.okd.k8s'): + raise AnsibleActionFail("'template' is only supported parameter for 'k8s' module.") + if isinstance(template, string_types): + # treat this as raw_params + template_path = template + newline_sequence = self.DEFAULT_NEWLINE_SEQUENCE + variable_start_string = None + variable_end_string = None + block_start_string = None + block_end_string = None + trim_blocks = True + lstrip_blocks = False + elif isinstance(template, dict): + template_args = template + template_path = template_args.get('path', None) + if not template: + raise AnsibleActionFail("Please specify path for template.") + + # Options type validation strings + for s_type in ('newline_sequence', 'variable_start_string', 'variable_end_string', 'block_start_string', + 'block_end_string'): + if s_type in template_args: + value = ensure_type(template_args[s_type], 'string') + if value is not None and not isinstance(value, string_types): + raise AnsibleActionFail("%s is expected to be a string, but got %s instead" % (s_type, type(value))) + try: + trim_blocks = boolean(template_args.get('trim_blocks', True), strict=False) + lstrip_blocks = boolean(template_args.get('lstrip_blocks', False), strict=False) + except TypeError as e: + raise AnsibleActionFail(to_native(e)) + + newline_sequence = template_args.get('newline_sequence', self.DEFAULT_NEWLINE_SEQUENCE) + variable_start_string = template_args.get('variable_start_string', None) + variable_end_string = template_args.get('variable_end_string', None) + block_start_string = template_args.get('block_start_string', None) + block_end_string = template_args.get('block_end_string', None) + else: + raise AnsibleActionFail("Error while reading template file - " + "a string or dict for template expected, but got %s instead" % type(template)) + + # Option `lstrip_blocks' was added in Jinja2 version 2.7. + if lstrip_blocks: + try: + import jinja2.defaults + except ImportError: + raise AnsibleError('Unable to import Jinja2 defaults for determining Jinja2 features.') + + try: + jinja2.defaults.LSTRIP_BLOCKS + except AttributeError: + raise AnsibleError("Option `lstrip_blocks' is only available in Jinja2 versions >=2.7") + + wrong_sequences = ["\\n", "\\r", "\\r\\n"] + allowed_sequences = ["\n", "\r", "\r\n"] + + # We need to convert unescaped sequences to proper escaped sequences for Jinja2 + if newline_sequence in wrong_sequences: + newline_sequence = allowed_sequences[wrong_sequences.index(newline_sequence)] + elif newline_sequence not in allowed_sequences: + raise AnsibleActionFail("newline_sequence needs to be one of: \n, \r or \r\n") + + # template the source data locally & get ready to transfer + with self.get_template_data(template_path) as template_data: + # add ansible 'template' vars + temp_vars = task_vars.copy() + old_vars = self._templar.available_variables + + self._templar.environment.newline_sequence = newline_sequence + if block_start_string is not None: + self._templar.environment.block_start_string = block_start_string + if block_end_string is not None: + self._templar.environment.block_end_string = block_end_string + if variable_start_string is not None: + self._templar.environment.variable_start_string = variable_start_string + if variable_end_string is not None: + self._templar.environment.variable_end_string = variable_end_string + self._templar.environment.trim_blocks = trim_blocks + self._templar.environment.lstrip_blocks = lstrip_blocks + self._templar.available_variables = temp_vars + resultant = self._templar.do_template(template_data, preserve_trailing_newlines=True, escape_backslashes=False) + self._templar.available_variables = old_vars + resource_definition = self._task.args.get('definition', None) + if not resource_definition: + new_module_args.pop('template') + new_module_args['definition'] = resultant + def run(self, tmp=None, task_vars=None): ''' handler for k8s options ''' if task_vars is None: @@ -91,116 +209,7 @@ def run(self, tmp=None, task_vars=None): template = self._task.args.get('template', None) if template: - # template is only supported by k8s module. - if self._task.action not in ('k8s', 'community.kubernetes.k8s', 'community.okd.k8s'): - raise AnsibleActionFail("'template' is only supported parameter for 'k8s' module.") - if isinstance(template, string_types): - # treat this as raw_params - template_path = template - newline_sequence = self.DEFAULT_NEWLINE_SEQUENCE - variable_start_string = None - variable_end_string = None - block_start_string = None - block_end_string = None - trim_blocks = True - lstrip_blocks = False - elif isinstance(template, dict): - template_args = template - template_path = template_args.get('path', None) - if not template: - raise AnsibleActionFail("Please specify path for template.") - - # Options type validation strings - for s_type in ('newline_sequence', 'variable_start_string', 'variable_end_string', 'block_start_string', - 'block_end_string'): - if s_type in template_args: - value = ensure_type(template_args[s_type], 'string') - if value is not None and not isinstance(value, string_types): - raise AnsibleActionFail("%s is expected to be a string, but got %s instead" % (s_type, type(value))) - try: - trim_blocks = boolean(template_args.get('trim_blocks', True), strict=False) - lstrip_blocks = boolean(template_args.get('lstrip_blocks', False), strict=False) - except TypeError as e: - raise AnsibleActionFail(to_native(e)) - - newline_sequence = template_args.get('newline_sequence', self.DEFAULT_NEWLINE_SEQUENCE) - variable_start_string = template_args.get('variable_start_string', None) - variable_end_string = template_args.get('variable_end_string', None) - block_start_string = template_args.get('block_start_string', None) - block_end_string = template_args.get('block_end_string', None) - else: - raise AnsibleActionFail("Error while reading template file - " - "a string or dict for template expected, but got %s instead" % type(template)) - - # Option `lstrip_blocks' was added in Jinja2 version 2.7. - if lstrip_blocks: - try: - import jinja2.defaults - except ImportError: - raise AnsibleError('Unable to import Jinja2 defaults for determining Jinja2 features.') - - try: - jinja2.defaults.LSTRIP_BLOCKS - except AttributeError: - raise AnsibleError("Option `lstrip_blocks' is only available in Jinja2 versions >=2.7") - - wrong_sequences = ["\\n", "\\r", "\\r\\n"] - allowed_sequences = ["\n", "\r", "\r\n"] - - # We need to convert unescaped sequences to proper escaped sequences for Jinja2 - if newline_sequence in wrong_sequences: - newline_sequence = allowed_sequences[wrong_sequences.index(newline_sequence)] - elif newline_sequence not in allowed_sequences: - raise AnsibleActionFail("newline_sequence needs to be one of: \n, \r or \r\n") - - try: - source = self._find_needle('templates', template_path) - except AnsibleError as e: - raise AnsibleActionFail(to_text(e)) - - # Get vault decrypted tmp file - try: - tmp_source = self._loader.get_real_file(source) - except AnsibleFileNotFound as e: - raise AnsibleActionFail("could not find template=%s, %s" % (source, to_text(e))) - b_tmp_source = to_bytes(tmp_source, errors='surrogate_or_strict') - - # template the source data locally & get ready to transfer - try: - with open(b_tmp_source, 'rb') as f: - try: - template_data = to_text(f.read(), errors='surrogate_or_strict') - except UnicodeError: - raise AnsibleActionFail("Template source files must be utf-8 encoded") - - # add ansible 'template' vars - temp_vars = task_vars.copy() - old_vars = self._templar.available_variables - - self._templar.environment.newline_sequence = newline_sequence - if block_start_string is not None: - self._templar.environment.block_start_string = block_start_string - if block_end_string is not None: - self._templar.environment.block_end_string = block_end_string - if variable_start_string is not None: - self._templar.environment.variable_start_string = variable_start_string - if variable_end_string is not None: - self._templar.environment.variable_end_string = variable_end_string - self._templar.environment.trim_blocks = trim_blocks - self._templar.environment.lstrip_blocks = lstrip_blocks - self._templar.available_variables = temp_vars - resultant = self._templar.do_template(template_data, preserve_trailing_newlines=True, escape_backslashes=False) - self._templar.available_variables = old_vars - resource_definition = self._task.args.get('definition', None) - if not resource_definition: - new_module_args.pop('template') - new_module_args['definition'] = resultant - except AnsibleAction: - raise - except Exception as e: - raise AnsibleActionFail("%s: %s" % (type(e).__name__, to_text(e))) - finally: - self._loader.cleanup_tmp_file(b_tmp_source) + self.load_template(template, new_module_args, task_vars) # Execute the k8s_* module. module_return = self._execute_module(module_name=self._task.action, module_args=new_module_args, task_vars=task_vars)