
Delete pipeline pods after running #5234

Closed · andre-lx opened this issue Mar 3, 2021 · 12 comments
@andre-lx commented Mar 3, 2021

Hi.

I was trying to delete the pods of my pipelines after they finish running.

This would help a lot, for example with GPU pipeline steps. As an example:

def some_func():
    some_task = kfp_generator().set_gpu_limit(1, 'nvidia')

In our environment, this pipeline step starts a GPU node in our pool, but the only way to get that node released is to delete the run.

In the Argo project, I can see that this is possible using podGC: https://github.com/Duske/argo/blob/bd4750fbb9413eea8ced0ca642664f54fb5b3c47/examples/pod-gc-strategy.yaml#L9-L15

Currently, is there any way to do this through a KFP pipeline in Python?

Thanks

@Bobgy (Contributor) commented Mar 5, 2021

Hi @andre-lx, is #3938 the corresponding config?

@NikeNano (Member)

I think:

.set_ttl_seconds_after_finished(seconds: int)

is what you are looking for if you'd like to remove the underlying workflow after it finishes, for a specific pipeline; the reference posted by Bobgy above applies to all pipelines in the cluster. Docs can be found here.
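
For example, a minimal sketch of a pipeline using it (the echo_op component here is hypothetical, just for illustration):

from kfp import dsl


def echo_op():
    # Hypothetical one-step component, just so there is something to run.
    return dsl.ContainerOp(
        name='Echo',
        image='alpine:3.6',
        command=['echo', 'hello'],
    )


@dsl.pipeline(name='ttl-demo', description='GC the workflow 20s after it finishes.')
def ttl_demo_pipeline():
    echo_op()
    # Applies to this pipeline only; compiled into the Argo workflow spec.
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)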

@andre-lx (Author) commented Mar 12, 2021

Hi.

I had already tested this, but I tried again with both GPU and non-GPU pods, with this config:

@dsl.pipeline(
    name="test",
    description="test",
)
def some_func():
    some_task = kfp_generator().set_gpu_limit(1, 'nvidia')

    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)

But nothing happened after the 20 seconds.

Am I missing something?

Thanks @Bobgy and @NikeNano

@NikeNano (Member)

I tested the following pipeline:

import kfp
from kfp import dsl


def random_num_op(low, high):
    """Generate a random number between low and high."""
    return dsl.ContainerOp(
        name='Generate random number',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; print(random.randint($0, $1))" | tee $2', str(low), str(high), '/tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def flip_coin_op():
    """Flip a coin and output heads or tails randomly."""
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
                  'else \'tails\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )
    

@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def flipcoin_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        random_num_head = random_num_op(0, 9)
        with dsl.Condition(random_num_head.output > 5):
            print_op('heads and %s > 5!' % random_num_head.output)
        with dsl.Condition(random_num_head.output <= 5):
            print_op('heads and %s <= 5!' % random_num_head.output)

    with dsl.Condition(flip.output == 'tails'):
        random_num_tail = random_num_op(10, 19)
        with dsl.Condition(random_num_tail.output > 15):
            print_op('tails and %s > 15!' % random_num_tail.output)
        with dsl.Condition(random_num_tail.output <= 15):
            print_op('tails and %s <= 15!' % random_num_tail.output)
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)

Watching the kubeflow namespace for the pods:

watch kubectl get pods -n kubeflow

I could confirm that the pods were deleted ~20 seconds after completion. I also checked removing the line:

dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)

which resulted in the pods not being deleted.

Could you share your version of Kubeflow Pipelines and a full example where it is not working for you, @andre-lx, so I can try to reproduce it?

@andre-lx (Author)

Hi @NikeNano. Thanks for the full example. I also tried with that example, and it didn't work. The only thing I added was the part that compiles and runs the pipeline:

import kfp
import kfp.compiler as comp

pipeline_func = flipcoin_pipeline
pipeline_filename = pipeline_func.__name__ + ".pipeline.tar.gz"

comp.Compiler().compile(pipeline_func, pipeline_filename)

client = kfp.Client()
my_experiment = client.create_experiment(name='demo')
my_run = client.run_pipeline(my_experiment.id, pipeline_func.__name__,
                             pipeline_filename)

After that, the pipeline ran successfully, but the pods didn't get deleted:

kubectl get pods -n xx
NAME                                               READY   STATUS      RESTARTS   AGE
conditional-execution-pipeline-bh8ln-1858453776    0/2     Completed   0          2m48s
conditional-execution-pipeline-bh8ln-2630484688    0/2     Completed   0          2m44s
conditional-execution-pipeline-bh8ln-3054820269    0/2     Completed   0          2m51s

I can also see the "flag" in the workflow YAML:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  ...
spec:
  ...
  ttlSecondsAfterFinished: 20

I'm using the following versions:

pip freeze | grep kfp
kfp==1.4.0
kfp-pipeline-spec==0.1.6

And:

images:
- name: gcr.io/ml-pipeline/frontend
  newTag: 1.3.0
- name: gcr.io/ml-pipeline/api-server
  newTag: 1.3.0
- name: gcr.io/ml-pipeline/persistenceagent
  newTag: 1.3.0
- name: gcr.io/ml-pipeline/scheduledworkflow
  newTag: 1.3.0
- name: gcr.io/ml-pipeline/viewer-crd-controller
  newTag: 1.3.0
- name: gcr.io/ml-pipeline/visualization-server
  newTag: 1.3.0

@NikeNano (Member)

I cannot recreate this issue, @andre-lx; I am running Kubeflow Pipelines 1.3 as well, and the pods do get killed. Between attempts it can take some extra time (a few extra minutes when I tried it out).

Complete example:

import kfp
from kfp import dsl


def random_num_op(low, high):
    """Generate a random number between low and high."""
    return dsl.ContainerOp(
        name='Generate random number',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; print(random.randint($0, $1))" | tee $2', str(low), str(high), '/tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def flip_coin_op():
    """Flip a coin and output heads or tails randomly."""
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
                  'else \'tails\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )
    

@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def flipcoin_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        random_num_head = random_num_op(0, 9)
        with dsl.Condition(random_num_head.output > 5):
            print_op('heads and %s > 5!' % random_num_head.output)
        with dsl.Condition(random_num_head.output <= 5):
            print_op('heads and %s <= 5!' % random_num_head.output)

    with dsl.Condition(flip.output == 'tails'):
        random_num_tail = random_num_op(10, 19)
        with dsl.Condition(random_num_tail.output > 15):
            print_op('tails and %s > 15!' % random_num_tail.output)
        with dsl.Condition(random_num_tail.output <= 15):
            print_op('tails and %s <= 15!' % random_num_tail.output)
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)

pipeline_func = flipcoin_pipeline
pipeline_filename = pipeline_func.__name__ + ".pipeline.tar.gz"

import kfp.compiler as comp

comp.Compiler().compile(flipcoin_pipeline, pipeline_filename)

client = kfp.Client(host="http://localhost:8080/")
my_experiment = client.create_experiment(name='demo')
my_run = client.run_pipeline(my_experiment.id, pipeline_func.__name__, 
  pipeline_filename)

I ran this on a minikube cluster, using the following setup with commit c484cfa; instructions can be found here: https://github.com/kubeflow/pipelines/tree/1.3.0/manifests/kustomize#option-1-install-it-to-any-k8s-cluster

@andre-lx (Author) commented Mar 20, 2021

Thanks for the quick response.

I've been doing some debugging around the workflow and the pipelines code, and I noticed a few things that may help solve my issue.

I also downgraded the KFP SDK to version 1.3.0 to match all the resources:

pip freeze | grep kfp
kfp==1.3.0
kfp-pipeline-spec==0.1.7

Changing the default value of this environment variable in the persistenceagent deployment works well; the pods are deleted after the 20 seconds, for example:

- name: TTL_SECONDS_AFTER_WORKFLOW_FINISH
  value: "86400"

Then, I set the env variable back to its default of 86400.
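
For reference, a sketch of making that change programmatically with the Kubernetes Python client; the deployment and container name ml-pipeline-persistenceagent is an assumption based on the standard KFP manifests, not something confirmed in this thread:

from kubernetes import client, config

# Load credentials from the local kubeconfig.
config.load_kube_config()
apps = client.AppsV1Api()

# Strategic-merge patch: only the named container's env var is updated.
# Both names below are assumptions from the standard KFP manifests.
patch = {"spec": {"template": {"spec": {"containers": [{
    "name": "ml-pipeline-persistenceagent",
    "env": [{"name": "TTL_SECONDS_AFTER_WORKFLOW_FINISH", "value": "20"}],
}]}}}}

apps.patch_namespaced_deployment(
    name="ml-pipeline-persistenceagent", namespace="kubeflow", body=patch)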

I checked the logs, and I'm getting the following logs for the example you provided:

level=info msg="Skip syncing Workflow (conditional-execution-pipeline-mkr59): workflow marked as persisted."
level=info msg="Success while syncing resource (xxxx/conditional-execution-pipeline-mkr59)"

After checking the code, I saw that the logs are printed by the function:

if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish {
    // Skip persisting the workflow if the workflow is finished
    // and the workflow hasn't being passing the TTL
    log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name)
    return nil
}
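
Restating that check outside Go (a paraphrase for illustration, not the actual KFP code):

import time

def skip_syncing(persisted_final_state: bool, finished_at: int, ttl_seconds: int) -> bool:
    # Skip only while the workflow is persisted AND still inside its TTL window.
    return persisted_final_state and (time.time() - finished_at) < ttl_seconds

# Finished 120 seconds ago with a 20-second TTL: 120 < 20 is False, so the
# workflow should NOT be skipped, which is what makes the log above surprising.
print(skip_syncing(True, int(time.time()) - 120, 20))  # False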

Analysing the workflow YAML, I can see that the label, the TTL, and the finishedAt all exist:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  annotations:
    pipelines.kubeflow.org/kfp_sdk_version: 1.3.0
    pipelines.kubeflow.org/pipeline_compilation_time: ...
    pipelines.kubeflow.org/pipeline_spec: '...'
  labels:
    pipeline/persistedFinalState: "true"
    pipeline/runid: e8d0c54a-57fe-44d2-a242-8880e170ac92
    pipelines.kubeflow.org/kfp_sdk_version: 1.3.0
    workflows.argoproj.io/completed: "true"
    workflows.argoproj.io/phase: Succeeded
  name: conditional-execution-pipeline-mkr59
  namespace: xxxx
spec:
  arguments: {}
  entrypoint: conditional-execution-pipeline
  serviceAccountName: default-editor
  templates:
  - dag:
    ...
  - dag:
    ...
  - container:
    ...
  - container:
    ...
  ttlSecondsAfterWorkflowFinished: 20
status:
  finishedAt: "2021-03-20T16:45:04Z"
  nodes:
    ...
  phase: Succeeded
  startedAt: "2021-03-20T16:44:54Z"

So, looking at the function: assume, for example, that the run finished at 00:10 and the time now is 00:12. 12 - 10 = 2 minutes = 120 seconds, and 120 seconds > the 20 seconds defined for the TTL. So why is this log still being printed?

It looks like the TTL is not being passed, or is overridden by the default value.

Additional notes:

I also ran the same tests without the env variable set in the deployment.

I also tried it out with resources version 1.4.1 and kfp 1.4.0:

pip freeze | grep kfp
kfp==1.4.0
kfp-pipeline-spec==0.1.7

Name:         conditional-execution-pipeline-xsbfw
Namespace:    xxxx
Labels:       pipeline/persistedFinalState=true
              pipeline/runid=1bfdfa01-31c2-4fde-8892-0c78193b7b3e
              pipelines.kubeflow.org/kfp_sdk_version=1.4.0
              workflows.argoproj.io/completed=true
              workflows.argoproj.io/phase=Succeeded
Annotations:  pipelines.kubeflow.org/kfp_sdk_version: 1.4.0
              pipelines.kubeflow.org/pipeline_compilation_time: ...
              pipelines.kubeflow.org/pipeline_spec: ...
              pipelines.kubeflow.org/run_name: flipcoin_pipeline
API Version:  argoproj.io/v1alpha1
Kind:         Workflow
Metadata:
  Creation Timestamp:  2021-03-20T18:50:26Z
  Generate Name:       conditional-execution-pipeline-
Spec:
  Arguments:
  Entrypoint:            conditional-execution-pipeline
  Service Account Name:  default-editor
  Templates:
    Dag:
    ...
    Container:
    ...
  Ttl Seconds After Finished:  20
Status:
  Finished At:  2021-03-20T18:50:34Z
  Nodes:
    ...
  Phase:              Succeeded
  Started At:         2021-03-20T18:50:26Z

No changes at all.

Thanks

@NikeNano (Member)

I will give it a look and come back. Is this something you know more about, @Bobgy?

@NikeNano (Member)

Quick first notes: there are two types of garbage collection at play here, the one Argo does and the one the persistence agent does. I am not sure, though, why the one related to the persistence agent (TTL_SECONDS_AFTER_WORKFLOW_FINISH) seems, in your case, to affect the one set for Argo (ttlSecondsAfterWorkflowFinished). I think the shorter one should decide when the workflow gets deleted, but I might be missing something.

@Bobgy (Contributor) commented Mar 24, 2021

I'd suggest using TTL_SECONDS_AFTER_WORKFLOW_FINISH and avoiding the Argo one, because TTL_SECONDS_AFTER_WORKFLOW_FINISH will only GC the workflow after the persistence agent has backed it up in the KFP MySQL DB. If you use the Argo parameter and the workflow is GCed before it has been backed up, the record is lost forever.

level=info msg="Skip syncing Workflow (conditional-execution-pipeline-mkr59): workflow marked as persisted."
level=info msg="Success while syncing resource (xxxx/conditional-execution-pipeline-mkr59)"

The logs are expected: the persistence agent loops over these workflows periodically and won't take any action until the TTL has been reached.
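
As a rough sanity check (a sketch using the existing kfp.Client API; the host and run id below are placeholders), a run that can be fetched back from the KFP API server has a record in its DB:

import kfp

client = kfp.Client(host="http://localhost:8080/")  # placeholder host
# Use the id returned by run_pipeline(...), e.g. my_run.id.
run_detail = client.get_run(run_id="e8d0c54a-57fe-44d2-a242-8880e170ac92")
print(run_detail.run.status)  # final status shows once the agent has synced it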

@Bobgy (Contributor) commented Mar 24, 2021

The conversation has been pretty long; are there any remaining questions not addressed?

@andre-lx (Author) commented Mar 24, 2021

Found it!

After checking the workflow-controller logs, I found the error:

E0324 14:29:49.195776       1 ttlcontroller.go:124] error deleting 'namespace_name/workflow_name': workflows.argoproj.io "workflow_name" is forbidden: User "system:serviceaccount:kubeflow:argo" cannot delete resource "workflows" in API group "argoproj.io" in the namespace "namespace_name"

https://github.com/kubeflow/manifests/blob/3e08dc102059def5a0b0d04560c7d119959bf506/contrib/argo/base/cluster-role.yaml#L36-L46

So, after adding the missing verb, everything worked out with the set_ttl_seconds_after_finished config:

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  labels:
    app: argo
  name: argo
rules:
- apiGroups:
...
- apiGroups:
  - argoproj.io
  resources:
  - workflows
  - workflows/finalizers
  verbs:
  - get
  - list
  - watch
  - update
  - patch
  - delete
- apiGroups:
...
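
A quick way to check for this kind of RBAC gap (a sketch with the Kubernetes Python client; "namespace_name" is a placeholder, as in the log above):

from kubernetes import client, config

config.load_kube_config()
authz = client.AuthorizationV1Api()

# Ask the API server whether Argo's service account may delete Workflows.
review = client.V1SubjectAccessReview(spec=client.V1SubjectAccessReviewSpec(
    user="system:serviceaccount:kubeflow:argo",
    resource_attributes=client.V1ResourceAttributes(
        group="argoproj.io", resource="workflows",
        verb="delete", namespace="namespace_name"),  # placeholder namespace
))
resp = authz.create_subject_access_review(review)
print(resp.status.allowed)  # True once the ClusterRole includes the 'delete' verb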

Thanks to both!
