
[backend] Pod Labels only respected when python function name starts with comp. #11105

Closed
boarder7395 opened this issue Aug 16, 2024 · 1 comment


Environment

Steps to reproduce

This is a weird one that had me scratching my head for a long time. Two otherwise identical pipelines, differing only in the name of the first component's function, behave differently: in one, the labels/annotations are added to the pod; in the other, they are not. I tested with several function name variants.

Working Names:

  • comp
  • component
  • component1

Names that failed:

  • scomp
  • scomponent
  • s3_writer
  • dataset_writer
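As the issue title suggests, the split in the lists above falls exactly on the `comp` prefix; a trivial check over the names tested:

```python
# Function names copied from the lists above.
working = ["comp", "component", "component1"]
failing = ["scomp", "scomponent", "s3_writer", "dataset_writer"]

print(all(n.startswith("comp") for n in working))  # True
print(any(n.startswith("comp") for n in failing))  # False
```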

The following two pipelines can be used for replication. In the first, the labels are successfully added to the pod; in the second, the pod is created without them.

Pipeline where labels successfully get added to the pod.

from kfp import dsl, kubernetes
from kfp.client import Client


@dsl.component()
def comp(dataset: dsl.Output[dsl.Dataset]):
    with open(dataset.path, 'w') as fh:
        fh.write("hello world.")


@dsl.component()
def comp2(dataset: dsl.Input[dsl.Dataset]):
    with open(dataset.path, 'r') as fh:
        print(fh.read())


@dsl.pipeline
def my_pipeline():
    task = comp()
    task2 = comp2(dataset=task.output)
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )

Pipeline where the labels fail to get added to the pod (the only difference is the function name: comp -> scomp).

from kfp import dsl, kubernetes
from kfp.client import Client


@dsl.component()
def scomp(dataset: dsl.Output[dsl.Dataset]):
    with open(dataset.path, 'w') as fh:
        fh.write("hello world.")


@dsl.component()
def comp2(dataset: dsl.Input[dsl.Dataset]):
    with open(dataset.path, 'r') as fh:
        print(fh.read())


@dsl.pipeline
def my_pipeline():
    task = scomp()
    task2 = comp2(dataset=task.output)
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )

Expected result

Labels should be added regardless of component function name.

Materials and Reference

Note that the following pipeline specs reference a custom image repository hosting a Python 3.11 image with KFP SDK 2.8.0 installed; this is due to organization policies for our environment. The sample above uses the default Python 3.7 image and should exhibit the same behavior.

Pipeline 1 Spec

pipeline_spec:
  components:
    comp-comp:
      executorLabel: exec-comp
      outputDefinitions:
        artifacts:
          dataset:
            artifactType:
              schemaTitle: system.Dataset
              schemaVersion: 0.0.1
    comp-comp2:
      executorLabel: exec-comp2
      inputDefinitions:
        artifacts:
          dataset:
            artifactType:
              schemaTitle: system.Dataset
              schemaVersion: 0.0.1
  deploymentSpec:
    executors:
      exec-comp:
        container:
          args:
            - '--executor_input'
            - '{{$}}'
            - '--function_to_execute'
            - comp
          command:
            - sh
            - '-ec'
            - >
              program_path=$(mktemp -d)


              printf "%s" "$0" > "$program_path/ephemeral_component.py"

              _KFP_RUNTIME=true python3 -m
              kfp.dsl.executor_main                        
              --component_module_path                        
              "$program_path/ephemeral_component.py"                        
              "$@"
            - |+

              import kfp
              from kfp import dsl
              from kfp.dsl import *
              from typing import *

              def comp(dataset: dsl.Output[dsl.Dataset]):
                  with open(dataset.path, 'w') as fh:
                      fh.write("hello world.")

          image: >-
            docker.artifactory.aws.athenahealth.com/athenahealth/tkalbach/kfp/add:0.0.5
      exec-comp2:
        container:
          args:
            - '--executor_input'
            - '{{$}}'
            - '--function_to_execute'
            - comp2
          command:
            - sh
            - '-ec'
            - >
              program_path=$(mktemp -d)


              printf "%s" "$0" > "$program_path/ephemeral_component.py"

              _KFP_RUNTIME=true python3 -m
              kfp.dsl.executor_main                        
              --component_module_path                        
              "$program_path/ephemeral_component.py"                        
              "$@"
            - |+

              import kfp
              from kfp import dsl
              from kfp.dsl import *
              from typing import *

              def comp2(dataset: dsl.Input[dsl.Dataset]):
                  with open(dataset.path, 'r') as fh:
                      print(fh.read())

          image: >-
            docker.artifactory.aws.athenahealth.com/athenahealth/tkalbach/kfp/add:0.0.5
  pipelineInfo:
    name: my-pipeline
  root:
    dag:
      tasks:
        comp:
          cachingOptions: {}
          componentRef:
            name: comp-comp
          taskInfo:
            name: comp
        comp2:
          cachingOptions: {}
          componentRef:
            name: comp-comp2
          dependentTasks:
            - comp
          inputs:
            artifacts:
              dataset:
                taskOutputArtifact:
                  outputArtifactKey: dataset
                  producerTask: comp
          taskInfo:
            name: comp2
  schemaVersion: 2.1.0
  sdkVersion: kfp-2.8.0
platform_spec:
  platforms:
    kubernetes:
      deploymentSpec:
        executors:
          exec-comp:
            podMetadata:
              annotations:
                run_id: '123456'
              labels:
                kubeflow.com/kfp: pipeline-node

Pipeline 2 Spec

pipeline_spec:
  components:
    comp-comp2:
      executorLabel: exec-comp2
      inputDefinitions:
        artifacts:
          dataset:
            artifactType:
              schemaTitle: system.Dataset
              schemaVersion: 0.0.1
    comp-scomp:
      executorLabel: exec-scomp
      outputDefinitions:
        artifacts:
          dataset:
            artifactType:
              schemaTitle: system.Dataset
              schemaVersion: 0.0.1
  deploymentSpec:
    executors:
      exec-comp2:
        container:
          args:
            - '--executor_input'
            - '{{$}}'
            - '--function_to_execute'
            - comp2
          command:
            - sh
            - '-ec'
            - >
              program_path=$(mktemp -d)


              printf "%s" "$0" > "$program_path/ephemeral_component.py"

              _KFP_RUNTIME=true python3 -m
              kfp.dsl.executor_main                        
              --component_module_path                        
              "$program_path/ephemeral_component.py"                        
              "$@"
            - |+

              import kfp
              from kfp import dsl
              from kfp.dsl import *
              from typing import *

              def comp2(dataset: dsl.Input[dsl.Dataset]):
                  with open(dataset.path, 'r') as fh:
                      print(fh.read())

          image: >-
            docker.artifactory.aws.athenahealth.com/athenahealth/tkalbach/kfp/add:0.0.5
      exec-scomp:
        container:
          args:
            - '--executor_input'
            - '{{$}}'
            - '--function_to_execute'
            - scomp
          command:
            - sh
            - '-ec'
            - >
              program_path=$(mktemp -d)


              printf "%s" "$0" > "$program_path/ephemeral_component.py"

              _KFP_RUNTIME=true python3 -m
              kfp.dsl.executor_main                        
              --component_module_path                        
              "$program_path/ephemeral_component.py"                        
              "$@"
            - |+

              import kfp
              from kfp import dsl
              from kfp.dsl import *
              from typing import *

              def scomp(dataset: dsl.Output[dsl.Dataset]):
                  with open(dataset.path, 'w') as fh:
                      fh.write("hello world.")

          image: >-
            docker.artifactory.aws.athenahealth.com/athenahealth/tkalbach/kfp/add:0.0.5
  pipelineInfo:
    name: my-pipeline
  root:
    dag:
      tasks:
        comp2:
          cachingOptions: {}
          componentRef:
            name: comp-comp2
          dependentTasks:
            - scomp
          inputs:
            artifacts:
              dataset:
                taskOutputArtifact:
                  outputArtifactKey: dataset
                  producerTask: scomp
          taskInfo:
            name: comp2
        scomp:
          cachingOptions: {}
          componentRef:
            name: comp-scomp
          taskInfo:
            name: scomp
  schemaVersion: 2.1.0
  sdkVersion: kfp-2.8.0
platform_spec:
  platforms:
    kubernetes:
      deploymentSpec:
        executors:
          exec-scomp:
            podMetadata:
              annotations:
                run_id: '123456'
              labels:
                kubeflow.com/kfp: pipeline-node
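For what it's worth, the podMetadata block is present in both compiled specs, which suggests compilation is correct and the labels are dropped server-side (hence the [backend] tag). A small stdlib sketch for checking which executors carry podMetadata (the helper is hypothetical; the dict mirrors Pipeline 2's platform_spec above, and in practice the spec would be loaded from the compiled pipeline file):

```python
def executors_with_pod_metadata(spec):
    """Return executor names whose kubernetes platform spec includes podMetadata."""
    k8s = spec.get("platform_spec", {}).get("platforms", {}).get("kubernetes", {})
    executors = k8s.get("deploymentSpec", {}).get("executors", {})
    return [name for name, ex in executors.items() if "podMetadata" in ex]

# Structure copied from Pipeline 2's spec above:
spec = {
    "platform_spec": {
        "platforms": {
            "kubernetes": {
                "deploymentSpec": {
                    "executors": {
                        "exec-scomp": {
                            "podMetadata": {
                                "annotations": {"run_id": "123456"},
                                "labels": {"kubeflow.com/kfp": "pipeline-node"},
                            }
                        }
                    }
                }
            }
        }
    }
}

print(executors_with_pod_metadata(spec))  # ['exec-scomp']
```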

Pod from pipeline 1 truncated to relevant information:

apiVersion: v1
kind: Pod
metadata:
  annotations:
    karpenter.sh/do-not-disrupt: "true"
    kubectl.kubernetes.io/default-container: main
    pipelines.kubeflow.org/v2_component: "true"
    poddefault.admission.kubeflow.org/poddefault-pipeline-pod-do-not-disrupt: "76694"
    run_id: "123456"
    sidecar.istio.io/inject: "false"
    workflows.argoproj.io/node-id: my-pipeline-tf8qm-1373016741
    workflows.argoproj.io/node-name: my-pipeline-tf8qm.root.comp.executor
    workflows.argoproj.io/outputs: '{"artifacts":[{"name":"main-logs","s3":{"key":"dev/my-pipeline-tf8qm/my-pipeline-tf8qm-system-container-impl-1373016741/main.log"}}]}'
  creationTimestamp: "2024-08-16T02:49:53Z"
  labels:
    kubeflow.com/kfp: pipeline-node
    pipeline/runid: 8bc23f35-8386-47df-89b6-fe04540a5baf
    pipelines.kubeflow.org/v2_component: "true"
    workflows.argoproj.io/completed: "true"
    workflows.argoproj.io/workflow: my-pipeline-tf8qm

Pod from pipeline 2 truncated to relevant information:

apiVersion: v1
kind: Pod
metadata:
  annotations:
    karpenter.sh/do-not-disrupt: "true"
    kubectl.kubernetes.io/default-container: main
    pipelines.kubeflow.org/v2_component: "true"
    poddefault.admission.kubeflow.org/poddefault-pipeline-pod-do-not-disrupt: "76694"
    sidecar.istio.io/inject: "false"
    workflows.argoproj.io/node-id: my-pipeline-s7rjw-1345624291
    workflows.argoproj.io/node-name: my-pipeline-s7rjw.root.scomp.executor
    workflows.argoproj.io/outputs: '{"artifacts":[{"name":"main-logs","s3":{"key":"dev/my-pipeline-s7rjw/my-pipeline-s7rjw-system-container-impl-1345624291/main.log"}}]}'
  creationTimestamp: "2024-08-16T02:50:59Z"
  labels:
    pipeline/runid: f7681e38-0993-4fb2-b96c-4d9afd9ef433
    pipelines.kubeflow.org/v2_component: "true"
    workflows.argoproj.io/completed: "true"
    workflows.argoproj.io/workflow: my-pipeline-s7rjw

Impacted by this bug? Give it a 👍.

@boarder7395 (Contributor, Author):

Closing in favor of #11077

The author of that issue caught that the behavior is alphabetical, which lines up with my testing as well.
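For reference, the alphabetical theory from #11077 is at least consistent with the two compiled specs shown here: in the working pipeline the executor carrying podMetadata sorts first among the executor names, while in the failing pipeline it sorts last. A trivial illustration (executor names taken from the specs above; the ordering hypothesis itself is from #11077, not verified here):

```python
# Executor names from the two compiled specs above.
working = sorted(["exec-comp", "exec-comp2"])   # podMetadata on exec-comp
failing = sorted(["exec-comp2", "exec-scomp"])  # podMetadata on exec-scomp

print(working)  # ['exec-comp', 'exec-comp2'] -> decorated executor sorts first
print(failing)  # ['exec-comp2', 'exec-scomp'] -> decorated executor sorts last
```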
