diff --git a/requirements-freeze.txt b/requirements-freeze.txt index 883abafa9bf..ff2d63d356e 100644 --- a/requirements-freeze.txt +++ b/requirements-freeze.txt @@ -133,6 +133,7 @@ rsa==4.9 s3transfer==0.6.0 sacrebleu==2.2.1 sacremoses==0.0.53 +scaleapi==2.13.0 scikit-learn==1.1.2 scipy==1.9.1 selenium==4.8.0 diff --git a/requirements.txt b/requirements.txt index 2a97e02b45b..bc18a7e4dca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -60,6 +60,7 @@ rouge-score~=0.1.2 pyext~=0.7 pytrec_eval==0.5 sacrebleu~=2.2.1 +scaleapi~=2.13.0 # Work around https://github.com/p-lambda/verified_calibration/issues/11 # TODO: Remove after this issue is resolved scikit-learn~=1.1.2 diff --git a/scripts/scale/create_and_setup_project.py b/scripts/scale/create_and_setup_project.py new file mode 100644 index 00000000000..cddcc0d3f32 --- /dev/null +++ b/scripts/scale/create_and_setup_project.py @@ -0,0 +1,148 @@ +import argparse +from scale_utils import get_scale_client +from scaleapi.tasks import TaskType +from scaleapi.exceptions import ScaleDuplicateResource + +parser = argparse.ArgumentParser() +parser.add_argument("--project_name", type=str, help="Name of the project to create") +parser.add_argument( + "--credentials_path", type=str, default="prod_env/credentials.conf", help="Path to the credentials file" +) +args = parser.parse_args() +project_name = args.project_name +client = get_scale_client(args.credentials_path) + +print("\nGetting project...") +try: + print(f"Trying to create project {project_name} ...") + project = client.create_project( + project_name=project_name, + task_type=TaskType.TextCollection, + rapid=True, + params={}, + ) + print("Project created.") +except ScaleDuplicateResource as err: + print(f"Project {project_name} already exists. Using existing project. 
Error: {err}") + project = client.get_project(project_name) + + +# Create a calibration batch +print("\nCreating calibration batch...") +try: + calib_batch_name = project_name + "_calibration" + batch = client.create_batch( + project=project_name, + batch_name=calib_batch_name, + calibration_batch=True, + ) + print("Calibration batch created.") + # Create 10 tasks in the calibration batch + for i in range(10): + payload = dict( + project=project_name, + batch=calib_batch_name, + instruction="This is a fake calibration task to bypass the API. Please simply answer Yes.", + attachment_type="text", + attachments=[ + { + "type": "text", + "content": "This is a fake calibration task to bypass the API. " + "We do not need calibration but would like to be able to send actual task. " + "In order to do this, we need to finish calibration. Please simply answer Yes.", + } + ], + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Continue to the next task?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], + ) + client.create_task(TaskType.TextCollection, **payload) + print(f" Calibration task {i} created.") + print("Finalizing calibration batch...") + client.finalize_batch(calib_batch_name) + print("Calibration batch finalized.") +except ScaleDuplicateResource as err: + print(f"Calibration batch {calib_batch_name} already exists. It will not be recreated. Error: {err}") + + +# Create evaluation tasks +expected_response = { + "annotations": {"answer_reasonable": {"type": "category", "field_id": "answer", "response": [["no"]]}} +} +initial_response = { + "annotations": {"answer_reasonable": {"type": "category", "field_id": "answer", "response": [["yes"]]}} +} +attachments = [ + { + "type": "text", + "content": "Please Answer Yes to this question. 
This is simply a way to bypass the need for evaluation tasks.", + }, +] +payload = dict( + project=project_name, + rapid=True, + attachments=attachments, + initial_response=initial_response, + expected_response=expected_response, + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Continue to the next task?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], +) +print("\nCreating evaluation tasks...") +for i in range(10): + evaluation_task = client.create_evaluation_task(TaskType.TextCollection, **payload) + print(f" Evaluation task {i} created.") +print("Evaluation tasks created.") + +# Create a test batch +print("\nCreating test batch...") +try: + test_batch_name = project_name + "_test" + batch = client.create_batch( + project=project_name, + batch_name=test_batch_name, + calibration_batch=False, + ) + print("Test batch created.") +except ScaleDuplicateResource as err: + print(f"Test batch {test_batch_name} already exists. It will not be recreated. Error: {err}") +# Try to create a single task in the test batch +payload = dict( + project=project_name, + batch=test_batch_name, + instruction="This is a test task to check that we can create tasks. If you are a worker please simply answer Yes.", + attachment_type="text", + attachments=[ + { + "type": "text", + "content": "This is a placeholder for the test task. If you are a worker please simply answer Yes.", + } + ], + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Finish?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], +) +print("Creating test task...") +client.create_task(TaskType.TextCollection, **payload) +print("Test task created.") +print("The test batch is not going to be finalized so that it does not get sent to workers.") + +# If we are here, it means that the project is ready. +# Print the project_name and a success message. 
+print(f"\n\nProject {project_name} is ready.") +print("Please go to https://app.scale.com/projects to check that the project is ready.") diff --git a/scripts/scale/finalize_batch.py b/scripts/scale/finalize_batch.py new file mode 100644 index 00000000000..b17b4e32ae2 --- /dev/null +++ b/scripts/scale/finalize_batch.py @@ -0,0 +1,12 @@ +import argparse +from scale_utils import get_scale_client + +parser = argparse.ArgumentParser() +parser.add_argument("--batch_name", type=str, help="Name of the batch to finalize") +parser.add_argument( + "--credentials_path", type=str, default="prod_env/credentials.conf", help="Path to the credentials file" +) +args = parser.parse_args() + +client = get_scale_client(args.credentials_path) +client.finalize_batch(args.batch_name) diff --git a/scripts/scale/scale_utils.py b/scripts/scale/scale_utils.py new file mode 100644 index 00000000000..6bf2655de69 --- /dev/null +++ b/scripts/scale/scale_utils.py @@ -0,0 +1,30 @@ +import os +from typing import Dict +from scaleapi import ScaleClient + + +def get_credentials(path: str) -> Dict[str, str]: + # Reads the credentials from the given path + with open(path, "r") as f: + # Read line by line, replaces the spaces, splits on the first ":" + # The first part is the key, the second part contains the value in between quotes + credentials = {} + for line in f.readlines(): + elt = line.replace(" ", "").replace("\n", "").split(":") + if len(elt) == 2: + credentials[elt[0]] = elt[1].split('"')[1] + return credentials + + +def get_scale_client(relative_credentials_path: str) -> ScaleClient: + credentials_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), relative_credentials_path) + print(f"Reading credentials from {credentials_path}") + credentials = get_credentials(credentials_path) + + # Check that scaleApiKey is set + if "scaleApiKey" not in credentials: + raise Exception("scaleApiKey not found in credentials.conf") + + # Get scale client + client = 
ScaleClient(credentials["scaleApiKey"]) + return client diff --git a/setup.cfg b/setup.cfg index fce3cfc8dd4..8c3473db56a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -68,6 +68,7 @@ install_requires= pyext~=0.7 pytrec_eval==0.5 sacrebleu~=2.2.1 + scaleapi~=2.13.0 # Work around https://github.com/p-lambda/verified_calibration/issues/11 # TODO: Remove after this issue is resolved scikit-learn~=1.1.2 diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index 2069a12cef1..86073816904 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -14,7 +14,7 @@ DecodeRequestResult, ) from helm.proxy.retry import retry_request -from .critique_client import CritiqueClient, RandomCritiqueClient, SurgeAICritiqueClient +from .critique_client import CritiqueClient, RandomCritiqueClient, SurgeAICritiqueClient, ScaleCritiqueClient from .mechanical_turk_critique_client import MechanicalTurkCritiqueClient from .client import Client from .ai21_client import AI21Client @@ -263,8 +263,20 @@ def get_critique_client(self) -> CritiqueClient: if not surgeai_credentials: raise ValueError("surgeaiApiKey credentials are required for SurgeAICritiqueClient") self.critique_client = SurgeAICritiqueClient(surgeai_credentials, self._build_cache_config("surgeai")) + + elif critique_type == "scale": + scale_credentials = self.credentials.get("scaleApiKey") + scale_batch = self.credentials.get("scaleBatch", None) + if scale_batch is None: + raise ValueError("scaleBatch is required for ScaleCritiqueClient for now.") + if not scale_credentials: + raise ValueError("scaleApiKey credentials are required for ScaleCritiqueClient") + self.critique_client = ScaleCritiqueClient( + scale_credentials, self._build_cache_config("scale"), scale_batch + ) else: raise ValueError( - "CritiqueClient is not configured; set critiqueType to 'mturk', 'mturk-sandbox', 'surgeai' or 'random'" + "CritiqueClient is not configured; set critiqueType to 
'mturk'," + "'mturk-sandbox', 'surgeai', 'scale' or 'random'" ) return self.critique_client diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 3714a777e94..7238877e907 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -1,12 +1,20 @@ from abc import ABC, abstractmethod +from hashlib import sha512 +import json import random import threading from typing import Dict, List, Union +# Surge AI from cattrs import unstructure import surge from surge import questions as surge_questions +# Scale +import scaleapi +from scaleapi.tasks import TaskType, TaskStatus +from scaleapi.exceptions import ScaleDuplicateResource + from helm.common.hierarchical_logger import hlog from helm.common.cache import Cache, CacheConfig from helm.common.critique_request import ( @@ -20,6 +28,7 @@ _surge_cache_lock = threading.Lock() +_scale_cache_lock = threading.Lock() class CritiqueClient(ABC): @@ -55,6 +64,7 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu class SurgeAICritiqueClient(CritiqueClient): + # TODO #1614: Move this to its own file """A CritiqueClient that creates tasks for workers on Surge AI. Surge AI concepts: @@ -209,3 +219,347 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu task_id = self._get_or_create_task(project_id, request.fields) worker_responses = self._get_worker_responses(task_id, request.template.questions) return CritiqueRequestResult(worker_responses) + + +class ScaleCritiqueClient(CritiqueClient): + # TODO #1614: Move this to its own file + """A CritiqueClient that creates tasks for workers on Scale. + + Scale AI concepts: + + - A **project** contains **tasks** which can be in **batches** (not used here) + - A **task** is created in a project. 
It represents an individual unit of work to be done by a Tasker + It contains **attachments** which is the data to be annotated, and **fields** which are the + instructions and questions to be displayed to the Tasker. A task has also a general **instruction** + which is displayed before the fields. + - A **task response**: represents for each question a list of answers from different workers. + + Mapping of HELM concepts to Scale AI concepts: + + - A `CritiqueRequest` maps to a **task** + - `CritiqueRequest.template` indicates which **project** the task should be created in. + - A `CritiqueTaskTemplate` maps to a **task** + - A `CritiqueQuestionTemplate` maps to a **field** in a task. + - A `CritiqueResponse` maps to a **task response**. + """ + + def __init__( + self, + api_key: str, + cache_config: CacheConfig, + batch_name: str, + ): + self._cache = Cache(cache_config) + self._client = scaleapi.ScaleClient(api_key) + self._batch_name = batch_name + + # def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) -> Tuple[str, str]: + # """Get or create a project and a batch associated to it on Scale and return the + # Scale project name and batch name. + + # If the project_name was specified in the credentials, use it (should already exist). + # In order to send tasks, the project needs to already have a calibration batch. + # Otherwise, create a new project using the template name. + # If the project is created, there will be no calibration batch so the batch will be set + # as self-labeling. + + # If the batch_name was specified in the credentials, use it (does not necessarily need to exist). + # Otherwise, create a new batch using the project name and the template name. + + # This is using Scale Rapid. + + # Attempt to find a Scale project for the template. If one exists, reuse that project. + # Otherwise, create a new project using the template. Return the Scale project name. 
+ # Same for the batch.""" + + # project_was_created: bool = self._project_name is None + # project_name: Optional[str] = None + # batch_name: Optional[str] = None + + # def create_scale_project(): + # try: + # project = self._client.create_project( + # project_name=template.name, + # task_type=TaskType.TextCollection, + # rapid=True, + # params={}, + # ) + # except ScaleDuplicateResource as err: + # hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") + # # Get the existing project and checks that it has the same params + # # NOTE: This should not happen with the cache but in case the cache is deleted + # # we want to make sure we don't create a new project with the same name + # project = self._client.get_project(template.name) + # if project.type != TaskType.TextCollection.value: + # raise RuntimeError( + # f"Project {template.name} already exists with different task_type: " + # f"'{project.type}' instead of '{TaskType.TextCollection.value}'" + # ) from err + # return {"project_name": project.name} + + # def create_scale_batch(): + # if project_name is None: + # raise RuntimeError("Trying to create a batch without a project name.") + # try: + # batch_name: str = f"{project_name}-HELMBatch" + # batch = self._client.create_batch( + # project=project_name, + # batch_name=batch_name, + # calibration_batch=False, + # # If the project was created, there is no calibration batch so we set it as self-labeling + # # otherwise the API will return an error. + # self_label_batch=True, # project_was_created, + # ) + # except ScaleDuplicateResource as err: + # hlog( + # f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: " + # f"{project_name}. 
Error: {err.message}" + # ) + # # Get the existing batch and checks that it has the same project + # # NOTE: This should not happen with the cache but in case the cache is deleted + # # we want to make sure we don't create a new batch with the same name + # batch = self._client.get_batch(batch_name) + # if batch.project != project.name: + # raise RuntimeError( + # f"Batch {batch_name} already exists with different project: " f"{batch.project}" + # ) from err + # return {"batch_name": batch.name} + + # # Check if a project_name was specified in the credentials + # if self._project_name is not None: + # project_name = self._project_name + # hlog(f"Using existing Scale project: {project_name} (defined in credentials)") + # # Checks that the project exists + # try: + # project = self._client.get_project(project_name) + # project_name = project.name + # except Exception as err: + # raise RuntimeError(f"Project {project_name} does not exist") from err + + # # Check if a batch_name was specified in the credentials + # if self._batch_name is not None: + # batch_name = self._batch_name + # hlog(f"Using existing Scale batch: {batch_name} (defined in credentials)") + # # Checks that the batch exists + # try: + # batch = self._client.get_batch(batch_name) + # except Exception as err: + # raise RuntimeError(f"Batch {batch_name} does not exist") from err + # # Checks that the batch is in the project + # if batch.project != project_name: + # raise RuntimeError( + # f"Batch {batch_name} is not in project {project_name}. " + # f"It is in project {batch.project} instead." 
+ # ) + # return project_name, batch_name + + # # If we reach here it means that a project_name was specified but not a batch_name + # # Create a new batch using the template name + # with _scale_cache_lock: + # batch_response, is_cached = self._cache.get( + # {"testing": True, "project": project_name, "template": template.name}, create_scale_batch + # ) + # batch_name = batch_response["batch_name"] + # else: + # # If we reach here it means that no project_name was specified + # # Create a new project using the template name + # # Create a new batch using the project name and the template name + + # with _scale_cache_lock: + # project_response, is_cached = self._cache.get({"template": template.name}, create_scale_project) + # project_name = project_response["project_name"] + # if is_cached: + # hlog(f"Reusing existing Scale project: {project_name}") + # else: + # hlog(f"Creating new Scale project: {project_name}") + + # with _scale_cache_lock: + # batch_response, is_cached = self._cache.get( + # {"testing": True, "project": project_name, "template": template.name}, create_scale_batch + # ) + # batch_name = batch_response["batch_name"] + + # if is_cached: + # hlog(f"Reusing existing Scale batch: {batch_name}") + # else: + # hlog(f"Creating new Scale batch: {batch_name}") + + # return project_name, batch_name + + def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: + for field_name, field_value in fields.items(): + text = text.replace("{{" + field_name + "}}", field_value) + return text + + def _critique_question_to_scale_field(self, question: CritiqueQuestionTemplate, fields: Dict[str, str]): + if question.question_type == "multiple_choice" or question.question_type == "checkbox": + return { + "type": "category", + "field_id": question.name, # This must be unique, so we use the question name + "title": question.name, + "description": self._interpolate_fields(question.text, fields), + "choices": [{"label": option, "value": option} for option in 
question.options], + "min_choices": 0 if question.question_type == "checkbox" else 1, + "max_choices": len(question.options) if question.question_type == "checkbox" else 1, + } + else: + raise ValueError(f"Unsupported question type {question.question_type}") + + def _get_or_create_scale_task(self, batch_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str]) -> str: + """Get or create a task on Scale and return the Scale task ID.""" + + # Used both for the cache key and the task unique_id + cache_key = { + "testing": True, + "batch": batch_name, + "task": unstructure(template), + "fields": fields, + } + + def create_scale_task() -> Dict[str, str]: + """ + Creates a Scale Task (which is one single question from a CritiqueQuestionTemplate) + Returns the Scale Task ID. + """ + + # We create a unique_id for the task so that we can reuse it if it already exists + # It contains the same information as the task itself (like the cache key) + # This is redundant with the cache but it's a good safety net + # NOTE: Technically, sha512 could have collisions but it's unlikely. + unique_id: str = sha512(str(json.dumps(cache_key, sort_keys=True)).encode()).hexdigest() + instructions: str = self._interpolate_fields(template.instructions, fields) + payload = dict( + batch=batch_name, + unique_id=unique_id, + instruction="Evaluate the AI model generated output following the instructions below", + attachment_type="text", + attachments=[ + { + "type": "text", + "content": instructions, + } + ], + response_required=template.num_respondents, + fields=[self._critique_question_to_scale_field(question, fields) for question in template.questions], + ) + + try: + task = self._client.create_task(TaskType.TextCollection, **payload) + return {"id": task.id} + except ScaleDuplicateResource as err: + hlog(f"ScaleDuplicateResource when creating task: {unique_id}. 
Error: {err.message}") + # Get the existing task and checks that it has the same content (attachments and fields) + # NOTE: This should not happen with the cache but in case the cache is deleted + # we want to make sure we don't create a new task with the same content + task = self._client.get_task(unique_id) + if task.params["attachments"] != payload["attachments"]: + raise RuntimeError( + f"Task {unique_id} already exists with different attachments: " f"{task.params['attachments']}" + ) from err + # No need to check for fields, project_name and instructions because they are part of the unique_id + return {"id": task.id} + + with _scale_cache_lock: + task_response, is_cached = self._cache.get( + cache_key, + create_scale_task, + ) + task_id: str = task_response["id"] + if is_cached: + hlog(f"Reusing existing Scale task: {task_id}") + else: + hlog(f"Creating new Scale task: {task_id}") + return task_id + + def finalize_batch(self, batch_name: str): + self._client.finalize_batch(batch_name=batch_name) + + def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: + task: scaleapi.tasks.Task = self._client.get_task(task_id) + if task.status != TaskStatus.Completed.value: + return [] + else: + annotations: Dict[str, List[str]] = task.response["annotations"] + + # The format of annotations is: + # { + # "category_field_1": [ + # answer_1_respondent_1, + # answer_1_respondent_2, + # ... + # ], + # "category_field_2": [ + # answer_2_respondent_1, + # answer_2_respondent_2, + # ... + # ], + # ... + # } + # We want to convert it to: + # [ + # { + # "id": "respondent_1", + # "answers": { + # "category_field_1": answer_1_respondent_1 + # "category_field_2": answer_2_respondent_1 + # ... + # } + # }, + # { + # "id": "respondent_2", + # "answers": { + # "category_field_1": answer_1_respondent_2 + # "category_field_2": answer_2_respondent_2 + # ... + # } + # }, + # ... 
+ # ] + + # First, we get the list of respondents + num_respondents: int = len(annotations[list(annotations.keys())[0]]) + + # Then, we create the list of responses + responses: List[CritiqueResponse] = [] + for respondent_index in range(num_respondents): + answers: Dict[str, Union[str, List[str]]] = {} + for field_name, field_answers in annotations.items(): + answers[field_name] = field_answers[respondent_index] + responses.append( + CritiqueResponse(id=str(respondent_index), respondent_id=str(respondent_index), answers=answers) + ) + return responses + + def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResult: + """ + Create a task on Scale AI and fetch responses from Scale AI if available. + + Returns CritiqueRequestResult if worker answers are complete, or None otherwise. + The intended use is to call it once to create the task, wait a while, and then call it + later to fetch answers. + + First, attempt to find a Scale AI project for the template. If one exists, reuse that project. + Otherwise, create a new project using the template. + + Second, attempt to find a Scale AI task inside this project for the fields. If one exists, + reuse that task. Otherwise, create a new task inside the project using the fields. + + Finally, check if responses are available by checking if the number of workers who have responded + is equal to the requested number of workers. If so, return those responses. + + This method is idempotent, because projects and tasks are not created if they already exist. + + The cache will store the mappings from template to Scale AI Project ID and from questions to Scale AI + task ID. If the cache is deleted, the mappings should be conserved on Scale AI side and the API calls + should return a ScaleDuplicateResource error which is handled by the method. We still prefer to use + the cache to avoid unnecessary API calls and to not depend on Scale AI side. + Note that worker responses are currently not cached. 
+ """ + # TODO: Remove/fix _get_or_create_scale_project_and_batch(). + # For now we are forcing the user to provide a batch_name in the credentials. + + # _, batch_name = self._get_or_create_scale_project_and_batch(request.template) + # self._batch_name = batch_name + task_id: str = self._get_or_create_scale_task(self._batch_name, request.template, request.fields) + worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) + return CritiqueRequestResult(worker_responses)