From 22920bccb9b9011935da7531ef5c5a8d5f90d1fc Mon Sep 17 00:00:00 2001
From: deepanker13
Date: Sat, 13 Apr 2024 01:24:48 +0530
Subject: [PATCH] adding fine tune example with s3 as the dataset store
 (#2006)

* s3 as dataset source code review changes

Signed-off-by: deepanker13

* fixing python black test

Signed-off-by: deepanker13

* removing conflicts in example file

Signed-off-by: deepanker13

* retriggering CI

Signed-off-by: deepanker13

* removing dummy keys

Signed-off-by: deepanker13

* code review change for adding s3 keys block

Signed-off-by: deepanker13

---------

Signed-off-by: deepanker13
---
 ...n_api.ipynb => train_api_hf_dataset.ipynb} |  12 +-
 .../train_api_s3_dataset.ipynb                | 165 ++++++++++++++++++
 sdk/python/kubeflow/storage_initializer/s3.py |  33 ++--
 3 files changed, 198 insertions(+), 12 deletions(-)
 rename examples/pytorch/language-modeling/{train_api.ipynb => train_api_hf_dataset.ipynb} (94%)
 create mode 100644 examples/pytorch/language-modeling/train_api_s3_dataset.ipynb

diff --git a/examples/pytorch/language-modeling/train_api.ipynb b/examples/pytorch/language-modeling/train_api_hf_dataset.ipynb
similarity index 94%
rename from examples/pytorch/language-modeling/train_api.ipynb
rename to examples/pytorch/language-modeling/train_api_hf_dataset.ipynb
index 96c4e34c9f..f284804e02 100644
--- a/examples/pytorch/language-modeling/train_api.ipynb
+++ b/examples/pytorch/language-modeling/train_api_hf_dataset.ipynb
@@ -12,12 +12,13 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 15,
+  "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import the libraries\n",
    "from kubeflow.training.api.training_client import TrainingClient\n",
+   "from kubeflow.storage_initializer.s3 import S3DatasetParams\n",
    "from kubeflow.storage_initializer.hugging_face import (\n",
    "    HuggingFaceModelParams,\n",
    "    HuggingFaceTrainParams,\n",
@@ -40,6 +41,13 @@
    "client = TrainingClient()"
   ]
  },
+ {
+  "cell_type": "markdown",
+  "metadata": {},
+  "source": [
+   "USING HUGGING FACE HUB AS THE DATASET STORE"
+  ]
+ },
  {
   "cell_type": "code",
   "execution_count": null,
@@ -129,7 +137,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.11.4"
+   "version": "3.11.6"
   }
  },
  "nbformat": 4,
diff --git a/examples/pytorch/language-modeling/train_api_s3_dataset.ipynb b/examples/pytorch/language-modeling/train_api_s3_dataset.ipynb
new file mode 100644
index 0000000000..19038dfa1e
--- /dev/null
+++ b/examples/pytorch/language-modeling/train_api_s3_dataset.ipynb
@@ -0,0 +1,165 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# install the kubeflow-training package with the 'huggingface' extra\n",
+    "!pip install -U 'kubeflow-training[huggingface]'"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# import the libraries\n",
+    "from kubeflow.training.api.training_client import TrainingClient\n",
+    "from kubeflow.storage_initializer.s3 import S3DatasetParams\n",
+    "from kubeflow.storage_initializer.hugging_face import (\n",
+    "    HuggingFaceModelParams,\n",
+    "    HuggingFaceTrainParams,\n",
+    "    HfDatasetParams,\n",
+    ")\n",
+    "from kubeflow.storage_initializer.constants import INIT_CONTAINER_MOUNT_PATH\n",
+    "from peft import LoraConfig\n",
+    "import transformers\n",
+    "from transformers import TrainingArguments\n",
+    "from kubeflow.training import constants"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# create a training client; pass the config_file parameter to use a kubeconfig other than \"~/.kube/config\"\n",
+    "client = TrainingClient()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "USING S3 AS THE DATASET SOURCE"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# set your S3 credentials\n",
+    "s3_access_key = \"\"\n",
+    "s3_secret_key = \"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# specify the model, dataset, and training parameters\n",
+    "client.train(\n",
+    "    name=\"s3-test\",\n",
+    "    num_workers=2,\n",
+    "    num_procs_per_worker=1,\n",
+    "    # specify the storage class if you don't want to use the default one for the storage-initializer PVC\n",
+    "    # storage_config={\n",
+    "    #     \"size\": \"10Gi\",\n",
+    "    #     \"storage_class\": \"\",\n",
+    "    # },\n",
+    "    model_provider_parameters=HuggingFaceModelParams(\n",
+    "        model_uri=\"hf://TinyLlama/TinyLlama-1.1B-Chat-v1.0\",\n",
+    "        transformer_type=transformers.AutoModelForCausalLM,\n",
+    "    ),\n",
+    "    # for text-related tasks, the dataset is assumed to have a 'text' column.\n",
+    "    # for details on how the dataset is loaded, see load_and_preprocess_data in sdk/python/kubeflow/trainer/hf_llm_training.py\n",
+    "    dataset_provider_parameters=S3DatasetParams(\n",
+    "        {\n",
+    "            \"endpoint_url\": \"http://10.117.63.3\",\n",
+    "            \"bucket_name\": \"test\",\n",
+    "            \"file_key\": \"imdatta0___ultrachat_1k\",\n",
+    "            \"region_name\": \"us-east-1\",\n",
+    "            \"access_key\": s3_access_key,\n",
+    "            \"secret_key\": s3_secret_key,\n",
+    "        }\n",
+    "    ),\n",
+    "    train_parameters=HuggingFaceTrainParams(\n",
+    "        lora_config=LoraConfig(\n",
+    "            r=8,\n",
+    "            lora_alpha=8,\n",
+    "            lora_dropout=0.2,\n",
+    "            bias=\"none\",\n",
+    "            task_type=\"CAUSAL_LM\",\n",
+    "        ),\n",
+    "        training_parameters=TrainingArguments(\n",
+    "            num_train_epochs=1,\n",
+    "            per_device_train_batch_size=1,\n",
+    "            gradient_accumulation_steps=1,\n",
+    "            gradient_checkpointing=True,\n",
+    "            gradient_checkpointing_kwargs={\n",
+    "                \"use_reentrant\": False\n",
+    "            },  # mandatory when gradient checkpointing is enabled\n",
+    "            warmup_steps=0.02,\n",
+    "            learning_rate=1,\n",
+    "            lr_scheduler_type=\"cosine\",\n",
+    "            bf16=False,\n",
+    "            logging_steps=0.01,\n",
+    "            output_dir=INIT_CONTAINER_MOUNT_PATH,\n",
+    "            optim=\"sgd\",\n",
+    "            save_steps=0.01,\n",
+    "            save_total_limit=3,\n",
+    "            disable_tqdm=False,\n",
+    "            resume_from_checkpoint=True,\n",
+    "            remove_unused_columns=True,\n",
+    "            ddp_backend=\"nccl\",  # use \"gloo\" for CPU-based training, and remove the \"gpu\" key from resources_per_worker\n",
+    "        ),\n",
+    "    ),\n",
+    "    resources_per_worker={\n",
+    "        \"gpu\": 1,\n",
+    "        \"cpu\": 8,\n",
+    "        \"memory\": \"8Gi\",\n",
+    "    },  # remove the \"gpu\" key if you don't want to attach GPUs to the pods\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# check the logs of the job\n",
+    "client.get_job_logs(name=\"s3-test\", job_kind=constants.PYTORCHJOB_KIND)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "myenv3.11",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
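Note for anyone trying the notebook above: a failed storage-initializer pod is awkward to debug, so it is worth confirming that the S3 credentials and prefix actually resolve to objects before calling client.train(). The snippet below is a minimal sanity-check sketch, not part of the patch itself; it reuses the placeholder endpoint, bucket, and prefix values from the notebook cell above and assumes boto3 is installed locally.

import boto3

# Placeholder values copied from the notebook cell above; fill in real credentials.
s3_access_key = ""
s3_secret_key = ""

s3_client = boto3.client(
    "s3",
    aws_access_key_id=s3_access_key,
    aws_secret_access_key=s3_secret_key,
    endpoint_url="http://10.117.63.3",
    region_name="us-east-1",
)

# List the objects the storage initializer will later download.
response = s3_client.list_objects_v2(Bucket="test", Prefix="imdatta0___ultrachat_1k")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])

If this prints nothing, the file_key prefix does not match any objects and the training job would start with an empty dataset directory.
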
diff --git a/sdk/python/kubeflow/storage_initializer/s3.py b/sdk/python/kubeflow/storage_initializer/s3.py
index ea1ba3b362..5f60bbc72d 100644
--- a/sdk/python/kubeflow/storage_initializer/s3.py
+++ b/sdk/python/kubeflow/storage_initializer/s3.py
@@ -42,18 +42,31 @@ def download_dataset(self):
         import boto3
 
-        # Create an S3 client for Nutanix Object Store/S3
-        s3_client = boto3.client(
-            "s3",
+        # Create a boto3 session for Nutanix Object Store/S3
+        s3_session = boto3.Session(
             aws_access_key_id=self.config.access_key,
             aws_secret_access_key=self.config.secret_key,
-            endpoint_url=self.config.endpoint_url,
             region_name=self.config.region_name,
         )
+        s3_resource = s3_session.resource("s3", endpoint_url=self.config.endpoint_url)
+        # Get the bucket object
+        bucket = s3_resource.Bucket(self.config.bucket_name)
 
-        # Download the file
-        s3_client.download_file(
-            self.config.bucket_name,
-            self.config.file_key,
-            os.path.join(VOLUME_PATH_DATASET, self.config.file_key),
-        )
-        print(f"File downloaded to: {VOLUME_PATH_DATASET}")
+        # Filter objects with the specified prefix
+        objects = bucket.objects.filter(Prefix=self.config.file_key)
+        # Iterate over the filtered objects
+        for obj in objects:
+            # Object keys are always "/"-delimited, regardless of the local OS
+            obj_key = obj.key
+            path_components = obj_key.split("/")
+            relative_dir = os.path.sep.join(path_components[1:-1])
+
+            # Create the local directories if they don't exist
+            os.makedirs(
+                os.path.join(VOLUME_PATH_DATASET, relative_dir),
+                exist_ok=True,
+            )
+
+            # Download the file, dropping the top-level prefix directory from the local path
+            file_path = os.path.sep.join(path_components[1:])
+            bucket.download_file(obj_key, os.path.join(VOLUME_PATH_DATASET, file_path))
+        print(f"Files downloaded to: {VOLUME_PATH_DATASET}")
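
Reviewer note on the path handling above: the subtle part is that the first component of each object key (the file_key prefix directory itself) is dropped, so the prefix's contents are mirrored directly under VOLUME_PATH_DATASET rather than nested one level deeper. The sketch below is self-contained and illustrative only; the "/workspace/dataset" value is an assumption for the example, since the real VOLUME_PATH_DATASET constant is defined in sdk/python/kubeflow/storage_initializer/constants.py.

import os

# Assumed value for illustration; the real constant lives in
# sdk/python/kubeflow/storage_initializer/constants.py.
VOLUME_PATH_DATASET = "/workspace/dataset"


def local_path_for_key(obj_key: str) -> str:
    """Mirror the mapping used in download_dataset: drop the top-level
    prefix directory and root everything under VOLUME_PATH_DATASET."""
    path_components = obj_key.split("/")  # S3 keys are always "/"-delimited
    return os.path.join(VOLUME_PATH_DATASET, *path_components[1:])


# Example keys as they might appear under file_key="imdatta0___ultrachat_1k"
for key in [
    "imdatta0___ultrachat_1k/train.json",
    "imdatta0___ultrachat_1k/splits/validation.json",
]:
    print(key, "->", local_path_for_key(key))
# imdatta0___ultrachat_1k/train.json -> /workspace/dataset/train.json
# imdatta0___ultrachat_1k/splits/validation.json -> /workspace/dataset/splits/validation.json

This layout is what the trainer's load_and_preprocess_data step reads from, so a key with no "/" separator at all would map onto the dataset directory itself; keeping every dataset file under the file_key prefix avoids that edge case.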