fix: handle cached input in objstore #27

Merged
1 commit merged on Mar 15, 2024

Conversation

@HumairAK commented Mar 13, 2024

Resolves:
https://issues.redhat.com/browse/RHOAIENG-4450
https://issues.redhat.com/browse/RHOAIENG-3816

Description

This problem occurs when an executor tries to pull a cached artifact generated by another run. The error happens because of the way the logic previously distinguished between the "defaultBucket" and a "nonDefaultBucket". The distinction was based on path prefixes: for instance, if an artifact is generated in a run with id runid1, it is stored at s3://bucket/runid1/artifact.txt. If a run with id runid2 wants to use "artifact.txt", it will expect it at s3://bucket/runid2/artifact.txt; but when it looks at the metadata for artifact.txt it will see that the prefix is different (i.e. s3://bucket/runid1), so it will ignore the default bucket and regenerate the bucket config based on the new s3 path.

In our carried patch here, we changed the regeneration logic. Previously, when the bucket config was regenerated, it would either (1) rely on the default minio, or (2) rely on the user to provide the obj store config in their pipeline env.

Since we now rely on the kfp-launcher config for obj store data, we removed this regeneration logic. In this PR, when evaluating whether the artifact is cached, we now check whether the expected bucket is the same but with a different path prefix (i.e. runid1 vs runid2); if so, we re-use the default bucket's config that we fetched earlier from the driver.
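To make the check concrete, here is a minimal, hypothetical Go sketch (the names BucketConfig, sameBucket, and resolveConfig are made up for illustration and are not the actual objstore code): a cached artifact URI such as s3://bucket/runid1/artifact.txt is treated as belonging to the default bucket whenever its scheme and bucket name match the default config, even though the run-id prefix differs, so the default config can be re-used instead of being regenerated.

// Hypothetical sketch (not the actual kfp-launcher objstore code): re-use the
// default bucket config when a cached artifact URI differs only by its key prefix.
package objstore

import "strings"

// BucketConfig is a simplified stand-in for the launcher's bucket configuration.
type BucketConfig struct {
    Scheme     string // e.g. "s3://"
    BucketName string // e.g. "mlpipeline"
    Prefix     string // e.g. "runid2/", the current run's prefix
}

// sameBucket reports whether artifactURI lives in the bucket the default config
// already points at, even if it was written under a different run-id prefix
// (e.g. runid1/ instead of runid2/).
func sameBucket(defaultCfg BucketConfig, artifactURI string) bool {
    return strings.HasPrefix(artifactURI, defaultCfg.Scheme+defaultCfg.BucketName+"/")
}

// resolveConfig returns the config to use for fetching artifactURI: the default
// config (fetched earlier from the driver via the kfp-launcher config) is re-used
// when the bucket matches; otherwise a separate config for the non-default
// bucket is still required.
func resolveConfig(defaultCfg BucketConfig, artifactURI string) (BucketConfig, bool) {
    if sameBucket(defaultCfg, artifactURI) {
        return defaultCfg, true
    }
    return BucketConfig{}, false
}

Under this assumption, a run whose default prefix is runid2/ can still fetch s3://bucket/runid1/artifact.txt with the default bucket's credentials rather than triggering the old regeneration path.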

Testing instructions

Test RHOAIENG-4450

First, reproduce the issue; instructions are here.

Then deploy the new changeset. Only the kfp-launcher has changed, so you can just update that image if you wish.

Start with a fresh DSPA and re-run the same pipeline twice; this time you should see the same error on both runs, instead of differing errors.

Also confirm other pipelines with data passing still work, to ensure no regression is introduced.

Test RHOAIENG-3816

  1. Deploy DSPA
  2. In the minio or s3 storage provided to the DSPA, put this file at this path: customfolder/shakespeare1.txt
  3. Now run this pipeline:
pipeline.yaml
# PIPELINE DEFINITION
# Name: pipeline-with-importer
components:
  comp-importer:
    executorLabel: exec-importer
    inputDefinitions:
      parameters:
        uri:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        artifact:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
  comp-train:
    executorLabel: exec-train
    inputDefinitions:
      artifacts:
        dataset:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        model:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
      parameters:
        scalar:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-importer:
      importer:
        artifactUri:
          constant: s3://mlpipeline/customfolder/shakespeare1.txt
        typeSchema:
          schemaTitle: system.Dataset
          schemaVersion: 0.0.1
    exec-train:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.3.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef train(\n    dataset: Input[Dataset]\n) -> NamedTuple('Outputs',\
          \ [\n    ('scalar', str),\n    ('model', Model),\n]):\n    \"\"\"Dummy Training\
          \ step.\"\"\"\n    with open(dataset.path, 'r') as f:\n        data = f.read()\n\
          \    print('Dataset:', data)\n\n    scalar = '123'\n    model = 'My model\
          \ trained using data: {}'.format(data)\n\n    from collections import namedtuple\n\
          \    output = namedtuple('Outputs', ['scalar', 'model'])\n    return output(scalar,\
          \ model)\n\n"
        image: python:3.7
pipelineInfo:
  name: pipeline-with-importer
root:
  dag:
    tasks:
      importer:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-importer
        inputs:
          parameters:
            uri:
              runtimeValue:
                constant: s3://mlpipeline/customfolder/shakespeare1.txt
        taskInfo:
          name: importer
      train:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-train
        dependentTasks:
        - importer
        inputs:
          artifacts:
            dataset:
              taskOutputArtifact:
                outputArtifactKey: artifact
                producerTask: importer
        taskInfo:
          name: train
schemaVersion: 2.1.0
sdkVersion: kfp-2.3.0

Confirm it runs successfully

This change resolves a bug that occurs when a pipeline step needs to retrieve
an artifact input that was cached from a different run, by re-using the
default bucket's configuration.

Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
@HumairAK removed the request for review from hbelmiro on Mar 13, 2024, 20:35
@HumairAK removed the approved label on Mar 13, 2024
@dsp-developers

A set of new images has been built to help with testing out this PR:
API Server: quay.io/opendatahub/ds-pipelines-api-server:pr-27
DSP DRIVER: quay.io/opendatahub/ds-pipelines-driver:pr-27
DSP LAUNCHER: quay.io/opendatahub/ds-pipelines-launcher:pr-27
Persistence Agent: quay.io/opendatahub/ds-pipelines-persistenceagent:pr-27
Scheduled Workflow Manager: quay.io/opendatahub/ds-pipelines-scheduledworkflow:pr-27
MLMD Server: quay.io/opendatahub/mlmd-grpc-server:latest
MLMD Envoy Proxy: registry.redhat.io/openshift-service-mesh/proxyv2-rhel8:2.3.9-2
UI: quay.io/opendatahub/ds-pipelines-frontend:pr-27

@dsp-developers

An OCP cluster where you are logged in as cluster admin is required.

The Data Science Pipelines team recommends testing this using the Data Science Pipelines Operator. Check here for more information on using the DSPO.

To use and deploy a DSP stack with these images (assuming the DSPO is deployed), first save the following YAML to a file named dspa.pr-27.yaml:

apiVersion: datasciencepipelinesapplications.opendatahub.io/v1alpha1
kind: DataSciencePipelinesApplication
metadata:
  name: pr-27
spec:
  dspVersion: v2
  apiServer:
    image: "quay.io/opendatahub/ds-pipelines-api-server:pr-27"
    argoDriverImage: "quay.io/opendatahub/ds-pipelines-driver:pr-27"
    argoLauncherImage: "quay.io/opendatahub/ds-pipelines-launcher:pr-27"
  persistenceAgent:
    image: "quay.io/opendatahub/ds-pipelines-persistenceagent:pr-27"
  scheduledWorkflow:
    image: "quay.io/opendatahub/ds-pipelines-scheduledworkflow:pr-27"
  mlmd:  
    deploy: true  # Optional component
    grpc:
      image: "quay.io/opendatahub/mlmd-grpc-server:latest"
    envoy:
      image: "registry.redhat.io/openshift-service-mesh/proxyv2-rhel8:2.3.9-2"
  mlpipelineUI:
    deploy: true  # Optional component 
    image: "quay.io/opendatahub/ds-pipelines-frontend:pr-27"
  objectStorage:
    minio:
      deploy: true
      image: 'quay.io/opendatahub/minio:RELEASE.2019-08-14T20-37-41Z-license-compliance'

Then run the following:

cd $(mktemp -d)
git clone git@github.com:opendatahub-io/data-science-pipelines.git
cd data-science-pipelines/
git fetch origin pull/27/head
git checkout -b pullrequest bef05afb5af2c3ecd4538204f003c1d972c6f935
oc apply -f dspa.pr-27.yaml

More instructions here on how to deploy and test a Data Science Pipelines Application.

@gregsheremeta

Can you add a comment to the commit body about why the bug exists? Specifically I'm wondering if it's something we introduced with an earlier carry.

@HumairAK (Author)

This also fixes: https://issues.redhat.com/browse/RHOAIENG-3816 for objects stored in the same default bucket:

(Screenshots attached: Screenshot_20240314_193734, Screenshot_20240314_193559, Screenshot_20240314_193647)

@amadhusu left a comment

Reproduced the original issue first with the pipeline attached below (pipeline_upload_fail.yaml). Testing the new kfp-launcher image, I can confirm the logs no longer differ between consecutive runs of the same pipeline with respect to the error. Also verified that two other pipelines worked as usual to confirm this didn't break anything else.

pipeline_upload_fail.yaml
components:
  comp-get-data:
    executorLabel: exec-get-data
    outputDefinitions:
      artifacts:
        data_output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
  comp-train-model:
    executorLabel: exec-train-model
    inputDefinitions:
      artifacts:
        data_input_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        model_output_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
  comp-upload-model:
    executorLabel: exec-upload-model
    inputDefinitions:
      artifacts:
        input_model_path:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-get-data:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - get_data
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.7.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def get_data(data_output_path: OutputPath()):
                import urllib.request
                print("starting download...")
                url = "https://raw.githubusercontent.com/rh-aiservices-bu/fraud-detection/main/data/card_transdata.csv"
                urllib.request.urlretrieve(url, data_output_path)
                print("done")

        image: >-
          quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301
    exec-train-model:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - train_model
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.7.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"'  &&  python3 -m
            pip install --quiet --no-warn-script-location 'tf2onnx' 'seaborn' &&
            "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - >+

            import kfp

            from kfp import dsl

            from kfp.dsl import *

            from typing import *


            def train_model(data_input_path: InputPath(), model_output_path:
            OutputPath()):
                import os.path

                # Replace 'path_to_file' with your file's path
                file_exists = os.path.exists(data_input_path)
                print(file_exists)  # This will print True if the file exists, otherwise False

                # fake model training, just to use output_path
                import urllib.request
                print("starting download...")
                url = "https://rhods-public.s3.amazonaws.com/modelmesh-samples/onnx/mnist.onnx"
                urllib.request.urlretrieve(url, model_output_path)
                print("done")

        image: >-
          quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301
    exec-upload-model:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - upload_model
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.7.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"'  &&  python3 -m
            pip install --quiet --no-warn-script-location 'boto3' 'botocore' &&
            "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import os  # needed for the os.environ lookups below
            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def upload_model(input_model_path: InputPath()):
                print(f"MY_STORAGE_AWS_ACCESS_KEY_ID={os.environ.get('MY_STORAGE_AWS_ACCESS_KEY_ID')}")
                print(f"MY_STORAGE_AWS_SECRET_ACCESS_KEY={os.environ.get('MY_STORAGE_AWS_SECRET_ACCESS_KEY')}")
                print(f"MY_STORAGE_AWS_S3_ENDPOINT={os.environ.get('MY_STORAGE_AWS_S3_ENDPOINT')}")
                print(f"MY_STORAGE_AWS_DEFAULT_REGION={os.environ.get('MY_STORAGE_AWS_DEFAULT_REGION')}")
                print(f"MY_STORAGE_AWS_S3_BUCKET={os.environ.get('MY_STORAGE_AWS_S3_BUCKET')}")
                print(f"S3_KEY={os.environ.get('S3_KEY')}")
                print(f"input_model_path={input_model_path}")

        image: >-
          quay.io/modh/runtime-images:runtime-cuda-tensorflow-ubi9-python-3.9-2023b-20240301
pipelineInfo:
  name: fraud-model-training
root:
  dag:
    tasks:
      get-data:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-get-data
        taskInfo:
          name: get-data
      train-model:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-train-model
        dependentTasks:
          - get-data
        inputs:
          artifacts:
            data_input_path:
              taskOutputArtifact:
                outputArtifactKey: data_output_path
                producerTask: get-data
        taskInfo:
          name: train-model
      upload-model:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-upload-model
        dependentTasks:
          - train-model
        inputs:
          artifacts:
            input_model_path:
              taskOutputArtifact:
                outputArtifactKey: model_output_path
                producerTask: train-model
        taskInfo:
          name: upload-model
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0

(Screenshot attached: Screenshot from 2024-03-15 21-08-37)

openshift-ci bot commented Mar 15, 2024

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: amadhusu, HumairAK

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@openshift-merge-bot openshift-merge-bot bot merged commit 2ccfe35 into opendatahub-io:master Mar 15, 2024
3 checks passed