
[v2compat] cannot override container image as parameters #5834

Closed
Bobgy opened this issue Jun 10, 2021 · 3 comments
Bobgy commented Jun 10, 2021

Environment

  • KFP SDK version: master

Steps to reproduce

  1. Write any pipeline that tries to override task container image at pipeline level using a parameter, e.g.

    # Optional: let the training task use the same TensorFlow image as the specified tensorboard image
    train_task.container.image = tf_image

  2. compile in v2 compatible mode

  3. It fails immediately when running the pipeline

    I got a 500 API error when running the pipeline; the error log on the KFP API server is:

    I0630 07:49:58.838375       6 error.go:247] templates.pipeline-tensorboard-minio.tasks.train templates.train: failed to resolve {{inputs.parameters.tf_image}}
    

    So it seems we generated an invalid Argo workflow.

Expected result

The pipeline should work

Materials and Reference


Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

Bobgy commented Jun 30, 2021

Generated workflow. The error message indicates that the "train" template uses {{inputs.parameters.tf_image}} in its image field, but does not declare tf_image as an input parameter.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-tensorboard-minio-
  annotations:
    pipelines.kubeflow.org/kfp_sdk_version: 1.6.4
    pipelines.kubeflow.org/pipeline_compilation_time: '2021-06-30T07:57:21.370042'
    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "minio-service:9000",
      "name": "minio_endpoint", "optional": true, "type": "String"}, {"default": "mlpipeline",
      "name": "log_bucket", "optional": true, "type": "String"}, {"default": "tensorboard/logs/{{workflow.uid}}",
      "name": "log_dir", "optional": true, "type": "String"}, {"default": "gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
      "name": "tf_image", "optional": true, "type": "String"}, {"default": "", "name":
      "pipeline-output-directory"}, {"default": "pipeline-tensorboard-minio", "name":
      "pipeline-name"}], "name": "pipeline-tensorboard-minio"}'
    pipelines.kubeflow.org/v2_pipeline: "true"
  labels:
    pipelines.kubeflow.org/v2_pipeline: "true"
    pipelines.kubeflow.org/kfp_sdk_version: 1.6.4
spec:
  entrypoint: pipeline-tensorboard-minio
  templates:
  - name: create-tensorboard-visualization
    container:
      args:
      - sh
      - -ex
      - -c
      - |
        log_dir="$0"
        output_metadata_path="$1"
        pod_template_spec="$2"
        image="$3"

        mkdir -p "$(dirname "$output_metadata_path")"

        echo '
            {
              "outputs" : [{
                "type": "tensorboard",
                "source": "'"$log_dir"'",
                "image": "'"$image"'",
                "pod_template_spec": '"$pod_template_spec"'
              }]
            }
        ' >"$output_metadata_path"
      - '{{$.inputs.parameters[''Log dir URI'']}}'
      - '{{$.outputs.artifacts[''mlpipeline-ui-metadata''].path}}'
      - '{{$.inputs.parameters[''Pod Template Spec'']}}'
      - '{{$.inputs.parameters[''Image'']}}'
      command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
        --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
        --container_image, $(KFP_V2_IMAGE), --task_name, create-tensorboard-visualization,
        --pipeline_name, '{{inputs.parameters.pipeline-name}}', --pipeline_run_id,
        $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}']
      env:
      - name: KFP_POD_NAME
        valueFrom:
          fieldRef: {fieldPath: metadata.name}
      - name: KFP_NAMESPACE
        valueFrom:
          fieldRef: {fieldPath: metadata.namespace}
      - name: WORKFLOW_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
      - {name: KFP_V2_IMAGE, value: alpine}
      - {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"Image": {"type":
          "STRING", "value": "BEGIN-KFP-PARAM[{{inputs.parameters.tf_image}}]END-KFP-PARAM"},
          "Log dir URI": {"type": "STRING", "value": "BEGIN-KFP-PARAM[s3://{{inputs.parameters.log_bucket}}/{{inputs.parameters.log_dir}}]END-KFP-PARAM"},
          "Pod Template Spec": {"type": "STRING", "value": "BEGIN-KFP-PARAM[{\"spec\":
          {\"containers\": [{\"env\": [{\"name\": \"AWS_ACCESS_KEY_ID\", \"valueFrom\":
          {\"secretKeyRef\": {\"name\": \"mlpipeline-minio-artifact\", \"key\": \"accesskey\"}}},
          {\"name\": \"AWS_SECRET_ACCESS_KEY\", \"valueFrom\": {\"secretKeyRef\":
          {\"name\": \"mlpipeline-minio-artifact\", \"key\": \"secretkey\"}}}, {\"name\":
          \"AWS_REGION\", \"value\": \"minio\"}, {\"name\": \"S3_ENDPOINT\", \"value\":
          \"{{inputs.parameters.minio_endpoint}}\"}, {\"name\": \"S3_USE_HTTPS\",
          \"value\": \"0\"}, {\"name\": \"S3_VERIFY_SSL\", \"value\": \"0\"}]}]}}]END-KFP-PARAM"}},
          "inputArtifacts": {}, "outputParameters": {}, "outputArtifacts": {"mlpipeline-ui-metadata":
          {"schemaTitle": "system.Artifact", "instanceSchema": "", "metadataPath":
          "/tmp/outputs/mlpipeline-ui-metadata/data"}}}'}
      envFrom:
      - configMapRef: {name: metadata-grpc-configmap, optional: true}
      image: alpine
      volumeMounts:
      - {mountPath: /kfp-launcher, name: kfp-launcher}
    inputs:
      parameters:
      - {name: log_bucket}
      - {name: log_dir}
      - {name: minio_endpoint}
      - {name: pipeline-name}
      - {name: pipeline-output-directory}
      - {name: tf_image}
    outputs:
      artifacts:
      - {name: create-tensorboard-visualization-mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline-ui-metadata/data}
    metadata:
      annotations:
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/component_ref: '{"digest": "583d215a7b720ecea88eb3a53346a137f69c0d1e8ba130895a304aeabee1f14f",
          "url": "../../../components/tensorflow/tensorboard/prepare_tensorboard/component.yaml"}'
        pipelines.kubeflow.org/arguments.parameters: '{"Image": "{{inputs.parameters.tf_image}}",
          "Log dir URI": "s3://{{inputs.parameters.log_bucket}}/{{inputs.parameters.log_dir}}",
          "Pod Template Spec": "{\"spec\": {\"containers\": [{\"env\": [{\"name\":
          \"AWS_ACCESS_KEY_ID\", \"valueFrom\": {\"secretKeyRef\": {\"name\": \"mlpipeline-minio-artifact\",
          \"key\": \"accesskey\"}}}, {\"name\": \"AWS_SECRET_ACCESS_KEY\", \"valueFrom\":
          {\"secretKeyRef\": {\"name\": \"mlpipeline-minio-artifact\", \"key\": \"secretkey\"}}},
          {\"name\": \"AWS_REGION\", \"value\": \"minio\"}, {\"name\": \"S3_ENDPOINT\",
          \"value\": \"{{inputs.parameters.minio_endpoint}}\"}, {\"name\": \"S3_USE_HTTPS\",
          \"value\": \"0\"}, {\"name\": \"S3_VERIFY_SSL\", \"value\": \"0\"}]}]}}"}'
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.6.4
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/v2_component: "true"
    initContainers:
    - command: [/bin/mount_launcher.sh]
      image: gcr.io/ml-pipeline/kfp-launcher:1.6.4
      name: kfp-launcher
      mirrorVolumeMounts: true
    volumes:
    - {name: kfp-launcher}
  - name: pipeline-tensorboard-minio
    inputs:
      parameters:
      - {name: log_bucket}
      - {name: log_dir}
      - {name: minio_endpoint}
      - {name: pipeline-name}
      - {name: pipeline-output-directory}
      - {name: tf_image}
    dag:
      tasks:
      - name: create-tensorboard-visualization
        template: create-tensorboard-visualization
        arguments:
          parameters:
          - {name: log_bucket, value: '{{inputs.parameters.log_bucket}}'}
          - {name: log_dir, value: '{{inputs.parameters.log_dir}}'}
          - {name: minio_endpoint, value: '{{inputs.parameters.minio_endpoint}}'}
          - {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
          - {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
          - {name: tf_image, value: '{{inputs.parameters.tf_image}}'}
      - name: train
        template: train
        dependencies: [create-tensorboard-visualization]
        arguments:
          parameters:
          - {name: log_bucket, value: '{{inputs.parameters.log_bucket}}'}
          - {name: log_dir, value: '{{inputs.parameters.log_dir}}'}
          - {name: minio_endpoint, value: '{{inputs.parameters.minio_endpoint}}'}
          - {name: pipeline-name, value: '{{inputs.parameters.pipeline-name}}'}
          - {name: pipeline-output-directory, value: '{{inputs.parameters.pipeline-output-directory}}'}
  - name: train
    container:
      args:
      - sh
      - -c
      - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        'minio' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
        --no-warn-script-location 'minio' --user) && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def train(minio_endpoint, log_bucket, log_dir):
            # Reference: https://www.tensorflow.org/tensorboard/get_started
            import tensorflow as tf

            mnist = tf.keras.datasets.mnist

            (x_train, y_train), (x_test, y_test) = mnist.load_data()
            x_train, x_test = x_train / 255.0, x_test / 255.0

            def create_model():
                return tf.keras.models.Sequential([
                    tf.keras.layers.Flatten(input_shape=(28, 28)),
                    tf.keras.layers.Dense(512, activation='relu'),
                    tf.keras.layers.Dropout(0.2),
                    tf.keras.layers.Dense(10, activation='softmax')
                ])

            model = create_model()
            model.compile(
                optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )

            log_dir_local = "logs/fit"
            tensorboard_callback = tf.keras.callbacks.TensorBoard(
                log_dir=log_dir_local, histogram_freq=1
            )

            model.fit(
                x=x_train,
                y=y_train,
                epochs=5,
                validation_data=(x_test, y_test),
                callbacks=[tensorboard_callback]
            )

            # Copy the local logs folder to minio.
            #
            # TODO: we may write a filesystem watch process that continuously copy logs
            # dir to minio, so that we can watch live training logs via tensorboard.
            #
            # Note, although tensorflow supports minio via s3:// protocol. We want to
            # demo how minio can be used instead, e.g. the same approach can be used with
            # frameworks only support local path.
            from minio import Minio
            import os
            minio_access_key = os.getenv('MINIO_ACCESS_KEY')
            minio_secret_key = os.getenv('MINIO_SECRET_KEY')
            if not minio_access_key or not minio_secret_key:
                raise Exception('MINIO_ACCESS_KEY or MINIO_SECRET_KEY env is not set')
            client = Minio(
                minio_endpoint,
                access_key=minio_access_key,
                secret_key=minio_secret_key,
                secure=False
            )
            count = 0
            from pathlib import Path
            for path in Path("logs").rglob("*"):
                if not path.is_dir():
                    object_name = os.path.join(
                        log_dir, os.path.relpath(start=log_dir_local, path=path)
                    )
                    client.fput_object(
                        bucket_name=log_bucket,
                        object_name=object_name,
                        file_path=path,
                    )
                    count = count + 1
                    print(f'{path} uploaded to minio://{log_bucket}/{object_name}')
            print(f'{count} log files uploaded to minio://{log_bucket}/{log_dir}')

        import argparse
        _parser = argparse.ArgumentParser(prog='Train', description='')
        _parser.add_argument("--minio-endpoint", dest="minio_endpoint", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--log-bucket", dest="log_bucket", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--log-dir", dest="log_dir", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = train(**_parsed_args)
      - --minio-endpoint
      - '{{$.inputs.parameters[''minio_endpoint'']}}'
      - --log-bucket
      - '{{$.inputs.parameters[''log_bucket'']}}'
      - --log-dir
      - '{{$.inputs.parameters[''log_dir'']}}'
      command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST),
        --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO),
        --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}',
        --pipeline_run_id, $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root,
        '{{inputs.parameters.pipeline-output-directory}}']
      env:
      - name: MINIO_SECRET_KEY
        valueFrom:
          secretKeyRef: {key: secretkey, name: mlpipeline-minio-artifact}
      - name: MINIO_ACCESS_KEY
        valueFrom:
          secretKeyRef: {key: accesskey, name: mlpipeline-minio-artifact}
      - name: KFP_POD_NAME
        valueFrom:
          fieldRef: {fieldPath: metadata.name}
      - name: KFP_NAMESPACE
        valueFrom:
          fieldRef: {fieldPath: metadata.namespace}
      - name: WORKFLOW_ID
        valueFrom:
          fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
      - {name: KFP_V2_IMAGE, value: '{{inputs.parameters.tf_image}}'}
      - {name: KFP_V2_RUNTIME_INFO, value: '{"inputParameters": {"log_bucket": {"type":
          "STRING", "value": "BEGIN-KFP-PARAM[{{inputs.parameters.log_bucket}}]END-KFP-PARAM"},
          "log_dir": {"type": "STRING", "value": "BEGIN-KFP-PARAM[{{inputs.parameters.log_dir}}]END-KFP-PARAM"},
          "minio_endpoint": {"type": "STRING", "value": "BEGIN-KFP-PARAM[{{inputs.parameters.minio_endpoint}}]END-KFP-PARAM"}},
          "inputArtifacts": {}, "outputParameters": {}, "outputArtifacts": {}}'}
      envFrom:
      - configMapRef: {name: metadata-grpc-configmap, optional: true}
      image: '{{inputs.parameters.tf_image}}'
      volumeMounts:
      - {mountPath: /kfp-launcher, name: kfp-launcher}
    inputs:
      parameters:
      - {name: log_bucket}
      - {name: log_dir}
      - {name: minio_endpoint}
      - {name: pipeline-name}
      - {name: pipeline-output-directory}
    metadata:
      annotations:
        pipelines.kubeflow.org/v2_component: "true"
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/arguments.parameters: '{"log_bucket": "{{inputs.parameters.log_bucket}}",
          "log_dir": "{{inputs.parameters.log_dir}}", "minio_endpoint": "{{inputs.parameters.minio_endpoint}}"}'
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.6.4
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/v2_component: "true"
    initContainers:
    - command: [/bin/mount_launcher.sh]
      image: gcr.io/ml-pipeline/kfp-launcher:1.6.4
      name: kfp-launcher
      mirrorVolumeMounts: true
    volumes:
    - {name: kfp-launcher}
  arguments:
    parameters:
    - {name: minio_endpoint, value: 'minio-service:9000'}
    - {name: log_bucket, value: mlpipeline}
    - {name: log_dir, value: 'tensorboard/logs/{{workflow.uid}}'}
    - {name: tf_image, value: 'gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest'}
    - {name: pipeline-output-directory, value: ''}
    - {name: pipeline-name, value: pipeline-tensorboard-minio}
  serviceAccountName: pipeline-runner
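The mismatch described above can be checked mechanically. Below is a minimal sketch (not Argo's actual validator): collect every {{inputs.parameters.NAME}} placeholder a template references and compare against its declared input parameters; the `train` template is reduced to the two relevant fields.

```python
import re

def undeclared_parameter_refs(template):
    """Return {{inputs.parameters.NAME}} references with no matching declaration."""
    declared = {p["name"] for p in template.get("inputs", {}).get("parameters", [])}
    referenced = set(re.findall(r"\{\{inputs\.parameters\.([\w-]+)\}\}", str(template)))
    return referenced - declared

# The "train" template from the workflow above, reduced to the relevant fields:
# its image field references tf_image, but tf_image is absent from inputs.parameters.
train_template = {
    "name": "train",
    "container": {"image": "{{inputs.parameters.tf_image}}"},
    "inputs": {"parameters": [
        {"name": "log_bucket"}, {"name": "log_dir"}, {"name": "minio_endpoint"},
        {"name": "pipeline-name"}, {"name": "pipeline-output-directory"},
    ]},
}

print(undeclared_parameter_refs(train_template))  # prints {'tf_image'}
```

This is exactly the condition Argo's `failed to resolve {{inputs.parameters.tf_image}}` error reports at runtime.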

Bobgy commented Jul 20, 2021

Workaround:

import kfp

def create_sleep_op(image):
  # Load the component, then override its image before compilation, so the
  # compiled workflow contains a concrete image string.
  sleep_op = kfp.components.load_component_from_file('template.yaml')
  sleep_op.component_spec.implementation.container.image = image
  return sleep_op

It would be better not to support overriding the image using parameters from outside the component, because that breaks the component interface.
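The workaround's pattern — mutate the component spec before compilation so the compiled workflow carries a concrete image string rather than a runtime placeholder — can be sketched without kfp installed. `BASE_COMPONENT` and its dict shape are hypothetical stand-ins for kfp's ComponentSpec objects:

```python
import copy

# Hypothetical dict stand-in for a kfp ComponentSpec; the real workaround
# mutates sleep_op.component_spec.implementation.container.image the same way.
BASE_COMPONENT = {
    "name": "sleep",
    "implementation": {"container": {"image": "alpine:3.14"}},
}

def load_component_with_image(image):
    # Override the image at load time, i.e. before compilation, so no
    # runtime parameter placeholder ever reaches the Argo workflow.
    spec = copy.deepcopy(BASE_COMPONENT)
    spec["implementation"]["container"]["image"] = image
    return spec

op = load_component_with_image("tensorflow/tensorflow:2.3.0")
print(op["implementation"]["container"]["image"])  # prints tensorflow/tensorflow:2.3.0
```

The key design point is that the image is fixed per loaded component rather than wired to a pipeline parameter, so each template's interface stays self-contained.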

Bobgy closed this as completed Jul 20, 2021
Bobgy commented Jul 26, 2021

Note: because we decided not to support this feature, this remains a breaking change from v1 to v2 compatible mode. We have some ideas for supporting the same functionality later in a way that is friendly to the component interface.

Bobgy changed the title from "[v2compat] cannot override container image" to "[v2compat] cannot override container image as parameters" on Jul 26, 2021