diff --git a/CHANGELOG.md b/CHANGELOG.md
index 47ef24775..99bdaf27d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
 -->
 ## [Unreleased](https://github.com/cyverse/atmosphere/compare/v36-9...HEAD) - YYYY-MM-DD
+### Added
+ - Script to create access tokens for users
+
+### Changed
+ - Use Argo workflows to run Ansible playbooks to deploy instances
+
 ## [v36-9](https://github.com/cyverse/atmosphere/compare/v36-8...v36-9) - 2020-06-16
 
 ### Fixed
diff --git a/Dockerfile b/Dockerfile
index 5418c7dc3..aceda51e4 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,6 +19,7 @@ RUN apt-get update && \
     libssl-dev \
     libxml2-dev \
     libxslt1-dev \
+    locales \
     make \
     netcat \
     openssl \
@@ -39,7 +40,12 @@ RUN apt-get update && \
     uwsgi \
     uwsgi-plugin-python \
     zlib1g-dev && \
-    rm -rf /var/lib/apt/lists/*
+    rm -rf /var/lib/apt/lists/* && \
+    locale-gen en_US.UTF-8
+
+ENV LANG en_US.UTF-8
+ENV LANGUAGE en_US:en
+ENV LC_ALL en_US.UTF-8
 
 # Create PID and log directories for uWSGI
 RUN mkdir -p /run/uwsgi/app/atmosphere /var/log/uwsgi && \
diff --git a/atmosphere/settings/local.py.j2 b/atmosphere/settings/local.py.j2
index cdc570828..b792a932a 100644
--- a/atmosphere/settings/local.py.j2
+++ b/atmosphere/settings/local.py.j2
@@ -598,3 +598,6 @@ CACHES = {
         'LOCATION': '/var/tmp/django_cache',
     }
 }
+
+# Argo workflow
+ARGO_CONFIG_FILE_PATH = "{{ ARGO_CONFIG_FILE_PATH }}"
diff --git a/docker/argo_config.schema b/docker/argo_config.schema
new file mode 100644
index 000000000..5c3ba0aaa
--- /dev/null
+++ b/docker/argo_config.schema
@@ -0,0 +1,34 @@
+{
+    "type": "object",
+    "properties": {
+        "default": {
+            "type": "string",
+            "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
+        }
+    },
+    "patternProperties": {
+        "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$": {
+            "$ref": "#/definitions/provider"
+        }
+    },
+    "definitions": {
+        "provider": {
+            "properties": {
+                "api_host": {"type": "string"},
+                "api_port": {
+                    "type": "integer",
+                    "minimum": 0,
+                    "maximum": 65535
+                },
+                "token": {"type": "string"},
+                "namespace": {"type": "string"},
+                "workflow_base_dir": {"type": "string"},
+                "zoneinfo": {"type": "string"},
+                "ssl_verify": {"type": "boolean"}
+            },
+            "required": ["api_host", "api_port", "token", "namespace", "workflow_base_dir", "zoneinfo", "ssl_verify"]
+        }
+    },
+    "required": ["default"],
+    "additionalProperties": false
+}
\ No newline at end of file
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d6b7bc0b4..4d27a0c40 100644
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -64,6 +64,11 @@ else
     chown -R www-data:www-data /opt/dev/atmosphere
 fi
 
+# Validate argo config
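+# An illustrative argo.config.yml shape (assumed here; the authoritative
+# schema lives in docker/argo_config.schema):
+#   default: "11111111-2222-3333-4444-555555555555"
+#   "11111111-2222-3333-4444-555555555555":
+#     api_host: argo.example.com
+#     api_port: 443
+#     token: REDACTED-TOKEN
+#     namespace: argo
+#     workflow_base_dir: /opt/workflows
+#     zoneinfo: America/Phoenix
+#     ssl_verify: true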
+cat $SECRETS_DIR/argo.config.yml | python -c "import yaml, json, sys; print(json.dumps(yaml.safe_load(sys.stdin.read())));" | tee /tmp/argo_config.json
+jsonschema -i /tmp/argo_config.json argo_config.schema
+rm /tmp/argo_config.json
+
 # Start services
 sed -i "s/^bind 127.0.0.1 ::1$/bind 127.0.0.1/" /etc/redis/redis.conf
 service redis-server start
diff --git a/scripts/batch_create_access_token.py b/scripts/batch_create_access_token.py
new file mode 100644
index 000000000..05273d0a0
--- /dev/null
+++ b/scripts/batch_create_access_token.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+"""
+Create access tokens for users
+
+Usage:
+    python scripts/batch_create_access_token.py --token-name workshop_token --users name1,name2,name3
+"""
+import django
+
+django.setup()
+
+import argparse
+from core.models import AccessToken
+from core.models.user import AtmosphereUser
+from core.models.access_token import create_access_token
+
+
+def parse_arg():
+    """
+    Parse command line arguments.
+
+    Returns:
+        argparse.Namespace: result
+    """
+    parser = argparse.ArgumentParser(
+        description="create or fetch tokens for users"
+    )
+    parser.add_argument(
+        "--users",
+        dest="users",
+        type=str,
+        required=True,
+        help="usernames, comma-separated if more than one (no spaces)"
+    )
+    parser.add_argument(
+        "--token-name",
+        dest="token_name",
+        type=str,
+        required=True,
+        help="name of the token"
+    )
+
+    args = parser.parse_args()
+    args.users = args.users.split(',')
+    return args
+
+
+def fetch_user_by_username(username):
+    """
+    Fetch user by username
+
+    Args:
+        username (str): username of the user
+
+    Returns:
+        Optional[AtmosphereUser]: user, None if not found
+    """
+    try:
+        return AtmosphereUser.objects.get(username=username)
+    except Exception as exc:
+        print("unable to fetch user {}".format(username))
+        print(exc)
+        return None
+
+
+def create_or_fetch_token_for_user(user, token_name):
+    """
+    Fetch the token with the given name for a user, or create a new token if
+    none exists with that name
+
+    Args:
+        user (AtmosphereUser): user
+        token_name (str): name of the token
+
+    Returns:
+        str: token
+    """
+    # check if there is any existing token by the same name
+    existing_tokens = AccessToken.objects.filter(token__user=user)
+    for token in existing_tokens:
+        if token.name == token_name:
+            # return token if same name
+            return token.token_id
+    # create new token if none with the same name exists
+    new_token = create_access_token(
+        user, token_name, issuer="Personal-Access-Token"
+    )
+    print("new token created for user {}".format(user.username))
+    return new_token.token_id
+
+
+def main():
+    """
+    Entrypoint
+    """
+    args = parse_arg()
+    for username in args.users:
+        user = fetch_user_by_username(username)
+        # skip usernames that could not be resolved
+        if not user:
+            continue
+        token = create_or_fetch_token_for_user(user, args.token_name)
+        print("{}, {}".format(username, token))
+
+
+if __name__ == '__main__':
+    main()
diff --git a/service/argo/__init__.py b/service/argo/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/service/argo/common.py b/service/argo/common.py
new file mode 100644
index 000000000..b4a555ed8
--- /dev/null
+++ b/service/argo/common.py
@@ -0,0 +1,289 @@
+"""
+Common component for Argo
+"""
+
+import os
+import yaml
+from django.conf import settings
+
+from service.argo.rest_api import ArgoAPIClient
+from service.argo.exception import (
+    BaseWorkflowDirNotExist, ProviderWorkflowDirNotExist, WorkflowFileNotExist,
+    WorkflowFileNotYAML, ArgoConfigFileNotExist, ArgoConfigFileNotYAML,
+    ArgoConfigFileError
+)
+
+
+class ArgoContext:
+    """
+    Context that the Argo Workflow should be executing in
+    """
+
+    def __init__(
+        self,
+        api_host=None,
+        api_port=None,
+        token=None,
+        namespace=None,
+        ssl_verify=None,
+        config=None
+    ):
+        """
+        Create a context to execute ArgoWorkflow
+
+        Args:
+            api_host (str, optional): hostname of the Argo API Server. Defaults to None.
+            api_port (int, optional): port of the Argo API Server. Defaults to None.
+            token (str, optional): k8s bearer token. Defaults to None.
+            namespace (str, optional): k8s namespace for the workflow. Defaults to None.
+            ssl_verify (bool, optional): whether to verify the ssl cert or not. Defaults to None.
+            config (dict, optional): configuration, serves as a fallback if a
+                config entry is not passed as a parameter. Defaults to None.
+        """
+        if api_host:
+            self._api_host = api_host
+        else:
+            self._api_host = config["api_host"]
+
+        if api_port:
+            self._api_port = api_port
+        else:
+            self._api_port = config["api_port"]
+
+        if token:
+            self._token = token
+        else:
+            self._token = config["token"]
+
+        if namespace:
+            self._namespace = namespace
+        else:
+            self._namespace = config["namespace"]
+
+        # compare against None explicitly, since ssl_verify=False is a
+        # meaningful value that should not fall back to the config
+        if ssl_verify is not None:
+            self._ssl_verify = ssl_verify
+        else:
+            self._ssl_verify = config["ssl_verify"]
+
+    def client(self):
+        """
+        Returns an ArgoAPIClient
+
+        Returns:
+            ArgoAPIClient: an API client with the config from this context
+        """
+        return ArgoAPIClient(
+            self._api_host,
+            self._api_port,
+            self._token,
+            self._namespace,
+            verify=self._ssl_verify
+        )
+
+
+def _find_provider_dir(
+    base_directory, provider_uuid, default_provider="default"
+):
+    """
+    Check if the provider workflow directory exists
+
+    Args:
+        base_directory (str): base directory for workflow files
+        provider_uuid (str): provider uuid
+        default_provider (str, optional): default provider name. unset if None
+            or "". Defaults to "default".
+
+    Raises:
+        ProviderWorkflowDirNotExist: provider workflow directory does not exist
+        BaseWorkflowDirNotExist: base workflow directory does not exist
+
+    Returns:
+        str: path to the provider workflow directory
+    """
+    try:
+        # find provider directory
+        provider_dirs = [
+            entry
+            for entry in os.listdir(base_directory) if entry == provider_uuid
+        ]
+        # try default provider if given provider dir does not exist
+        if not provider_dirs and default_provider:
+            provider_dirs = [
+                entry for entry in os.listdir(base_directory)
+                if entry == default_provider
+            ]
+        if not provider_dirs:
+            raise ProviderWorkflowDirNotExist(provider_uuid)
+
+        provider_dir = base_directory + "/" + provider_dirs[0]
+        return provider_dir
+    except OSError:
+        raise BaseWorkflowDirNotExist(base_directory)
+
+
+def _find_workflow_file(provider_dir_path, filename, provider_uuid):
+    """
+    Find the path of the workflow file, and check if the file exists
+
+    Args:
+        provider_dir_path (str): path to the provider workflow directory
+        filename (str): workflow definition filename
+        provider_uuid (str): provider uuid
+
+    Raises:
+        WorkflowFileNotExist: workflow definition file does not exist
+        ProviderWorkflowDirNotExist: provider workflow directory does not exist
+
+    Returns:
+        str: path to the workflow file
+    """
+    try:
+        # find workflow file
+        wf_files = [
+            entry
+            for entry in os.listdir(provider_dir_path) if entry == filename
+        ]
+        if not wf_files:
+            raise WorkflowFileNotExist(provider_uuid, filename)
+
+        # construct path
+        wf_file_path = provider_dir_path + "/" + wf_files[0]
+        return wf_file_path
+    except OSError:
+        raise ProviderWorkflowDirNotExist(provider_uuid)
+
+
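+# Workflow files are looked up on disk per provider, falling back to the
+# "default" directory. An illustrative layout (paths assumed):
+#   <workflow_base_dir>/<provider_uuid>/instance_deploy.yml
+#   <workflow_base_dir>/default/instance_deploy.yml
+
+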
+def argo_lookup_workflow(base_directory, filename, provider_uuid):
+    """
+    Lookup a workflow definition by filename and cloud provider
+
+    Args:
+        base_directory (str): base directory for workflow files
+        filename (str): workflow filename
+        provider_uuid (str): the provider uuid
+
+    Raises:
+        WorkflowFileNotYAML: unable to parse the workflow definition file as YAML
+        WorkflowFileNotExist: unable to open/read the workflow definition file
+
+    Returns:
+        dict: workflow definition read from the file
+    """
+    provider_dir_path = _find_provider_dir(base_directory, provider_uuid)
+    wf_file_path = _find_workflow_file(
+        provider_dir_path, filename, provider_uuid
+    )
+
+    try:
+        # read workflow definition
+        with open(wf_file_path, "r") as wf_file:
+            wf_def = yaml.safe_load(wf_file.read())
+    except yaml.YAMLError:
+        raise WorkflowFileNotYAML(wf_file_path)
+    except IOError:
+        raise WorkflowFileNotExist(wf_file_path)
+
+    return wf_def
+
+
+def argo_lookup_yaml_file(base_directory, filename, provider_uuid):
+    """
+    Lookup a yaml file by filename and cloud provider, and read the yaml file
+
+    Args:
+        base_directory (str): base directory for workflow files
+        filename (str): yaml filename
+        provider_uuid (str): the provider uuid
+
+    Raises:
+        WorkflowFileNotYAML: unable to parse the workflow definition file as YAML
+        WorkflowFileNotExist: unable to open/read the workflow definition file
+
+    Returns:
+        dict: parsed content of the yaml file
+    """
+    provider_dir_path = _find_provider_dir(base_directory, provider_uuid)
+    wf_file_path = _find_workflow_file(
+        provider_dir_path, filename, provider_uuid
+    )
+
+    try:
+        # read workflow definition
+        with open(wf_file_path, "r") as wf_file:
+            wf_def = yaml.safe_load(wf_file.read())
+    except yaml.YAMLError:
+        raise WorkflowFileNotYAML(wf_file_path)
+    except IOError:
+        raise WorkflowFileNotExist(wf_file_path)
+
+    return wf_def
+
+
+def read_argo_config(config_file_path=None, provider_uuid=None):
+    """
+    Read configuration for Argo.
+    Read from the given path if specified, else read from the path specified
+    in the settings.
+    Only the config specific to the provider is returned; if no provider uuid
+    is given, the default provider from the config is used.
+    If there is no provider specific config, the default one is used.
+
+    Args:
+        config_file_path (str, optional): path to the config file. will use
+            the default one from the settings if None. Defaults to None.
+        provider_uuid (str, optional): uuid of the provider. Defaults to None.
+
+    Raises:
+        ArgoConfigFileNotExist: config file missing
+        ArgoConfigFileNotYAML: config file not yaml
+        ArgoConfigFileError: error in the config file
+
+    Returns:
+        dict: config for the provider
+    """
+
+    try:
+        if not config_file_path:
+            # path from settings
+            config_file_path = settings.ARGO_CONFIG_FILE_PATH
+
+        # read config file from the resolved path
+        with open(config_file_path, "r") as config_file:
+            all_config = yaml.safe_load(config_file.read())
+
+        # validate config
+        if not isinstance(all_config, dict):
+            raise ArgoConfigFileError("config root not key-value")
+        if "default" not in all_config:
+            raise ArgoConfigFileError("default missing")
+        if all_config["default"] not in all_config:
+            raise ArgoConfigFileError("config for default provider missing")
+
+        # use the default provider when no provider is specified
+        if not provider_uuid:
+            default_provider_uuid = all_config["default"]
+            return all_config[default_provider_uuid]
+
+        # if there is no provider specific config, use the default one
+        if provider_uuid not in all_config:
+            default_provider_uuid = all_config["default"]
+            return all_config[default_provider_uuid]
+
+        return all_config[provider_uuid]
+    except IOError:
+        raise ArgoConfigFileNotExist(config_file_path)
+    except yaml.YAMLError:
+        raise ArgoConfigFileNotYAML(config_file_path)
+
+
+def argo_context_from_config(config_file_path=None):
+    """
+    Construct an ArgoContext from a config file
+
+    Args:
+        config_file_path (str, optional): path to the config file. Defaults to None.
+
+    Returns:
+        ArgoContext: argo context
+    """
+    # read configuration from file
+    config = read_argo_config(config_file_path=config_file_path)
+
+    # construct workflow context
+    context = ArgoContext(config=config)
+    return context
diff --git a/service/argo/exception.py b/service/argo/exception.py
new file mode 100644
index 000000000..dbceb3f11
--- /dev/null
+++ b/service/argo/exception.py
@@ -0,0 +1,75 @@
+"""
+Exceptions
+"""
+
+
+class ArgoBaseException(Exception):
+    """
+    Base exception for Argo related errors
+    """
+
+
+class ResponseNotJSON(ArgoBaseException):
+    """
+    Response of an HTTP request is not JSON
+    """
+
+
+class BaseWorkflowDirNotExist(ArgoBaseException):
+    """
+    Base directory for workflow files does not exist.
+    """
+
+
+class ProviderWorkflowDirNotExist(ArgoBaseException):
+    """
+    Workflow directory for the provider does not exist.
+    """
+
+
+class WorkflowFileNotExist(ArgoBaseException):
+    """
+    Workflow definition file (yaml file) does not exist
+    """
+
+
+class WorkflowFileNotYAML(ArgoBaseException):
+    """
+    Unable to parse workflow definition file as YAML
+    """
+
+
+class ArgoConfigFileNotExist(ArgoBaseException):
+    """
+    Configuration file for Argo does not exist
+    """
+
+
+class ArgoConfigFileNotYAML(ArgoBaseException):
+    """
+    Configuration file for Argo is not yaml
+    """
+
+
+class ArgoConfigFileError(ArgoBaseException):
+    """
+    Error in config file
+    """
+
+
+class WorkflowDataFileNotExist(ArgoBaseException):
+    """
+    Data file does not exist
+    """
+
+
+class WorkflowFailed(ArgoBaseException):
+    """
+    Workflow completed with "Failed"
+    """
+
+
+class WorkflowErrored(ArgoBaseException):
+    """
+    Workflow completed with "Error"
+    """
diff --git a/service/argo/instance_deploy.py b/service/argo/instance_deploy.py
new file mode 100644
index 000000000..f9eb151cc
--- /dev/null
+++ b/service/argo/instance_deploy.py
@@ -0,0 +1,204 @@
+"""
+Deploy instance.
+"""
+
+import os
+import time
+from django.conf import settings
+from threepio import celery_logger
+import atmosphere
+
+from service.argo.wf_call import argo_workflow_exec
+from service.argo.common import argo_context_from_config, read_argo_config
+from service.argo.exception import WorkflowFailed, WorkflowErrored
+
+
+def argo_deploy_instance(
+    provider_uuid,
+    instance_uuid,
+    server_ip,
+    username,
+    timezone,
+):
+    """
+    run Argo workflow to deploy an instance
+
+    Args:
+        provider_uuid (str): provider uuid
+        instance_uuid (str): instance uuid
+        server_ip (str): ip of the server instance
+        username (str): username
+        timezone (str): timezone of the provider, e.g. America/Phoenix
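+
+    Example (illustrative values, mirroring the call site in service/tasks/driver.py):
+        argo_deploy_instance(
+            provider.uuid, instance.uuid, "128.196.64.1", "jdoe", "America/Phoenix"
+        )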
+
+    Raises:
+        exc: exception thrown
+    """
+    try:
+        wf_data = _get_workflow_data(
+            provider_uuid, server_ip, username, timezone
+        )
+
+        wf, status = argo_workflow_exec(
+            "instance_deploy.yml",
+            provider_uuid,
+            wf_data,
+            config_file_path=settings.ARGO_CONFIG_FILE_PATH,
+            wait=True
+        )
+
+        # dump logs
+        _dump_deploy_logs(wf, username, instance_uuid)
+
+        celery_logger.debug("ARGO, workflow complete")
+        celery_logger.debug(status)
+
+        if not status.success:
+            if status.error:
+                raise WorkflowErrored(wf.wf_name)
+            raise WorkflowFailed(wf.wf_name)
+    except Exception as exc:
+        celery_logger.debug(
+            "ARGO, argo_deploy_instance(), {}, {}".format(type(exc), exc)
+        )
+        raise exc
+
+
+def _get_workflow_data(provider_uuid, server_ip, username, timezone):
+    """
+    Generate the data structure to be passed to the workflow
+
+    Args:
+        provider_uuid (str): provider uuid, used to look up the zoneinfo in the config
+        server_ip (str): ip of the server instance
+        username (str): username of the owner of the instance
+        timezone (str): timezone of the provider
+
+    Returns:
+        dict: {"arguments": {"parameters": [{"name": "", "value": ""}]}}
+    """
+    wf_data = {"arguments": {"parameters": []}}
+    wf_data["arguments"]["parameters"].append(
+        {
+            "name": "server-ip",
+            "value": server_ip
+        }
+    )
+    wf_data["arguments"]["parameters"].append(
+        {
+            "name": "user",
+            "value": username
+        }
+    )
+    wf_data["arguments"]["parameters"].append({"name": "tz", "value": timezone})
+
+    # read zoneinfo from argo config
+    config = read_argo_config(
+        settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid
+    )
+    wf_data["arguments"]["parameters"].append(
+        {
+            "name": "zoneinfo",
+            "value": config["zoneinfo"]
+        }
+    )
+
+    return wf_data
+
+
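+# Illustrative shape of the payload built above (values assumed):
+#   {"arguments": {"parameters": [
+#       {"name": "server-ip", "value": "128.196.64.1"},
+#       {"name": "user", "value": "jdoe"},
+#       {"name": "tz", "value": "America/Phoenix"},
+#       {"name": "zoneinfo", "value": "America/Phoenix"}
+#   ]}}
+
+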
+def _get_workflow_data_for_temp(provider_uuid, server_ip, username, timezone):
+    """
+    Generate the data structure to be passed to the workflow.
+    Used with workflow templates.
+
+    Args:
+        provider_uuid (str): provider uuid, used to look up the zoneinfo in the config
+        server_ip (str): ip of the server instance
+        username (str): username of the owner of the instance
+        timezone (str): timezone of the provider
+
+    Returns:
+        [str]: a list of parameters to be passed to the workflow, in the form of "key=value"
+    """
+    wf_data = []
+    wf_data.append("server-ip={}".format(server_ip))
+    wf_data.append("user={}".format(username))
+    wf_data.append("tz={}".format(timezone))
+
+    # read zoneinfo from argo config
+    config = read_argo_config(
+        settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid
+    )
+    wf_data.append("zoneinfo={}".format(config["zoneinfo"]))
+
+    return wf_data
+
+
+def _create_deploy_log_dir(username, instance_uuid, timestamp):
+    """
+    Create a directory to dump the deploy workflow logs into,
+    e.g. base_dir/username/instance_uuid/timestamp/.
+    The base directory is created if missing.
+
+    Args:
+        username (str): username of the owner of the instance
+        instance_uuid (str): uuid of the instance
+        timestamp (str): timestamp of the deploy
+
+    Returns:
+        str: path to the directory to dump logs into
+    """
+    base_dir = os.path.abspath(
+        os.path.join(
+            os.path.dirname(atmosphere.__file__), "..", "logs",
+            "atmosphere_deploy.d"
+        )
+    )
+
+    # create base dir if missing
+    if not os.path.isdir(base_dir):
+        os.makedirs(base_dir)
+
+    # create deploy log directory if missing
+    directory = os.path.join(base_dir, username, instance_uuid, timestamp)
+    if not os.path.isdir(directory):
+        os.makedirs(directory)
+
+    return directory
+
+
+def _dump_deploy_logs(wf, username, instance_uuid):
+    """
+    Dump workflow logs locally
+
+    Args:
+        wf (ArgoWorkflow): workflow to dump logs of
+        username (str): username of the owner of the instance
+        instance_uuid (str): uuid of the instance
+    """
+    try:
+        context = argo_context_from_config(settings.ARGO_CONFIG_FILE_PATH)
+
+        timestamp = time.strftime("%Y-%m-%d_%H%M%S", time.localtime())
+        log_dir = _create_deploy_log_dir(username, instance_uuid, timestamp)
+
+        # fetch all info about pods in workflow
+        nodes = wf.get_nodes(context)
+
+        for node_name, node in nodes.items():
+            playbook_name = None
+            # try finding playbook filename from parameters
+            if "inputs" in node and "parameters" in node["inputs"]:
+                for param in node["inputs"]["parameters"]:
+                    if param["name"] == "playbook":
+                        playbook_name = os.path.basename(param["value"])
+                        break
+            if playbook_name:
+                log_filename = os.path.join(log_dir, playbook_name + ".log")
+            else:
+                # use the node name if the playbook filename is not found
+                log_filename = os.path.join(log_dir, node_name + ".log")
+            wf.dump_pod_logs(context, node_name, log_filename)
+    except Exception as exc:
+        celery_logger.debug(
+            "ARGO, failed to dump logs for workflow {}, {}".format(
+                wf.wf_name, type(exc)
+            )
+        )
+        celery_logger.debug(exc)
diff --git a/service/argo/rest_api.py b/service/argo/rest_api.py
new file mode 100644
index 000000000..d6cff7ea4
--- /dev/null
+++ b/service/argo/rest_api.py
@@ -0,0 +1,368 @@
+"""
+Client to access Argo REST API
+"""
+
+import json
+import requests
+from threepio import celery_logger as logger
+from service.argo.exception import ResponseNotJSON
+
+try:
+    from json import JSONDecodeError
+except ImportError:
+    # python2 does not have JSONDecodeError
+    JSONDecodeError = ValueError
+
+
+class ArgoAPIClient:
+    """
+    REST API Client for Argo.
+    A thin layer of abstraction over the Argo REST API endpoints
+    """
+
+    def __init__(self, api_host, port, k8s_token, wf_namespace, verify=True):
+        """
+        init the API client with all necessary credentials
+
+        Args:
+            api_host (str): hostname where the Argo server is hosted
+            port (int): port of the Argo server
+            k8s_token (str): k8s token to authenticate with the Argo server
+            wf_namespace (str): k8s namespace used for the workflow
+            verify (bool): whether to verify the SSL/TLS cert or not
+        """
+        self._host = api_host
+        self._port = port
+        self._base_url = "https://{}:{}".format(self._host, self._port)
+        self._token = k8s_token
+        self._namespace = wf_namespace
+        self._verify = verify
+
+    def get_workflow(self, wf_name, fields=""):
+        """
+        Endpoint for fetching a workflow
+
+        Args:
+            wf_name (str): name of the workflow
+            fields (str): fields to be included in the response
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflows/{}/{}"
+        api_url = api_url.format(self._namespace, wf_name)
+        if fields:
+            api_url = "{}?fields={}".format(api_url, fields)
+
+        json_resp = self._req("get", api_url)
+
+        return json_resp
+
+    def list_workflow(self):
+        """
+        Endpoint for fetching a list of workflows
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflows/" + self._namespace
+
+        json_resp = self._req("get", api_url)
+
+        return json_resp
+
+    def run_workflow(self, wf_json):
+        """
+        Endpoint for running a workflow
+
+        Args:
+            wf_json (dict): workflow definition as JSON object
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflows/" + self._namespace
+
+        json_data = {}
+        json_data["namespace"] = self._namespace
+        json_data["serverDryRun"] = False
+        json_data["workflow"] = wf_json
+
+        json_resp = self._req("post", api_url, json_data=json_data)
+
+        return json_resp
+
+    def get_log_for_pod_in_workflow(
+        self, wf_name, pod_name, container_name="main"
+    ):
+        """
+        Get the logs of a pod in a workflow
+
+        Args:
+            wf_name (str): name of the workflow
+            pod_name (str): name of the pod
+            container_name (str, optional): name of the container. Defaults to "main".
+
+        Returns:
+            list: a list of lines of logs
+        """
+        api_url = "/api/v1/workflows/{}/{}/{}/log?logOptions.timestamps=true&logOptions.container={}"
+        api_url = api_url.format(
+            self._namespace, wf_name, pod_name, container_name
+        )
+
+        resp = self._req("get", api_url, json_resp=False)
+
+        logs = []
+        # each line is a json obj
+        for line in resp.split("\n"):
+            try:
+                line = line.strip()
+                if not line:
+                    continue
+                log_json = json.loads(line)
+                if "result" not in log_json or "content" not in log_json[
+                        "result"]:
+                    continue
+                logs.append(log_json["result"]["content"])
+            except Exception:
+                continue
+        return logs
+
+    def get_workflow_template(self, wf_temp_name):
+        """
+        fetch a workflow template by its name
+
+        Args:
+            wf_temp_name (str): name of the workflow template
+
+        Returns:
+            dict: response text as JSON object
+        """
+        # NOTE: same workflow-templates endpoint as create/update/delete below
+        api_url = "/api/v1/workflow-templates/{}/{}"
+        api_url = api_url.format(self._namespace, wf_temp_name)
+
+        json_resp = self._req("get", api_url)
+
+        return json_resp
+
+    def list_workflow_templates(self):
+        """
+        fetch a list of workflow templates
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflow-templates/{}"
+        api_url = api_url.format(self._namespace)
+
+        json_resp = self._req("get", api_url)
+
+        return json_resp
+
+    def create_workflow_template(self, wf_temp_def_json):
+        """
+        create a workflow template
+
+        Args:
+            wf_temp_def_json (dict): definition of the workflow template
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflow-templates/" + self._namespace
+
+        json_data = {}
+        json_data["namespace"] = self._namespace
+        json_data["template"] = wf_temp_def_json
+
+        json_resp = self._req("post", api_url, json_data=json_data)
+
+        return json_resp
+
+    def update_workflow_template(self, wf_temp_name, wf_temp_def_json):
+        """
+        update the workflow template with the given name
+
+        Args:
+            wf_temp_name (str): name of the workflow template
+            wf_temp_def_json (dict): definition of the workflow template
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflow-templates/{}/{}".format(
+            self._namespace, wf_temp_name
+        )
+
+        json_data = {}
+        json_data["namespace"] = self._namespace
+        json_data["template"] = wf_temp_def_json
+
+        json_resp = self._req("put", api_url, json_data=json_data)
+
+        return json_resp
+
+    def submit_workflow_template(self, wf_temp_name, wf_param=None):
+        """
+        submit a workflow template for execution with parameters.
+        this will create a workflow.
+
+        Args:
+            wf_temp_name (str): name of the workflow template
+            wf_param ([str], optional): list of parameters, in the form of ["NAME1=VAL1", "NAME2=VAL2"]
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflows/{}/submit".format(self._namespace)
+
+        json_data = {}
+        json_data["namespace"] = self._namespace
+        json_data["resourceKind"] = "WorkflowTemplate"
+        json_data["resourceName"] = wf_temp_name
+        json_data["submitOptions"] = {}
+        # avoid a mutable default argument
+        json_data["submitOptions"]["parameters"] = wf_param or []
+
+        json_resp = self._req("post", api_url, json_data=json_data)
+
+        return json_resp
+
+    def delete_workflow_template(self, wf_temp_name):
+        """
+        delete a workflow template with the given name
+
+        Args:
+            wf_temp_name (str): name of the workflow template
+
+        Returns:
+            dict: response text as JSON object
+        """
+        api_url = "/api/v1/workflow-templates/{}/{}"
+        api_url = api_url.format(self._namespace, wf_temp_name)
+
+        json_resp = self._req("delete", api_url)
+
+        return json_resp
+
+    def _req(
+        self, method, url, json_data=None, additional_headers=None, json_resp=True
+    ):
+        """
+        send a request with the given method to the given url
+
+        Args:
+            method (str): HTTP method
+            url (str): api url to send the request to
+            json_data (dict, optional): JSON payload. Defaults to None.
+            additional_headers (dict, optional): additional headers. Defaults to None.
+            json_resp (bool, optional): whether the response is JSON. Defaults to True.
+
+        Raises:
+            ResponseNotJSON: raised when the response is not JSON
+            HTTPError: request failed
+
+        Returns:
+            dict: response text as JSON object
+        """
+
+        try:
+            headers = {}
+            headers["Host"] = self.host
+            headers["Accept"] = "application/json;q=0.9,*/*;q=0.8"
+            headers["Content-Type"] = "application/json"
+            if self._token:
+                headers["Authorization"] = "Bearer " + self._token
+
+            if additional_headers:
+                headers.update(additional_headers)
+
+            full_url = self.base_url + url
+            requests_func = _http_method(method)
+            if json_data:
+                resp = requests_func(
+                    full_url,
+                    headers=headers,
+                    json=json_data,
+                    verify=self.verify
+                )
+            else:
+                resp = requests_func(
+                    full_url, headers=headers, verify=self.verify
+                )
+            resp.raise_for_status()
+            if json_resp:
+                return json.loads(resp.text)
+            return resp.text
+        except JSONDecodeError as exc:
+            msg = "ARGO - REST API, {}, {}".format(type(exc), resp.text)
+            logger.exception(msg)
+            raise ResponseNotJSON("ARGO, Fail to parse response body as JSON")
+        except requests.exceptions.HTTPError as exc:
+            msg = "ARGO - REST API, {}, {}".format(type(exc), resp.text)
+            logger.exception(msg)
+            raise exc
+
+    @property
+    def host(self):
+        """
+        hostname of the Argo API Server.
+        e.g. localhost
+
+        Returns:
+            str: hostname of the Argo API Server
+        """
+        return self._host
+
+    @property
+    def base_url(self):
+        """
+        base url for the Argo API Server.
+        e.g. https://localhost:1234
+
+        Returns:
+            str: base url for the Argo API Server
+        """
+        return self._base_url
+
+    @property
+    def namespace(self):
+        """
+        k8s namespace used for the workflow
+
+        Returns:
+            str: k8s namespace
+        """
+        return self._namespace
+
+    @property
+    def verify(self):
+        """
+        whether to verify the SSL/TLS cert of the api host or not
+
+        Returns:
+            bool: whether to verify the SSL/TLS cert of the api host or not
+        """
+        return self._verify
+
+
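+# Illustrative usage (values assumed):
+#   client = ArgoAPIClient("argo.example.com", 443, bearer_token, "argo", verify=True)
+#   wf_json = client.get_workflow("wf-abc123", fields="status.phase")
+#   phase = wf_json["status"]["phase"]
+
+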
+def _http_method(method_str):
+    """
+    Return the function for the given HTTP method from the requests library
+
+    Args:
+        method_str (str): HTTP method, "get", "post", etc.
+
+    Returns:
+        function: requests.get, requests.post, etc. None if no match
+    """
+    if method_str == "get":
+        return requests.get
+    if method_str == "post":
+        return requests.post
+    if method_str == "delete":
+        return requests.delete
+    if method_str == "put":
+        return requests.put
+    if method_str == "options":
+        return requests.options
+    return None
diff --git a/service/argo/wf.py b/service/argo/wf.py
new file mode 100644
index 000000000..212d0c2ed
--- /dev/null
+++ b/service/argo/wf.py
@@ -0,0 +1,322 @@
+"""
+Workflow
+"""
+
+import os
+import time
+from threepio import celery_logger as logger
+
+
+class ArgoWorkflow:
+    """
+    A generic interface for using Argo Workflows.
+    """
+
+    def __init__(self, wf_name):
+        """
+        Create an ArgoWorkflow object that refers to a workflow by name
+
+        Args:
+            wf_name (str): name of the workflow
+        """
+        self._wf_name = wf_name
+        self._last_status = None
+        self._wf_def = None
+
+    @staticmethod
+    def create(context, wf_def, wf_data=None, lint=False):
+        """
+        Create a running workflow
+
+        Args:
+            context (ArgoContext): context to execute the workflow in
+            wf_def (dict): workflow definition
+            wf_data (dict, optional): workflow data to be passed along. Defaults to None.
+            lint (bool, optional): whether to submit the workflow definition for
+                linting first. Defaults to False.
+
+        Returns:
+            ArgoWorkflow: ArgoWorkflow object created based on the returned json
+        """
+        if wf_data:
+            wf_def = _populate_wf_data(wf_def, wf_data)
+
+        json_resp = context.client().run_workflow(wf_def)
+        wf_name = json_resp["metadata"]["name"]
+        return ArgoWorkflow(wf_name)
+
+    @staticmethod
+    def create_n_watch(context, wf_def, wf_data=None):
+        """
+        Create a running workflow, and watch it until completion
+
+        Args:
+            context (ArgoContext): context to execute the workflow in
+            wf_def (dict): workflow definition
+            wf_data (dict, optional): data to be passed to the workflow. Defaults to None.
+
+        Returns:
+            (ArgoWorkflow, ArgoWorkflowStatus): workflow and status of the workflow
+        """
+        wf = ArgoWorkflow.create(context, wf_def, wf_data=wf_data)
+
+        try:
+            # poll frequently at first, then back off to once a minute
+            wf.watch(context, 10, 18)
+            if wf.last_status.complete:
+                return (wf, wf.last_status)
+            wf.watch(context, 60, 1440)
+        except Exception as exc:
+            logger.debug(
+                "ARGO, ArgoWorkflow.create_n_watch(), while watching {}, {}".
+                format(type(exc), exc)
+            )
+            raise exc
+        return (wf, wf.last_status)
+
+    def status(self, context):
+        """
+        Query the status of the workflow
+
+        Args:
+            context (ArgoContext): context to perform the query in
+
+        Returns:
+            ArgoWorkflowStatus: status of the workflow
+        """
+        # get workflow
+        json_obj = context.client().get_workflow(
+            self._wf_name, fields="status.phase"
+        )
+
+        # unknown state
+        if "status" not in json_obj or "phase" not in json_obj["status"]:
+            self._last_status = ArgoWorkflowStatus(complete=False)
+            return self._last_status
+
+        phase = json_obj["status"]["phase"]
+
+        if phase == "Running":
+            self._last_status = ArgoWorkflowStatus(complete=False)
+            return self._last_status
+
+        if phase == "Succeeded":
+            self._last_status = ArgoWorkflowStatus(
+                complete=True, success=True
+            )
+            return self._last_status
+
+        if phase == "Failed":
+            self._last_status = ArgoWorkflowStatus(
+                complete=True, success=False
+            )
+            return self._last_status
+
+        if phase == "Error":
+            self._last_status = ArgoWorkflowStatus(
+                complete=True, success=False, error=True
+            )
+            return self._last_status
+
+        return ArgoWorkflowStatus()
+
+    def watch(self, context, interval, repeat_count):
+        """
+        Watch the status of the workflow until it is complete.
+        This call will block as it is busy waiting.
+        After the specified number of queries, the call will abort and return
+        the last status.
+
+        Args:
+            context (ArgoContext): context to perform the query in
+            interval (int): interval (sec) in between status queries
+            repeat_count (int): number of status queries to perform before aborting
+
+        Returns:
+            ArgoWorkflowStatus: last status of the workflow
+        """
+        # query at least once, so that status is always defined
+        status = self.status(context)
+        for _ in range(repeat_count):
+            if status.complete:
+                return status
+            time.sleep(interval)
+            status = self.status(context)
+        return status
+
+    def get_nodes(self, context):
+        """
+        Get info (io.argoproj.workflow.v1alpha1.NodeStatus) about the nodes
+        (including pods) that this workflow consists of.
+        Note: not all nodes have a corresponding pod
+
+        Args:
+            context (ArgoContext): context used
+
+        Returns:
+            dict: a dict whose keys are node names, values are info of the corresponding node
+        """
+        json_resp = context.client().get_workflow(
+            self._wf_name, fields="status.nodes"
+        )
+        return json_resp["status"]["nodes"]
+
+    def dump_pod_logs(self, context, pod_name, log_file_path):
+        """
+        Dump the logs of a pod in the workflow into a log file at the given path.
+        Technically the argument is a node name; the method and argument are
+        named dump_pod_logs and pod_name only to conform to the naming in the
+        URL in the swagger doc.
+
+        Args:
+            context (ArgoContext): context used to fetch the logs
+            pod_name (str): name of the pod
+            log_file_path (str): path to the log file
+        """
+        with open(log_file_path, "a+") as log_file:
+            logs_lines = context.client().get_log_for_pod_in_workflow(
+                self.wf_name, pod_name, container_name="main"
+            )
+            log_file.write("\n".join(logs_lines))
+        logger.debug(
+            ("ARGO, log dump for workflow {}, pod {} at: {}\n"
+            ).format(self.wf_name, pod_name, log_file_path)
+        )
+
+    def dump_logs(self, context, log_dir):
+        """
+        Dump the logs of the workflow into the given log directory.
+        A separate log file is written for each pod/step in the workflow,
+        each with the filename {{pod_name}}.log
+
+        Args:
+            context (ArgoContext): context used to fetch the logs
+            log_dir (str): directory to dump logs into
+        """
+        # find out which pods the workflow consists of
+        json_resp = context.client().get_workflow(self.wf_name)
+        pod_names = json_resp["status"]["nodes"].keys()
+
+        # dump logs in separate files for each pod
+        for pod_name in pod_names:
+
+            filename = "{}.log".format(pod_name)
+            log_file_path = os.path.join(log_dir, filename)
+
+            with open(log_file_path, "a+") as dump_file:
+                dump_file.write(
+                    "workflow {} has {} pods\n".format(
+                        self.wf_name, len(pod_names)
+                    )
+                )
+                logs_lines = context.client().get_log_for_pod_in_workflow(
+                    self.wf_name, pod_name, container_name="main"
+                )
+                dump_file.write("\npod {}:\n".format(pod_name))
+                dump_file.writelines(logs_lines)
+            logger.debug(
+                ("ARGO, log dump for workflow {}, pod {} at: {}\n"
+                ).format(self.wf_name, pod_name, log_file_path)
+            )
+
+    @property
+    def wf_name(self):
+        """
+        Returns:
+            str: name of the workflow
+        """
+        return self._wf_name
+
+    @property
+    def last_status(self):
+        """
+        Returns:
+            ArgoWorkflowStatus: last known status of the workflow
+        """
+        return self._last_status
+
+    def wf_def(self, context, fetch=True):
+        """
+        Definition of the workflow; fetched from the server if absent
+
+        Args:
+            context (ArgoContext): Argo context
+            fetch (bool, optional): whether to re-fetch even if present. Defaults to True.
+
+        Returns:
+            dict: definition of the workflow
+        """
+        if self._wf_def and not fetch:
+            return self._wf_def
+        self._wf_def = context.client().get_workflow(
+            self._wf_name, fields="-status"
+        )
+        return self._wf_def
+
+
+class ArgoWorkflowStatus:
+    """
+    Status of a workflow
+    """
+    __slots__ = ["_complete", "_success", "_error"]
+
+    def __init__(self, complete=None, success=None, error=None):
+        """
+        Args:
+            complete (bool, optional): whether the workflow has completed
+            success (bool, optional): whether the workflow has succeeded
+            error (bool, optional): whether the workflow has errored out
+        """
+        self._complete = complete
+        self._success = success
+        self._error = error
+
+    @property
+    def complete(self):
+        """
+        Returns:
+            bool: whether the workflow has completed
+        """
+        return self._complete
+
+    @property
+    def success(self):
+        """
+        Returns:
+            bool: whether the workflow has succeeded
+        """
+        return self._success
+
+    @property
+    def error(self):
+        """
+        Returns:
+            bool: whether the workflow has errored out
+        """
+        return self._error
+
+
+def _populate_wf_data(wf_def, wf_data):
+    """
+    Populate the workflow data into the workflow definition
+
+    Args:
+        wf_def (dict): workflow definition
+        wf_data (dict): workflow data to be populated into the workflow definition
+
+    Returns:
+        dict: workflow definition with the workflow data populated
+    """
+    # use .get() so that missing keys fall through rather than raising KeyError
+    if not wf_data.get("arguments"):
+        return wf_def
+    if not wf_def["spec"].get("arguments"):
+        wf_def["spec"]["arguments"] = {}
+
+    if "parameters" in wf_data["arguments"]:
+        wf_def["spec"]["arguments"]["parameters"] = wf_data["arguments"][
+            "parameters"]
+    if "artifacts" in wf_data["arguments"]:
+        wf_def["spec"]["arguments"]["artifacts"] = wf_data["arguments"][
+            "artifacts"]
+
+    return wf_def
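+
+
+# Illustrative end-to-end sketch (names assumed; the context comes from
+# service.argo.common.argo_context_from_config):
+#   context = argo_context_from_config()
+#   wf, status = ArgoWorkflow.create_n_watch(context, wf_def, wf_data)
+#   if status.success:
+#       wf.dump_logs(context, "/tmp/argo-logs")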
diff --git a/service/argo/wf_call.py b/service/argo/wf_call.py
new file mode 100644
index 000000000..b8d5db099
--- /dev/null
+++ b/service/argo/wf_call.py
@@ -0,0 +1,116 @@
+"""
+Execute Argo Workflow
+"""
+
+from threepio import celery_logger as logger
+
+from service.argo.common import ArgoContext, argo_lookup_yaml_file, read_argo_config
+from service.argo.wf import ArgoWorkflow, ArgoWorkflowStatus
+from service.argo.wf_temp import ArgoWorkflowTemplate
+
+
+def argo_workflow_exec(
+    workflow_filename,
+    provider_uuid,
+    workflow_data,
+    config_file_path=None,
+    wait=False
+):
+    """
+    Execute a specified Argo workflow.
+    The workflow file is found based on the provider, and the arguments are
+    passed to the workflow.
+
+    Args:
+        workflow_filename (str): filename of the workflow
+        provider_uuid (str): uuid of the provider
+        workflow_data (dict): data to be passed to the workflow as arguments
+        config_file_path (str, optional): path to the config file. will use the
+            default one from the settings if None. Defaults to None.
+        wait (bool, optional): whether to wait for the workflow to complete. Defaults to False.
+
+    Returns:
+        (ArgoWorkflow, ArgoWorkflowStatus): workflow and status of the workflow
+    """
+    try:
+        # read configuration from file
+        config = read_argo_config(
+            config_file_path=config_file_path, provider_uuid=provider_uuid
+        )
+
+        # find the workflow definition & construct workflow
+        wf_def = argo_lookup_yaml_file(
+            config["workflow_base_dir"], workflow_filename, provider_uuid
+        )
+
+        # construct workflow context
+        context = ArgoContext(config=config)
+
+        # execute
+        if wait:
+            result = ArgoWorkflow.create_n_watch(context, wf_def, workflow_data)
+            return result
+        wf = ArgoWorkflow.create(context, wf_def, workflow_data)
+        return (wf, ArgoWorkflowStatus())
+    except Exception as exc:
+        logger.exception(
+            "ARGO, argo_workflow_exec(), {} {}".format(type(exc), exc)
+        )
+        raise exc
+
+
+def argo_wf_template_exec(
+    wf_template_filename,
+    provider_uuid,
+    workflow_data,
+    config_file_path=None,
+    wait=False
+):
+    """
+    Execute a specified Argo workflow template.
+    The template file is found based on the provider, and the arguments are
+    passed to the resulting workflow.
+
+    Args:
+        wf_template_filename (str): filename of the workflow template
+        provider_uuid (str): uuid of the provider
+        workflow_data ([str]): parameters to be passed to the workflow, as "name=value" strings
+        config_file_path (str, optional): path to the config file. will use the
+            default one from the settings if None. Defaults to None.
+        wait (bool, optional): whether to wait for the workflow to complete. Defaults to False.
+
+    Returns:
+        (ArgoWorkflow, ArgoWorkflowStatus): workflow and status of the workflow
+    """
+    try:
+        # read configuration from file
+        config = read_argo_config(config_file_path=config_file_path)
+
+        # construct workflow context
+        context = ArgoContext(config=config)
+
+        # find the workflow template definition
+        wf_temp_def = argo_lookup_yaml_file(
+            config["workflow_base_dir"], wf_template_filename, provider_uuid
+        )
+
+        # submit workflow template
+        wf_temp = ArgoWorkflowTemplate.create(context, wf_temp_def)
+        wf_name = wf_temp.execute(context, wf_param=workflow_data)
+        wf = ArgoWorkflow(wf_name)
+
+        # polling if needed
+        if wait:
+            status = wf.watch(context, 10, 18)
+            if status.complete:
+                return (wf, status)
+            status = wf.watch(context, 60, 1440)
+            return (wf, status)
+        return (wf, ArgoWorkflowStatus())
+
+    except Exception as exc:
+        logger.exception(
+            "ARGO, argo_wf_template_exec(), {} {}".format(type(exc), exc)
+        )
+        raise exc
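+
+
+# Illustrative call for the template path (parameters are "name=value" strings,
+# matching _get_workflow_data_for_temp in instance_deploy.py; values assumed):
+#   argo_wf_template_exec("instance_deploy.yml", provider_uuid,
+#                         ["server-ip=128.196.64.1", "user=jdoe"], wait=True)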
{"complete": bool, "success": bool, "error": bool} + """ + try: + # read configuration from file + config = read_argo_config(config_file_path=config_file_path) + + # construct workflow context + context = ArgoContext(config=config) + + # find the workflow definition + wf_temp_def = argo_lookup_yaml_file( + config["workflow_base_dir"], wf_template_filename, provider_uuid + ) + + # submit workflow template + wf_temp = ArgoWorkflowTemplate.create(context, wf_temp_def) + wf_name = wf_temp.execute(context, wf_param=workflow_data) + wf = ArgoWorkflow(wf_name) + + # polling if needed + if wait: + status = wf.watch(context, 10, 18) + if status.complete: + return (wf_name, status) + status = wf.watch(context, 60, 1440) + return (wf, status) + return (wf, {"complete": None, "success": None, "error": None}) + + except Exception as exc: + logger.exception( + "ARGO, argo_wf_template_exec(), {} {}".format(type(exc), exc) + ) + raise exc diff --git a/service/argo/wf_temp.py b/service/argo/wf_temp.py new file mode 100644 index 000000000..00f0b17c2 --- /dev/null +++ b/service/argo/wf_temp.py @@ -0,0 +1,113 @@ +""" +WorkflowTemplate +""" + + +class ArgoWorkflowTemplate: + """ + Abstraction of the WorkflowTemplate in Argo + """ + + def __init__(self, wf_temp_name): + """ + Create a ArgoWorkflowTemplate object + + Args: + wf_temp_name (str): name of the workflow template + """ + self._name = wf_temp_name + self._wf_temp_def = None + + @classmethod + def create(cls, context, wf_temp_def): + """ + Create a WorkflowTemplate, and construct a ArgoWorkflowTemplate based off it + + Args: + context (ArgoContext): context used to query the Argo Server + wf_temp_def (dict): definition of the WorkflowTemplate + + Returns: + ArgoWorkflowTemplate: constructed based off the WorkflowTemplate created + """ + json_resp = context.client().create_workflow_template(wf_temp_def) + name = json_resp["metadata"]["name"] + return ArgoWorkflowTemplate(name) + + def fetch(self, context): + """ + Fetch definition from Argo Server + + Args: + context (ArgoContext): context used to query the Argo Server + """ + json_resp = context.client().get_workflow_template(self.name) + self._wf_temp_def = json_resp + if "apiVersion" not in self._wf_temp_def: + self._wf_temp_def["apiVersion"] = "argoproj.io/v1alpha1" + if "kind" not in self._wf_temp_def: + self._wf_temp_def["kind"] = "WorkflowTemplate" + + def execute(self, context, wf_param): + """ + One-off submission of the workflow template, template is deleted after submission + + Args: + context (ArgoContext): context of which the action should be executed in + wf_param (dict): parameter to pass when submit the workflow template + + Returns: + str: name of the workflow + """ + # submit template + json_resp = self.submit(context, wf_param) + wf_name = json_resp["metadata"]["name"] + + # delete the template + self.delete(context) + return wf_name + + def submit(self, context, wf_param): + """ + Submit the workflow template for execution, a workflow will be created as the result + + Args: + context (ArgoContext): context of which the action should be executed in + wf_param (dict): parameter to pass when submit the workflow + + Returns: + dict: JSON response of the call + """ + json_resp = context.client().submit_workflow_template( + self.name, wf_param=wf_param + ) + return json_resp + + def delete(self, context): + """ + Delete the workflow template + + Args: + context (ArgoContext): context of which the action should be executed in + """ + context.client().delete_workflow_template(self.name) + + 
+    @property
+    def wf_temp_def(self):
+        """
+        definition of the workflow template
+
+        Returns:
+            dict: definition
+        """
+        return self._wf_temp_def
+
+    @property
+    def name(self):
+        """
+        name of the workflow template
+
+        Returns:
+            str: name
+        """
+        return self._name
diff --git a/service/tasks/driver.py b/service/tasks/driver.py
index 559e8f7ba..19baca14c 100644
--- a/service/tasks/driver.py
+++ b/service/tasks/driver.py
@@ -27,9 +27,8 @@ from core.models.profile import UserProfile
 from service.deploy import (
-    instance_deploy, user_deploy, build_host_name, ready_to_deploy as
-    ansible_ready_to_deploy, run_utility_playbooks, execution_has_failures,
-    execution_has_unreachable
+    user_deploy, build_host_name, ready_to_deploy as ansible_ready_to_deploy,
+    run_utility_playbooks, execution_has_failures, execution_has_unreachable
 )
 from service.driver import get_driver, get_account_driver
 from service.exceptions import AnsibleDeployException
@@ -1138,21 +1137,23 @@ def _deploy_instance(
     redeploy=False,
     **celery_task_args
 ):
+    from service.argo.instance_deploy import argo_deploy_instance
+    from service.argo.exception import ArgoBaseException
     try:
         celery_logger.debug(
-            "_deploy_instance task started at %s." % datetime.now()
+            "ARGO, _deploy_instance task started at %s." % datetime.now()
         )
         # Check if instance still exists
         driver = get_driver(driverCls, provider, identity)
         instance = driver.get_instance(instance_id)
         if not instance:
             celery_logger.debug(
-                "Instance has been teminated: %s." % instance_id
+                "ARGO, Instance has been terminated: %s." % instance_id
             )
             return
         if not instance.ip:
             celery_logger.debug(
-                "Instance IP address missing from : %s." % instance_id
+                "ARGO, Instance IP address missing from: %s." % instance_id
             )
             raise Exception("Instance IP Missing? %s" % instance_id)
         # NOTE: This is required to use ssh to connect.
@@ -1166,12 +1167,20 @@ def _deploy_instance(
         _deploy_instance.retry(exc=exc)
     try:
         username = identity.user.username
-        instance_deploy(instance.ip, username, instance_id)
-        _update_status_log(instance, "Ansible Finished for %s." % instance.ip)
+        provider = Identity.find_instance(instance_id).provider
+        # Argo workflow instead of service.deploy.instance_deploy()
+        # TODO: use provider.location until there is a short name for the provider
+        argo_deploy_instance(
+            provider.uuid, instance_id, instance.ip, username, provider.timezone
+        )
+        _update_status_log(
+            instance, "ARGO, Ansible Finished for %s." % instance.ip
+        )
         celery_logger.debug(
-            "_deploy_instance task finished at %s." % datetime.now()
+            "ARGO, _deploy_instance task finished at %s." % datetime.now()
         )
-    except AnsibleDeployException as exc:
+    except ArgoBaseException as exc:
+        # retry on any Argo-specific exception
         celery_logger.exception(exc)
         _deploy_instance.retry(exc=exc)
     except (BaseException, Exception) as exc:
diff --git a/variables.ini.dist b/variables.ini.dist
index d1abd9546..140f2f8dc 100644
--- a/variables.ini.dist
+++ b/variables.ini.dist
@@ -121,6 +121,7 @@ TACC_API_URL = # "/path to api/"
 #NOTE: Their are *REQUIRED* quotations around 'timedelta(..)'!
 CELERYBEAT_SCHEDULE = {} # {"monitor_instances": {"schedule": 'timedelta(minutes=5)',}}
+ARGO_CONFIG_FILE_PATH = # /path/to/argo/config/file.yml
 
 [secrets.py]
 IRODS_HOST = # irods.server.com