From 241dc8d805ff70080cffe81e0d4754ec72ea78f1 Mon Sep 17 00:00:00 2001 From: "carter.fendley" Date: Mon, 7 Oct 2024 19:59:16 -0400 Subject: [PATCH 1/5] Do not invoke get image methods twice. Signed-off-by: carter.fendley --- backend/src/v2/compiler/argocompiler/container.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 989dfffb8c2..119cebfa585 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -150,7 +150,7 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { }, }, Container: &k8score.Container{ - Image: GetDriverImage(), + Image: c.driverImage, Command: []string{"driver"}, Args: []string{ "--type", "CONTAINER", @@ -303,7 +303,7 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { InitContainers: []wfapi.UserContainer{{ Container: k8score.Container{ Name: "kfp-launcher", - Image: GetLauncherImage(), + Image: c.launcherImage, Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath}, VolumeMounts: []k8score.VolumeMount{ { From 0f26851a1861882e41fb964952e20c9cb16f5638 Mon Sep 17 00:00:00 2001 From: "carter.fendley" Date: Tue, 8 Oct 2024 13:38:32 -0400 Subject: [PATCH 2/5] Add configurable driver / launcher log level Signed-off-by: carter.fendley --- backend/src/v2/cmd/driver/main.go | 5 ++ backend/src/v2/cmd/launcher-v2/main.go | 4 ++ .../src/v2/compiler/argocompiler/container.go | 50 ++++++++++++------- backend/src/v2/compiler/argocompiler/dag.go | 37 ++++++++------ .../src/v2/compiler/argocompiler/importer.go | 8 ++- 5 files changed, 70 insertions(+), 34 deletions(-) diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index 793ccfe1b80..a796b5aaba2 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -68,12 +68,17 @@ var ( // the value stored in the paths will be either 'true' or 'false' cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path") conditionPath = flag.String("condition_path", "", "Condition output path") + logLevel = flag.String("log_level", "1", "The verbosity level to log.") ) // func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) { func main() { flag.Parse() + + glog.Infof("Setting log level to: '%s'", *logLevel) + flag.Set("v", *logLevel) + err := drive() if err != nil { glog.Exitf("%v", err) diff --git a/backend/src/v2/cmd/launcher-v2/main.go b/backend/src/v2/cmd/launcher-v2/main.go index 8fb4e8d7625..b8ced5a3f7e 100644 --- a/backend/src/v2/cmd/launcher-v2/main.go +++ b/backend/src/v2/cmd/launcher-v2/main.go @@ -41,6 +41,7 @@ var ( podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.") mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.") mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.") + logLevel = flag.String("log_level", "1", "The verbosity level to log.") ) func main() { @@ -54,6 +55,9 @@ func run() error { flag.Parse() ctx := context.Background() + glog.Infof("Setting log level to: '%s'", *logLevel) + flag.Set("v", *logLevel) + if *copy != "" { // copy is used to copy this binary to a shared volume // this is a special command, ignore all other flags by returning diff --git a/backend/src/v2/compiler/argocompiler/container.go b/backend/src/v2/compiler/argocompiler/container.go index 119cebfa585..a21303db94a 100644 --- a/backend/src/v2/compiler/argocompiler/container.go +++ b/backend/src/v2/compiler/argocompiler/container.go @@ -34,6 +34,8 @@ const ( LauncherImageEnvVar = "V2_LAUNCHER_IMAGE" DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:dc8b56a2eb071f30409828a8884d621092e68385af11a6c06aa9e9fbcfbb19de" DriverImageEnvVar = "V2_DRIVER_IMAGE" + DriverLogLevelEnvVar = "DRIVER_LOG_LEVEL" + LauncherLogLevelEnvVar = "LAUNCHER_LOG_LEVEL" gcsScratchLocation = "/gcs" gcsScratchName = "gcs-scratch" s3ScratchLocation = "/s3" @@ -130,6 +132,25 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { if ok { return name } + + args := []string{ + "--type", "CONTAINER", + "--pipeline_name", c.spec.GetPipelineInfo().GetName(), + "--run_id", runID(), + "--dag_execution_id", inputValue(paramParentDagID), + "--component", inputValue(paramComponent), + "--task", inputValue(paramTask), + "--container", inputValue(paramContainer), + "--iteration_index", inputValue(paramIterationIndex), + "--cached_decision_path", outputPath(paramCachedDecision), + "--pod_spec_patch_path", outputPath(paramPodSpecPatch), + "--condition_path", outputPath(paramCondition), + "--kubernetes_config", inputValue(paramKubernetesConfig), + } + if value, ok := os.LookupEnv(DriverLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } + t := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -150,22 +171,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string { }, }, Container: &k8score.Container{ - Image: c.driverImage, - Command: []string{"driver"}, - Args: []string{ - "--type", "CONTAINER", - "--pipeline_name", c.spec.GetPipelineInfo().GetName(), - "--run_id", runID(), - "--dag_execution_id", inputValue(paramParentDagID), - "--component", inputValue(paramComponent), - "--task", inputValue(paramTask), - "--container", inputValue(paramContainer), - "--iteration_index", inputValue(paramIterationIndex), - "--cached_decision_path", outputPath(paramCachedDecision), - "--pod_spec_patch_path", outputPath(paramPodSpecPatch), - "--condition_path", outputPath(paramCondition), - "--kubernetes_config", inputValue(paramKubernetesConfig), - }, + Image: c.driverImage, + Command: []string{"driver"}, + Args: args, Resources: driverResources, }, } @@ -245,6 +253,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { }, } c.templates[nameContainerExecutor] = container + + args := []string{ + "--copy", component.KFPLauncherPath, + } + if value, ok := os.LookupEnv(LauncherLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } executor := &wfapi.Template{ Name: nameContainerImpl, Inputs: wfapi.Inputs{ @@ -304,7 +319,8 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string { Container: k8score.Container{ Name: "kfp-launcher", Image: c.launcherImage, - Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath}, + Command: []string{"launcher-v2"}, + Args: args, VolumeMounts: []k8score.VolumeMount{ { Name: volumeNameKFPLauncher, diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index 719a166a9a3..3087b6e8fd4 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -15,6 +15,7 @@ package argocompiler import ( "fmt" + "os" "sort" "strings" @@ -406,6 +407,24 @@ func (c *workflowCompiler) addDAGDriverTemplate() string { if ok { return name } + + args := []string{ + "--type", inputValue(paramDriverType), + "--pipeline_name", c.spec.GetPipelineInfo().GetName(), + "--run_id", runID(), + "--dag_execution_id", inputValue(paramParentDagID), + "--component", inputValue(paramComponent), + "--task", inputValue(paramTask), + "--runtime_config", inputValue(paramRuntimeConfig), + "--iteration_index", inputValue(paramIterationIndex), + "--execution_id_path", outputPath(paramExecutionID), + "--iteration_count_path", outputPath(paramIterationCount), + "--condition_path", outputPath(paramCondition), + } + if value, ok := os.LookupEnv(DriverLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } + t := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -426,21 +445,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string { }, }, Container: &k8score.Container{ - Image: c.driverImage, - Command: []string{"driver"}, - Args: []string{ - "--type", inputValue(paramDriverType), - "--pipeline_name", c.spec.GetPipelineInfo().GetName(), - "--run_id", runID(), - "--dag_execution_id", inputValue(paramParentDagID), - "--component", inputValue(paramComponent), - "--task", inputValue(paramTask), - "--runtime_config", inputValue(paramRuntimeConfig), - "--iteration_index", inputValue(paramIterationIndex), - "--execution_id_path", outputPath(paramExecutionID), - "--iteration_count_path", outputPath(paramIterationCount), - "--condition_path", outputPath(paramCondition), - }, + Image: c.driverImage, + Command: []string{"driver"}, + Args: args, Resources: driverResources, }, } diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go index 83ac6453b64..5302d68e9f1 100644 --- a/backend/src/v2/compiler/argocompiler/importer.go +++ b/backend/src/v2/compiler/argocompiler/importer.go @@ -16,6 +16,7 @@ package argocompiler import ( "fmt" + "os" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" @@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string { if _, alreadyExists := c.templates[name]; alreadyExists { return name } - launcherArgs := []string{ + args := []string{ "--executor_type", "importer", "--task_spec", inputValue(paramTask), "--component_spec", inputValue(paramComponent), @@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string { "--mlmd_server_port", fmt.Sprintf("$(%s)", component.EnvMetadataPort), } + if value, ok := os.LookupEnv(LauncherLogLevelEnvVar); ok { + args = append(args, "--log_level", value) + } importerTemplate := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string { Container: &k8score.Container{ Image: c.launcherImage, Command: []string{"launcher-v2"}, - Args: launcherArgs, + Args: args, EnvFrom: []k8score.EnvFromSource{metadataEnvFrom}, Env: commonEnvs, Resources: driverResources, From 62d4b5683e86dc6bb076ca9a4e89f267babee5ff Mon Sep 17 00:00:00 2001 From: "carter.fendley" Date: Tue, 8 Oct 2024 23:00:05 -0400 Subject: [PATCH 3/5] Update argocompiler golden files Signed-off-by: carter.fendley --- .../testdata/create_mount_delete_dynamic_pvc.yaml | 5 +++-- .../compiler/argocompiler/testdata/create_pod_metadata.yaml | 5 +++-- .../src/v2/compiler/argocompiler/testdata/hello_world.yaml | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml index 25b078fa55d..2a9cc503d3d 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_mount_delete_dynamic_pvc.yaml @@ -155,10 +155,11 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch + command: + - launcher-v2 image: gcr.io/ml-pipeline/kfp-launcher name: kfp-launcher resources: diff --git a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml index 7736012a09e..dead21a837e 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/create_pod_metadata.yaml @@ -143,10 +143,11 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch + command: + - launcher-v2 image: gcr.io/ml-pipeline/kfp-launcher name: kfp-launcher resources: diff --git a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml index af2e8eebd21..58114781a38 100644 --- a/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml +++ b/backend/src/v2/compiler/argocompiler/testdata/hello_world.yaml @@ -141,10 +141,11 @@ spec: - mountPath: /.config name: dot-config-scratch initContainers: - - command: - - launcher-v2 + - args: - --copy - /kfp-launcher/launch + command: + - launcher-v2 image: gcr.io/ml-pipeline/kfp-launcher name: kfp-launcher resources: From 5a94260354574c0303ca02bab87aa53fa0059616 Mon Sep 17 00:00:00 2001 From: "carter.fendley" Date: Tue, 15 Oct 2024 17:36:35 -0400 Subject: [PATCH 4/5] Add environment variable configs to compiler tests Signed-off-by: carter.fendley --- .../src/v2/compiler/argocompiler/argo_test.go | 24 ++ .../testdata/with_logging/hello_world.yaml | 308 ++++++++++++++++++ .../testdata/with_logging/importer.yaml | 194 +++++++++++ 3 files changed, 526 insertions(+) create mode 100644 backend/src/v2/compiler/argocompiler/testdata/with_logging/hello_world.yaml create mode 100644 backend/src/v2/compiler/argocompiler/testdata/with_logging/importer.yaml diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index 6c92e54574c..071aa8c2678 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -18,6 +18,7 @@ import ( "flag" "fmt" "io/ioutil" + "os" "strings" "testing" @@ -36,6 +37,7 @@ func Test_argo_compiler(t *testing.T) { jobPath string // path of input PipelineJob to compile platformSpecPath string // path of possible input PlatformSpec to compile argoYAMLPath string // path of expected output argo workflow YAML + envVars []string }{ { jobPath: "../testdata/hello_world.json", @@ -57,10 +59,32 @@ func Test_argo_compiler(t *testing.T) { platformSpecPath: "../testdata/create_pod_metadata.json", argoYAMLPath: "testdata/create_pod_metadata.yaml", }, + // With envOptions + { + jobPath: "../testdata/hello_world.json", + platformSpecPath: "", + argoYAMLPath: "testdata/with_logging/hello_world.yaml", + envVars: []string{"DRIVER_LOG_LEVEL=5", "LAUNCHER_LOG_LEVEL=5"}, + }, + { + jobPath: "../testdata/importer.json", + platformSpecPath: "", + argoYAMLPath: "testdata/with_logging/importer.yaml", + envVars: []string{"DRIVER_LOG_LEVEL=5", "LAUNCHER_LOG_LEVEL=5"}, + }, } for _, tt := range tests { t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) { job, platformSpec := load(t, tt.jobPath, tt.platformSpecPath) + if tt.envVars != nil { + for _, envVar := range tt.envVars { + parts := strings.Split(strings.ReplaceAll(envVar, " ", ""), "=") + os.Setenv(parts[0], parts[1]) + + // Unset after test cases has ended + defer os.Unsetenv(parts[0]) + } + } if *update { wf, err := argocompiler.Compile(job, platformSpec, nil) if err != nil { diff --git a/backend/src/v2/compiler/argocompiler/testdata/with_logging/hello_world.yaml b/backend/src/v2/compiler/argocompiler/testdata/with_logging/hello_world.yaml new file mode 100644 index 00000000000..aa0fee53931 --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/with_logging/hello_world.yaml @@ -0,0 +1,308 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: hello-world- +spec: + arguments: + parameters: + - name: components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + - name: implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7 + value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf + \"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def + hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser + = argparse.ArgumentParser(prog=''Hello world'', description='''')\n_parser.add_argument(\"--text\", + dest=\"text\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = hello_world(**_parsed_args)\n"],"image":"python:3.9"}' + - name: components-root + value: '{"dag":{"tasks":{"hello-world":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}}},"inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + - --log_level + - "5" + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - args: + - --copy + - /kfp-launcher/launch + - --log_level + - "5" + command: + - launcher-v2 + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}' + - name: container + value: '{{workflow.parameters.implementations-203fce8adabe0cfa7da54b9d3ff79c772136c926974659b51c378727c7ccdfb7}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: hello-world-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}}' + depends: hello-world-driver.Succeeded + name: hello-world + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - namespace/n1/pipeline/hello-world + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --log_level + - "5" + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hi there"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/argocompiler/testdata/with_logging/importer.yaml b/backend/src/v2/compiler/argocompiler/testdata/with_logging/importer.yaml new file mode 100644 index 00000000000..577a2237b9c --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/with_logging/importer.yaml @@ -0,0 +1,194 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: pipeline-with-importer- +spec: + arguments: + parameters: + - name: components-comp-importer + value: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"type":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset"}}}}}' + - name: implementations-comp-importer + value: '{"artifactUri":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}},"typeSchema":{"schemaTitle":"system.Dataset"}}' + - name: components-root + value: '{"dag":{"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}}},"inputDefinitions":{"parameters":{"dataset2":{"type":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --executor_type + - importer + - --task_spec + - '{{inputs.parameters.task}}' + - --component_spec + - '{{inputs.parameters.component}}' + - --importer_spec + - '{{inputs.parameters.importer}}' + - --pipeline_name + - pipeline-with-importer + - --run_id + - '{{workflow.uid}}' + - --parent_dag_id + - '{{inputs.parameters.parent-dag-id}}' + - --pod_name + - $(KFP_POD_NAME) + - --pod_uid + - $(KFP_POD_UID) + - --mlmd_server_address + - $(METADATA_GRPC_SERVICE_HOST) + - --mlmd_server_port + - $(METADATA_GRPC_SERVICE_PORT) + - --log_level + - "5" + command: + - launcher-v2 + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/kfp-launcher + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: task + - name: component + - name: importer + - name: parent-dag-id + metadata: {} + name: system-importer + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constantValue":{"stringValue":"gs://ml-pipeline-playground/shakespeare1.txt"}}}}},"taskInfo":{"name":"importer"}}' + - name: component + value: '{{workflow.parameters.components-comp-importer}}' + - name: importer + value: '{{workflow.parameters.implementations-comp-importer}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: importer + template: system-importer + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - pipeline-with-importer + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --log_level + - "5" + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null From 264d809cdc985ecf1acf4d2fd2818909875247f6 Mon Sep 17 00:00:00 2001 From: "carter.fendley" Date: Thu, 17 Oct 2024 13:23:11 -0400 Subject: [PATCH 5/5] Handle errors from flag setting and tests Signed-off-by: carter.fendley --- backend/src/v2/cmd/driver/main.go | 7 +++++-- backend/src/v2/cmd/launcher-v2/main.go | 5 ++++- backend/src/v2/compiler/argocompiler/argo_test.go | 12 ++++++++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/backend/src/v2/cmd/driver/main.go b/backend/src/v2/cmd/driver/main.go index a796b5aaba2..8ea71626e2d 100644 --- a/backend/src/v2/cmd/driver/main.go +++ b/backend/src/v2/cmd/driver/main.go @@ -77,9 +77,12 @@ func main() { flag.Parse() glog.Infof("Setting log level to: '%s'", *logLevel) - flag.Set("v", *logLevel) + err := flag.Set("v", *logLevel) + if err != nil { + glog.Warningf("Failed to set log level: %s", err.Error()) + } - err := drive() + err = drive() if err != nil { glog.Exitf("%v", err) } diff --git a/backend/src/v2/cmd/launcher-v2/main.go b/backend/src/v2/cmd/launcher-v2/main.go index b8ced5a3f7e..a77111cd60e 100644 --- a/backend/src/v2/cmd/launcher-v2/main.go +++ b/backend/src/v2/cmd/launcher-v2/main.go @@ -56,7 +56,10 @@ func run() error { ctx := context.Background() glog.Infof("Setting log level to: '%s'", *logLevel) - flag.Set("v", *logLevel) + err := flag.Set("v", *logLevel) + if err != nil { + glog.Warningf("Failed to set log level: %s", err.Error()) + } if *copy != "" { // copy is used to copy this binary to a shared volume diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index 071aa8c2678..bdc73986bc3 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -79,10 +79,18 @@ func Test_argo_compiler(t *testing.T) { if tt.envVars != nil { for _, envVar := range tt.envVars { parts := strings.Split(strings.ReplaceAll(envVar, " ", ""), "=") - os.Setenv(parts[0], parts[1]) + err := os.Setenv(parts[0], parts[1]) + if err != nil { + t.Fatalf("Failed to set environment variable '%s' with error: %s", parts[0], err.Error()) + } // Unset after test cases has ended - defer os.Unsetenv(parts[0]) + defer func() { + err := os.Unsetenv(parts[0]) + if err != nil { + t.Fatalf("Failed to unset env variable '%s' with error: %s", parts[0], err.Error()) + } + }() } } if *update {