Revert "Support Python functions in workflows (kubeflow#431)" (kubeflow#449)

This reverts commit 45511ce.
lluunn authored and k8s-ci-robot committed Aug 28, 2019
1 parent d57c167 commit 65b445e
Showing 4 changed files with 87 additions and 160 deletions.
7 changes: 0 additions & 7 deletions prow_config.yaml
@@ -4,10 +4,3 @@ workflows:
  - app_dir: kubeflow/testing/workflows
    component: workflows
    name: unittests
-
-  - py_func: kubeflow.testing.ci.kf_unittests.create_workflow
-    name: pyfunctest
-    # can optionally take keyword arguments
-    # kw_args:
-    #   a: 1
-    #   b: 2
Empty file removed py/kubeflow/testing/ci/__init__.py
15 changes: 0 additions & 15 deletions py/kubeflow/testing/ci/kf_unittests.py

This file was deleted.

225 changes: 87 additions & 138 deletions py/kubeflow/testing/run_e2e_workflow.py
@@ -3,7 +3,7 @@
This script submits Argo workflows to run the E2E tests and waits for
them to finish. It is intended to be invoked by prow jobs.
-It requires the workflow to be expressed as a ksonnet app or a Python function.
+It requires the workflow to be expressed as a ksonnet app.
The script can take a config file via --config_file.
The --config_file is expected to be a YAML file as follows:
@@ -17,10 +17,9 @@
    include_dirs:
      tensorflow/*
  - name: workflow-test
-    py_func: my_test_package.my_test_module.my_test_workflow
-    kw_args:
-      arg1: argument
  - name: lint
    app_dir: kubeflow/kubeflow/testing/workflows
    component: workflows
app_dir is expected to be in the form of
{REPO_OWNER}/{REPO_NAME}/path/to/ksonnet/app
@@ -34,10 +33,6 @@
include_dirs (optional) is an array of strings that specify which directories, if modified,
should run this workflow.
-
-py_func is the Python method to invoke Argo workflows
-kw_args is an array of arguments passed to the Python method
The script expects that the directories
{repos_dir}/{app_dir} exists. Where repos_dir is provided
as a command line argument.
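
For context on the feature being removed: a py_func target was expected to return an Argo Workflow resource as a plain Python dict, since the submission code splits its apiVersion into a group and a version and creates it as a Kubernetes custom object. The sketch below is hypothetical — the function and argument names are invented, and the real kubeflow.testing.ci.kf_unittests.create_workflow deleted by this commit is not reproduced here:

# Hypothetical py_func target: returns an Argo Workflow as a dict.
def my_test_workflow(arg1=None):
  return {
      # apiVersion is split on "/" into group and version by the submitter.
      "apiVersion": "argoproj.io/v1alpha1",
      "kind": "Workflow",
      # generateName is popped by the submitter, which sets a fixed name.
      "metadata": {"generateName": "e2e-test-"},
      "spec": {
          "entrypoint": "main",
          "templates": [{
              "name": "main",
              "container": {"image": "busybox", "command": ["echo", arg1]},
          }],
      },
  }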
@@ -46,20 +41,17 @@
import argparse
import datetime
import fnmatch
-import importlib
import logging
import os
import tempfile
-import uuid
-import subprocess
-import sys
-import yaml
-from kubernetes import config
-from kubernetes import client as k8s_client
from kubeflow.testing import argo_client
from kubeflow.testing import ks_util
from kubeflow.testing import prow_artifacts
from kubeflow.testing import util
+import uuid
+import subprocess
+import sys
+import yaml

# The namespace to launch the Argo workflow in.
def get_namespace(args):
Expand All @@ -70,14 +62,7 @@ def get_namespace(args):
return "kubeflow-releasing"
return "kubeflow-test-infra"

-# imports py_func
-def py_func_import(py_func, kwargs):
-  path, module = py_func.rsplit('.', 1)
-  mod = importlib.import_module(path)
-  met = getattr(mod, module)
-  return met(**kwargs)
-
-class WorkflowKSComponent(object):
+class WorkflowComponent(object):
"""Datastructure to represent a ksonnet component to submit a workflow."""

  def __init__(self, name, app_dir, component, job_types, include_dirs, params):
@@ -88,14 +73,6 @@ def __init__(self, name, app_dir, component, job_types, include_dirs, params):
    self.include_dirs = include_dirs
    self.params = params

-class WorkflowPyComponent(object):
-  """Datastructure to represent a Python function to submit a workflow."""
-
-  def __init__(self, name, py_func, kw_args):
-    self.name = name
-    self.py_func = py_func
-    self.args = kw_args
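
The deleted py_func_import above is a standard dotted-path dynamic import: everything before the last dot names a module, and the final segment names a callable in it. A minimal self-contained sketch of the same pattern (the dotted path in the usage comment is a standard-library illustration, not part of this repository):

import importlib

def call_by_dotted_path(dotted_path, **kwargs):
  # "json.dumps" -> module "json", attribute "dumps"
  module_path, func_name = dotted_path.rsplit(".", 1)
  module = importlib.import_module(module_path)
  func = getattr(module, func_name)
  return func(**kwargs)

# Example: call_by_dotted_path("json.dumps", obj={"a": 1}) returns '{"a": 1}'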

def _get_src_dir():
  return os.path.abspath(os.path.join(__file__, "..",))

@@ -112,13 +89,9 @@ def parse_config_file(config_file, root_dir):

  components = []
  for i in results["workflows"]:
-    if i.get("app_dir"):
-      components.append(WorkflowKSComponent(
-        i["name"], os.path.join(root_dir, i["app_dir"]), i["component"],
-        i.get("job_types", []), i.get("include_dirs", []), i.get("params", {})))
-    if i.get("py_func"):
-      components.append(WorkflowPyComponent(
-        i["name"], i["py_func"], i.get("kw_args", {})))
+    components.append(WorkflowComponent(
+      i["name"], os.path.join(root_dir, i["app_dir"]), i["component"], i.get("job_types", []),
+      i.get("include_dirs", []), i.get("params", {})))
  return components

def generate_env_from_head(args):
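
To make the restored parse_config_file concrete: each workflows entry in the YAML becomes one WorkflowComponent, with app_dir joined onto root_dir and the optional fields defaulted. A short usage sketch (the file path and root_dir here are assumptions for illustration):

# Sketch: parsing a config like the docstring example above.
components = parse_config_file("prow_config.yaml", root_dir="/src")
for c in components:
  # e.g. c.app_dir == "/src/kubeflow/testing/workflows";
  # c.job_types, c.include_dirs, c.params default to [], [], {}.
  print(c.name, c.app_dir, c.component)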
@@ -202,12 +175,43 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-branches
  workflow_names = []
  ui_urls = {}

-  for w in workflows: # pylint: disable=too-many-nested-blocks
+  for w in workflows:
    # Create the name for the workflow
    # We truncate sha numbers to prevent the workflow name from being too large.
    # Workflow name should not be more than 63 characters because its used
    # as a label on the pods.
    workflow_name = os.getenv("JOB_NAME") + "-" + w.name
+    ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir)
+
+    # Print ksonnet version
+    util.run([ks_cmd, "version"])
+
+    # Skip this workflow if it is scoped to a different job type.
+    if w.job_types and not job_type in w.job_types:
+      logging.info("Skipping workflow %s because job type %s is not one of "
+                   "%s.", w.name, job_type, w.job_types)
+      continue
+
+    # If we are scoping this workflow to specific directories, check if any files
+    # modified match the specified regex patterns.
+    dir_modified = False
+    if w.include_dirs:
+      for f in changed_files:
+        for d in w.include_dirs:
+          if fnmatch.fnmatch(f, d):
+            dir_modified = True
+            logging.info("Triggering workflow %s because %s in dir %s is modified.",
+                         w.name, f, d)
+            break
+        if dir_modified:
+          break
+
+    # Only consider modified files when the job is pre or post submit, and if
+    # the include_dirs stanza is defined.
+    if job_type != "periodic" and w.include_dirs and not dir_modified:
+      logging.info("Skipping workflow %s because no code modified in %s.",
+                   w.name, w.include_dirs)
+      continue

    if job_type == "presubmit":
      workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
@@ -224,111 +228,56 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-branches
    # are submitting jobs manually for testing/debugging. Since the prow should
    # vend unique build numbers for each job.
    workflow_name += "-{0}".format(salt)
-    workflow_names.append(workflow_name)

-    # check if ks workflow and run
-    if hasattr(w, "app_dir"):
-      ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir)
-
-      # Print ksonnet version
-      util.run([ks_cmd, "version"])
-
-      # Skip this workflow if it is scoped to a different job type.
-      if w.job_types and not job_type in w.job_types:
-        logging.info("Skipping workflow %s because job type %s is not one of "
-                     "%s.", w.name, job_type, w.job_types)
-        continue
-
-      # If we are scoping this workflow to specific directories, check if any files
-      # modified match the specified regex patterns.
-      dir_modified = False
-      if w.include_dirs:
-        for f in changed_files:
-          for d in w.include_dirs:
-            if fnmatch.fnmatch(f, d):
-              dir_modified = True
-              logging.info("Triggering workflow %s because %s in dir %s is modified.",
-                           w.name, f, d)
-              break
-          if dir_modified:
-            break
-
-      # Only consider modified files when the job is pre or post submit, and if
-      # the include_dirs stanza is defined.
-      if job_type != "periodic" and w.include_dirs and not dir_modified:
-        logging.info("Skipping workflow %s because no code modified in %s.",
-                     w.name, w.include_dirs)
-        continue
-
-      # Create a new environment for this run
-      env = workflow_name
-
-      util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)],
-               cwd=w.app_dir)
-
-      util.run([ks_cmd, "param", "set", "--env=" + env, w.component,
-                "name", workflow_name],
-               cwd=w.app_dir)
-
-      # Set the prow environment variables.
-      prow_env = []
-
-      names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
-               "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
-               "REPO_NAME"]
-      names.sort()
-      for v in names:
-        if not os.getenv(v):
-          continue
-        prow_env.append("{0}={1}".format(v, os.getenv(v)))
-
-      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env",
-                ",".join(prow_env)], cwd=w.app_dir)
-      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace",
-                get_namespace(args)], cwd=w.app_dir)
-      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket",
-                args.bucket], cwd=w.app_dir)
-      if args.release:
-        util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag",
-                  os.getenv("VERSION_TAG")], cwd=w.app_dir)
-
-      # Set any extra params. We do this in alphabetical order to make it easier to verify in
-      # the unittest.
-      param_names = w.params.keys()
-      param_names.sort()
-      for k in param_names:
-        util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k,
-                  "{0}".format(w.params[k])], cwd=w.app_dir)
-
-      # For debugging print out the manifest
-      util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir)
-      util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir)
-
-      ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
-                "?tab=workflow".format(workflow_name))
-      ui_urls[workflow_name] = ui_url
-      logging.info("URL for workflow: %s", ui_url)
-    else:
-      wf_result = py_func_import(w.py_func, w.args)
-      group, version = wf_result['apiVersion'].split('/')
-      config.load_kube_config()
-      k8s_co = k8s_client.CustomObjectsApi()
-      if "metadata" in wf_result:
-        if "generateName" in wf_result["metadata"]:
-          wf_result["metadata"].pop("generateName")
-        wf_result["metadata"]["name"] = workflow_name
-      py_func_result = k8s_co.create_namespaced_custom_object(
-        group=group,
-        version=version,
-        namespace=get_namespace(args),
-        plural='workflows',
-        body=wf_result)
-      logging.info("py_func_result: %s", py_func_result)
-
-      ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
-                "?tab=workflow".format(workflow_name))
-      ui_urls[workflow_name] = ui_url
-      logging.info("URL for workflow: %s", ui_url)
+    workflow_names.append(workflow_name)
+    # Create a new environment for this run
+    env = workflow_name

+    util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)],
+             cwd=w.app_dir)
+
+    util.run([ks_cmd, "param", "set", "--env=" + env, w.component,
+              "name", workflow_name],
+             cwd=w.app_dir)
+
+    # Set the prow environment variables.
+    prow_env = []
+
+    names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
+             "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
+             "REPO_NAME"]
+    names.sort()
+    for v in names:
+      if not os.getenv(v):
+        continue
+      prow_env.append("{0}={1}".format(v, os.getenv(v)))
+
+    util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env",
+              ",".join(prow_env)], cwd=w.app_dir)
+    util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace",
+              get_namespace(args)], cwd=w.app_dir)
+    util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket",
+              args.bucket], cwd=w.app_dir)
+    if args.release:
+      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag",
+                os.getenv("VERSION_TAG")], cwd=w.app_dir)
+
+    # Set any extra params. We do this in alphabetical order to make it easier to verify in
+    # the unittest.
+    param_names = w.params.keys()
+    param_names.sort()
+    for k in param_names:
+      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k,
+                "{0}".format(w.params[k])], cwd=w.app_dir)
+
+    # For debugging print out the manifest
+    util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir)
+    util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir)
+
+    ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
+              "?tab=workflow".format(workflow_name))
+    ui_urls[workflow_name] = ui_url
+    logging.info("URL for workflow: %s", ui_url)

  # We delay creating started.json until we know the Argo workflow URLs
  create_started_file(args.bucket, ui_urls)
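
For context on the prow_env handling restored above: the selected environment variables are serialized into a single comma-separated string and passed to the ksonnet component as one parameter. A rough illustration under assumed prow variables (values invented):

# With JOB_NAME=kubeflow-presubmit, JOB_TYPE=presubmit, BUILD_NUMBER=1234 set,
# the loop builds (names are pre-sorted alphabetically):
#   prow_env == ["BUILD_NUMBER=1234", "JOB_NAME=kubeflow-presubmit",
#                "JOB_TYPE=presubmit"]
# and the component receives:
#   ks param set --env=<env> <component> prow_env \
#       BUILD_NUMBER=1234,JOB_NAME=kubeflow-presubmit,JOB_TYPE=presubmit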
