From f0ada779f8ae1cde1ed478bbe7fe8a379c228613 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 12:08:30 -0700 Subject: [PATCH 01/25] First Draft of ScaleCritiqueClient --- src/helm/proxy/clients/critique_client.py | 189 ++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 3714a777e94..e1d9986d690 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -3,10 +3,16 @@ import threading from typing import Dict, List, Union +# Surge AI from cattrs import unstructure import surge from surge import questions as surge_questions +# Scale +import scaleapi +from scaleapi.tasks import TaskType, TaskStatus +from scaleapi.exceptions import ScaleDuplicateResource + from helm.common.hierarchical_logger import hlog from helm.common.cache import Cache, CacheConfig from helm.common.critique_request import ( @@ -17,9 +23,11 @@ CritiqueResponse, QuestionType, ) +from executing.executing import assert_linenos _surge_cache_lock = threading.Lock() +_scale_cache_lock = threading.Lock() class CritiqueClient(ABC): @@ -209,3 +217,184 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu task_id = self._get_or_create_task(project_id, request.fields) worker_responses = self._get_worker_responses(task_id, request.template.questions) return CritiqueRequestResult(worker_responses) + + +class ScaleCritiqueClient(CritiqueClient): + """A CritiqueClient that creates tasks for workers on Scale. + + Scale concepts: + + - A **project** contains **tasks** and an **instruction**. + - A **task** is created in a project and contains **fields** that are interpolated into the + placeholders in the project's instructions and questions templates to instantiate the actual instructions + and questions in the task. + - A **task response** is a response to a task by a single worker and contains answers to all the questions + in the task. + + Mapping of HELM concepts to Surge AI concepts: + + - A `CritiqueTaskTemplate` maps to a **project**. + - A `CritiqueQuestionTemplate` maps to a **question** template in a project. + - A `CritiqueRequest` maps to a **task** + - `CritiqueRequest.template` indicates which **project** the task should be created in. + - `CritiqueRequest.fields` provides the fields that are interpolated into the placeholders in the + projects' instructions and questions templates. + - A `CritiqueResponse` maps to a **task response**. + - A `CritiqueRequestResult` maps to a list of **task responses** across multiple workers for a task. + """ + + def __init__(self, api_key: str, cache_config: CacheConfig, api_key: str): + surge.api_key = api_key + self._cache = Cache(cache_config) + self.client = scaleapi.ScaleClient(api_key) + + def _get_or_create_scale_project(self, template: CritiqueTaskTemplate) -> str: + """Get or create a project on Scale and return the Scale project name. + + Attempt to find a Scale project for the template. If one exists, reuse that project. + Otherwise, create a new project using the template. Return the Scale project name.""" + + def create_scale_project(): + project = self.client.create_project( + project_name=template.name, + task_type=TaskType.TextCollection, + params={ + "instructions": template.instructions, + }, + ) + return {"name": project.name} + + with _scale_cache_lock: + # Example cache key: + # { + # "template": { + # # See CritiqueQuestionTemplate for complete schema + # "name": "some_name", + # "instructions": "some_instructions", + # "num_respondents": 1, + # "questions": [] + # } + # } + # + # Example cache value: + # {"name": "some_name"} + project_response, is_cached = self._cache.get({"template": unstructure(template)}, create_scale_project) + project_name = project_response["name"] + if is_cached: + hlog(f"Reusing existing Scale project: {project_name}") + else: + hlog(f"Creating new Scale project: {project_name}") + return project_name + + def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: + for field_name, field_value in fields.items(): + text = text.replace(f"{{{field_name}}}", field_value) + return text + + def get_or_create_scale_task( + self, project_name: str, question: CritiqueQuestionTemplate, num_respondents: str, fields: Dict[str, str] + ) -> str: + """Get or create a task on Scale and return the Scale task ID.""" + + def create_scale_task() -> Dict[str, str]: + """ + Creates a Scale Task (which is one single question from a CritiqueQuestionTemplate) + Returns the Scale Task ID. + """ + payload: Dict[str, str] = {} + if question.question_type == "multiple_choice" or question.question_type == "checkbox": + payload = dict( + project=project_name, + instruction=self._interpolate_fields(question.text, fields), + attachment_type="text", + response_required=num_respondents, + fields=[ + { + "type": "category", + "field_id": "category_field", + "choices": [{"label": option, "value": option} for option in question.options], + } + ], + ) + else: + raise ValueError(f"Unsupported question type {question.question_type}") + + try: + task = self.client.create_task(TaskType.TextCollection, **payload) + return {"id": task.id} + except ScaleDuplicateResource as err: + hlog(f"Scale Error: {err.message}") # If unique_id is already used for a different task + + with _scale_cache_lock: + task_response, is_cached = self._cache.get( + {"project": project_name, "question": unstructure(question), "fields": fields}, create_scale_task + ) + task_id: str = task_response["id"] + if is_cached: + hlog(f"Reusing existing Scale task: {task_id}") + else: + hlog(f"Creating new Scale task: {task_id}") + return task_id + + def _get_or_create_scale_tasks_from_task_template( + self, project_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] + ) -> List[str]: + """ + Get or create a task on Scale all the tasks for a CritiqueTaskTemplate. + Returns the Scale Task IDs. + """ + task_ids: List[str] = [] + for question in template.questions: + task_id: str = self.get_or_create_scale_task(project_name, question, template.num_respondents, fields) + task_ids.append(task_id) + return task_ids + + def _get_worker_response_single_task(self, task_id: str): + task: scaleapi.tasks.Task = self.client.get_task(task_id) + if task.status != TaskStatus.Completed: + return None + else: + # TODO: Figure out what goes into task.response + return task.response + + def _get_worker_responses( + self, project_name: str, questions: List[CritiqueQuestionTemplate], task_ids: List[str] + ) -> List[CritiqueResponse]: + assert len(task_ids) == len(questions), "Number of tasks must be equal to number of questions" + + responses = [] + for task_id in task_ids: + response = self._get_worker_response_single_task(task_id) + responses.append(response) + return responses + + def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResult: + """Create a task on Surge AI and fetch responses from Surge AI if available. + + Returns CritiqueRequestResult if worker answers are complete, or None otherwise. + The intended use is to call it once to create the task, wait a while, and then call it + later to fetch answers. + + First, attempt to find a Surge AI project for the template. If one exists, reuse that project. + Otherwise, create a new project using the template. + + Second, attempt to find a Surge AI task inside this project for the fields. If one exists, + reuse that task. Otherwise, create a new task inside the project using the fields. + + Finally, check if responses are available by checking if the number of workers who have responded + is equal to the requested number of workers. If so, return those responses. + + This method is idempotent, because projects and tasks are not created if they already exist. + + The cache will store the mappings from template to Surge AI Project ID and from fields to Surge AI + question ID. If the cache is deleted, the mappings will be lost, and this method will not be able + to fetch results from the previous projects and tasks, and will have to create new projects and tasks. + Note that worker responses are currently not cached.""" + project_name: str = self._get_or_create_scale_project(request.template) + task_ids: List[str] = self._get_or_create_scale_tasks_from_task_template( + project_name, request.template, request.fields + ) + worker_responses: List[CritiqueResponse] = self._get_worker_responses( + project_name, request.template.questions, task_ids + ) + return CritiqueRequestResult(worker_responses) From 8930ca69ea65f0fa73a4262b0a10fbb4b56a326e Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 12:33:45 -0700 Subject: [PATCH 02/25] Fixed a few issues --- src/helm/proxy/clients/auto_client.py | 10 ++++++++-- src/helm/proxy/clients/critique_client.py | 16 +++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index 2069a12cef1..7d6b36ff664 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -14,7 +14,7 @@ DecodeRequestResult, ) from helm.proxy.retry import retry_request -from .critique_client import CritiqueClient, RandomCritiqueClient, SurgeAICritiqueClient +from .critique_client import CritiqueClient, RandomCritiqueClient, SurgeAICritiqueClient, ScaleCritiqueClient from .mechanical_turk_critique_client import MechanicalTurkCritiqueClient from .client import Client from .ai21_client import AI21Client @@ -263,8 +263,14 @@ def get_critique_client(self) -> CritiqueClient: if not surgeai_credentials: raise ValueError("surgeaiApiKey credentials are required for SurgeAICritiqueClient") self.critique_client = SurgeAICritiqueClient(surgeai_credentials, self._build_cache_config("surgeai")) + + elif critique_type == "scale": + scale_credentials = self.credentials.get("scaleApiKey") + if not scale_credentials: + raise ValueError("scaleApiKey credentials are required for ScaleCritiqueClient") + self.critique_client = ScaleCritiqueClient(scale_credentials, self._build_cache_config("scale")) else: raise ValueError( - "CritiqueClient is not configured; set critiqueType to 'mturk', 'mturk-sandbox', 'surgeai' or 'random'" + "CritiqueClient is not configured; set critiqueType to 'mturk', 'mturk-sandbox', 'surgeai', 'scale' or 'random'" ) return self.critique_client diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index e1d9986d690..e564550e783 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -23,7 +23,6 @@ CritiqueResponse, QuestionType, ) -from executing.executing import assert_linenos _surge_cache_lock = threading.Lock() @@ -243,8 +242,7 @@ class ScaleCritiqueClient(CritiqueClient): - A `CritiqueRequestResult` maps to a list of **task responses** across multiple workers for a task. """ - def __init__(self, api_key: str, cache_config: CacheConfig, api_key: str): - surge.api_key = api_key + def __init__(self, api_key: str, cache_config: CacheConfig): self._cache = Cache(cache_config) self.client = scaleapi.ScaleClient(api_key) @@ -303,10 +301,19 @@ def create_scale_task() -> Dict[str, str]: """ payload: Dict[str, str] = {} if question.question_type == "multiple_choice" or question.question_type == "checkbox": + from toolbox.printing import sdebug + + sdebug(question) payload = dict( project=project_name, instruction=self._interpolate_fields(question.text, fields), attachment_type="text", + attachments=[ + { + "type": "text", + "content": "The content is in the project instructions", + } + ], response_required=num_respondents, fields=[ { @@ -390,6 +397,9 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu question ID. If the cache is deleted, the mappings will be lost, and this method will not be able to fetch results from the previous projects and tasks, and will have to create new projects and tasks. Note that worker responses are currently not cached.""" + from toolbox.printing import sdebug + + sdebug(request) project_name: str = self._get_or_create_scale_project(request.template) task_ids: List[str] = self._get_or_create_scale_tasks_from_task_template( project_name, request.template, request.fields From 80cf99ae437077078dd5749939592b6909bea23a Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 12:50:51 -0700 Subject: [PATCH 03/25] Changed task to fields, now runs --- src/helm/proxy/clients/critique_client.py | 138 ++++++---------------- 1 file changed, 38 insertions(+), 100 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index e564550e783..ff1876d8cf4 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -221,25 +221,7 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu class ScaleCritiqueClient(CritiqueClient): """A CritiqueClient that creates tasks for workers on Scale. - Scale concepts: - - - A **project** contains **tasks** and an **instruction**. - - A **task** is created in a project and contains **fields** that are interpolated into the - placeholders in the project's instructions and questions templates to instantiate the actual instructions - and questions in the task. - - A **task response** is a response to a task by a single worker and contains answers to all the questions - in the task. - - Mapping of HELM concepts to Surge AI concepts: - - - A `CritiqueTaskTemplate` maps to a **project**. - - A `CritiqueQuestionTemplate` maps to a **question** template in a project. - - A `CritiqueRequest` maps to a **task** - - `CritiqueRequest.template` indicates which **project** the task should be created in. - - `CritiqueRequest.fields` provides the fields that are interpolated into the placeholders in the - projects' instructions and questions templates. - - A `CritiqueResponse` maps to a **task response**. - - A `CritiqueRequestResult` maps to a list of **task responses** across multiple workers for a task. + TODO """ def __init__(self, api_key: str, cache_config: CacheConfig): @@ -289,8 +271,20 @@ def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: text = text.replace(f"{{{field_name}}}", field_value) return text - def get_or_create_scale_task( - self, project_name: str, question: CritiqueQuestionTemplate, num_respondents: str, fields: Dict[str, str] + def _critique_question_to_scale_field(self, question: CritiqueQuestionTemplate, fields: Dict[str, str]): + if question.question_type == "multiple_choice" or question.question_type == "checkbox": + return { + "type": "category", + "field_id": question.name, # This must be unique, so we use the question name + "title": question.name, + "description": self._interpolate_fields(question.text, fields), + "choices": [{"label": option, "value": option} for option in question.options], + } + else: + raise ValueError(f"Unsupported question type {question.question_type}") + + def _get_or_create_scale_task( + self, project_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] ) -> str: """Get or create a task on Scale and return the Scale task ID.""" @@ -299,32 +293,22 @@ def create_scale_task() -> Dict[str, str]: Creates a Scale Task (which is one single question from a CritiqueQuestionTemplate) Returns the Scale Task ID. """ - payload: Dict[str, str] = {} - if question.question_type == "multiple_choice" or question.question_type == "checkbox": - from toolbox.printing import sdebug - - sdebug(question) - payload = dict( - project=project_name, - instruction=self._interpolate_fields(question.text, fields), - attachment_type="text", - attachments=[ - { - "type": "text", - "content": "The content is in the project instructions", - } - ], - response_required=num_respondents, - fields=[ - { - "type": "category", - "field_id": "category_field", - "choices": [{"label": option, "value": option} for option in question.options], - } - ], - ) - else: - raise ValueError(f"Unsupported question type {question.question_type}") + from toolbox.printing import sdebug + + sdebug(template) + payload = dict( + project=project_name, + instruction=self._interpolate_fields(template.instructions, fields), + attachment_type="text", + attachments=[ + { + "type": "text", + "content": "The content is in the project instructions", + } + ], + response_required=template.num_respondents, + fields=[self._critique_question_to_scale_field(question, fields) for question in template.questions], + ) try: task = self.client.create_task(TaskType.TextCollection, **payload) @@ -334,7 +318,7 @@ def create_scale_task() -> Dict[str, str]: with _scale_cache_lock: task_response, is_cached = self._cache.get( - {"project": project_name, "question": unstructure(question), "fields": fields}, create_scale_task + {"project": project_name, "task": unstructure(template), "fields": fields}, create_scale_task ) task_id: str = task_response["id"] if is_cached: @@ -343,20 +327,7 @@ def create_scale_task() -> Dict[str, str]: hlog(f"Creating new Scale task: {task_id}") return task_id - def _get_or_create_scale_tasks_from_task_template( - self, project_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] - ) -> List[str]: - """ - Get or create a task on Scale all the tasks for a CritiqueTaskTemplate. - Returns the Scale Task IDs. - """ - task_ids: List[str] = [] - for question in template.questions: - task_id: str = self.get_or_create_scale_task(project_name, question, template.num_respondents, fields) - task_ids.append(task_id) - return task_ids - - def _get_worker_response_single_task(self, task_id: str): + def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: task: scaleapi.tasks.Task = self.client.get_task(task_id) if task.status != TaskStatus.Completed: return None @@ -364,47 +335,14 @@ def _get_worker_response_single_task(self, task_id: str): # TODO: Figure out what goes into task.response return task.response - def _get_worker_responses( - self, project_name: str, questions: List[CritiqueQuestionTemplate], task_ids: List[str] - ) -> List[CritiqueResponse]: - assert len(task_ids) == len(questions), "Number of tasks must be equal to number of questions" - - responses = [] - for task_id in task_ids: - response = self._get_worker_response_single_task(task_id) - responses.append(response) - return responses - def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResult: - """Create a task on Surge AI and fetch responses from Surge AI if available. - - Returns CritiqueRequestResult if worker answers are complete, or None otherwise. - The intended use is to call it once to create the task, wait a while, and then call it - later to fetch answers. - - First, attempt to find a Surge AI project for the template. If one exists, reuse that project. - Otherwise, create a new project using the template. - - Second, attempt to find a Surge AI task inside this project for the fields. If one exists, - reuse that task. Otherwise, create a new task inside the project using the fields. - - Finally, check if responses are available by checking if the number of workers who have responded - is equal to the requested number of workers. If so, return those responses. - - This method is idempotent, because projects and tasks are not created if they already exist. - - The cache will store the mappings from template to Surge AI Project ID and from fields to Surge AI - question ID. If the cache is deleted, the mappings will be lost, and this method will not be able - to fetch results from the previous projects and tasks, and will have to create new projects and tasks. - Note that worker responses are currently not cached.""" + """ + TODO + """ from toolbox.printing import sdebug sdebug(request) project_name: str = self._get_or_create_scale_project(request.template) - task_ids: List[str] = self._get_or_create_scale_tasks_from_task_template( - project_name, request.template, request.fields - ) - worker_responses: List[CritiqueResponse] = self._get_worker_responses( - project_name, request.template.questions, task_ids - ) + task_id: str = self._get_or_create_scale_task(project_name, request.template, request.fields) + worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) return CritiqueRequestResult(worker_responses) From 433286ef19747d7b08f41698cc97ac3ad31d5d62 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 15:32:02 -0700 Subject: [PATCH 04/25] Added documentation and handled duplication --- src/helm/proxy/clients/critique_client.py | 108 +++++++++++++++++----- 1 file changed, 84 insertions(+), 24 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index ff1876d8cf4..e28407d2e6c 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -221,7 +221,22 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu class ScaleCritiqueClient(CritiqueClient): """A CritiqueClient that creates tasks for workers on Scale. - TODO + Scale AI concepts: + + - A **project** contains **tasks** which can be in **batches** (not used here) + - A **task** is created in a project. It represents an individual unit of work to be done by a Tasker + It contains **attachments** which is the data to be annotated, and **fields** which are the + instructions and questions to be displayed to the Tasker. A task has also a general **instruction** + which is displayed before the fields. + - A **task response**: TODO + + Mapping of HELM concepts to Scale AI concepts: + + - A `CritiqueRequest` maps to a **task** + - `CritiqueRequest.template` indicates which **project** the task should be created in. + - A `CritiqueTaskTemplate` maps to a **task** + - A `CritiqueQuestionTemplate` maps to a **field** in a task. + - A `CritiqueResponse` maps to TODO """ def __init__(self, api_key: str, cache_config: CacheConfig): @@ -235,30 +250,43 @@ def _get_or_create_scale_project(self, template: CritiqueTaskTemplate) -> str: Otherwise, create a new project using the template. Return the Scale project name.""" def create_scale_project(): - project = self.client.create_project( - project_name=template.name, - task_type=TaskType.TextCollection, - params={ - "instructions": template.instructions, - }, - ) + try: + project = self.client.create_project( + project_name=template.name, + task_type=TaskType.TextCollection, + params={ + "instructions": template.instructions, + }, + ) + except ScaleDuplicateResource as err: + hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") + # Get the existing project and checks that it has the same instructions + # NOTE: This should not happen with the cache but in case the cache is deleted + # we want to make sure we don't create a new project with the same name + project = self.client.get_project(template.name) + if project.params["instructions"] != template.instructions: + raise RuntimeError( + f"Project {template.name} already exists with different instructions: " + f"{project.params['instructions']}" + ) from err + elif project.type != TaskType.TextCollection: + raise RuntimeError( + f"Project {template.name} already exists with different task_type: {project.type}" + ) from err return {"name": project.name} with _scale_cache_lock: # Example cache key: # { - # "template": { - # # See CritiqueQuestionTemplate for complete schema - # "name": "some_name", - # "instructions": "some_instructions", - # "num_respondents": 1, - # "questions": [] - # } + # "name": "some_name", + # "instructions": "some_instructions" # } # # Example cache value: # {"name": "some_name"} - project_response, is_cached = self._cache.get({"template": unstructure(template)}, create_scale_project) + project_response, is_cached = self._cache.get( + {"name": template.name, "instructions": template.instructions}, create_scale_project + ) project_name = project_response["name"] if is_cached: hlog(f"Reusing existing Scale project: {project_name}") @@ -293,12 +321,16 @@ def create_scale_task() -> Dict[str, str]: Creates a Scale Task (which is one single question from a CritiqueQuestionTemplate) Returns the Scale Task ID. """ - from toolbox.printing import sdebug - sdebug(template) + # We create a unique_id for the task so that we can reuse it if it already exists + # It contains the same information as the task itself (like the cache key) + # This is redundant with the cache but it's a good safety net + unique_id: str = str({"project": project_name, "task": unstructure(template), "fields": fields}) + instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( project=project_name, - instruction=self._interpolate_fields(template.instructions, fields), + instruction=instructions, + unique_id=unique_id, attachment_type="text", attachments=[ { @@ -314,7 +346,17 @@ def create_scale_task() -> Dict[str, str]: task = self.client.create_task(TaskType.TextCollection, **payload) return {"id": task.id} except ScaleDuplicateResource as err: - hlog(f"Scale Error: {err.message}") # If unique_id is already used for a different task + hlog(f"ScaleDuplicateResource when creating task: {unique_id}. Error: {err.message}") + # Get the existing task and checks that it has the same content (attachments and fields) + # NOTE: This should not happen with the cache but in case the cache is deleted + # we want to make sure we don't create a new task with the same content + task = self.client.get_task(unique_id) + if task.params["attachments"] != payload["attachments"]: + raise RuntimeError( + f"Task {unique_id} already exists with different attachments: " f"{task.params['attachments']}" + ) from err + # No need to check for fields, project_name and instructions because they are part of the unique_id + return {"id": task.id} with _scale_cache_lock: task_response, is_cached = self._cache.get( @@ -337,11 +379,29 @@ def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResult: """ - TODO - """ - from toolbox.printing import sdebug + Create a task on Scale AI and fetch responses from Scale AI if available. + + Returns CritiqueRequestResult if worker answers are complete, or None otherwise. + The intended use is to call it once to create the task, wait a while, and then call it + later to fetch answers. + + First, attempt to find a Scale AI project for the template. If one exists, reuse that project. + Otherwise, create a new project using the template. - sdebug(request) + Second, attempt to find a Scale AI task inside this project for the fields. If one exists, + reuse that task. Otherwise, create a new task inside the project using the fields. + + Finally, check if responses are available by checking if the number of workers who have responded + is equal to the requested number of workers. If so, return those responses. + + This method is idempotent, because projects and tasks are not created if they already exist. + + The cache will store the mappings from template to Scale AI Project ID and from questions to Scale AI + task ID. If the cache is deleted, the mappings should be conserved on Scale AI side and the API calls + should return a ScaleDuplicateResource error which is handled by the method. We still prefer to use + the cache to avoid unnecessary API calls and to not depend on Scale AI side. + Note that worker responses are currently not cached. + """ project_name: str = self._get_or_create_scale_project(request.template) task_id: str = self._get_or_create_scale_task(project_name, request.template, request.fields) worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) From e64dc28dd95adf6c12047a28bf3d9927cf6dee82 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 15:58:12 -0700 Subject: [PATCH 05/25] Swapped instruction and attachments for a task --- src/helm/proxy/clients/critique_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index e28407d2e6c..f774418dd6a 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -269,9 +269,9 @@ def create_scale_project(): f"Project {template.name} already exists with different instructions: " f"{project.params['instructions']}" ) from err - elif project.type != TaskType.TextCollection: + elif project.type != TaskType.TextCollection.value: raise RuntimeError( - f"Project {template.name} already exists with different task_type: {project.type}" + f"Project {template.name} already exists with different task_type: '{project.type}' instead of '{TaskType.TextCollection.value}'" ) from err return {"name": project.name} @@ -329,13 +329,13 @@ def create_scale_task() -> Dict[str, str]: instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( project=project_name, - instruction=instructions, unique_id=unique_id, + instruction="The instructions are described in the attachments.", attachment_type="text", attachments=[ { "type": "text", - "content": "The content is in the project instructions", + "content": instructions, } ], response_required=template.num_respondents, From dbb26006cb70fd2f5e980c87917b7b86439b146e Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 16:58:09 -0700 Subject: [PATCH 06/25] Added response handling --- src/helm/proxy/clients/critique_client.py | 53 ++++++++++++++++++++--- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index f774418dd6a..fcf9c0fba15 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -329,7 +329,7 @@ def create_scale_task() -> Dict[str, str]: instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( project=project_name, - unique_id=unique_id, + # unique_id=unique_id, instruction="The instructions are described in the attachments.", attachment_type="text", attachments=[ @@ -371,11 +371,54 @@ def create_scale_task() -> Dict[str, str]: def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: task: scaleapi.tasks.Task = self.client.get_task(task_id) - if task.status != TaskStatus.Completed: - return None + if task.status != TaskStatus.Completed.value: + return [] else: - # TODO: Figure out what goes into task.response - return task.response + annotations: Dict[List[str, str]] = task.response["annotations"] + + # The format of annotations is: + # { + # "category_field_1": [ + # answer_1_respondent_1, + # answer_1_respondent_2, + # ... + # ] + # ... + # } + # We want to convert it to: + # [ + # { + # "id": "respondent_1", + # "answers": { + # "category_field_1": answer_1_respondent_1 + # "category_field_2": answer_2_respondent_1 + # ... + # } + # }, + # { + # "id": "respondent_2", + # "answers": { + # "category_field_1": answer_1_respondent_2 + # "category_field_2": answer_2_respondent_2 + # ... + # } + # }, + # ... + # ] + + # First, we get the list of respondents + num_respondents: int = len(annotations[list(annotations.keys())[0]]) + + # Then, we create the list of responses + responses: List[CritiqueResponse] = [] + for respondent_index in range(num_respondents): + answers: Dict[str, Union[str, List[str]]] = {} + for field_name, field_answers in annotations.items(): + answers[field_name] = field_answers[respondent_index] + responses.append( + CritiqueResponse(id=str(respondent_index), respondent_id=str(respondent_index), answers=answers) + ) + return responses def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResult: """ From 4a6095f827fd6a529493dba85698b98932b090c4 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Wed, 24 May 2023 17:09:29 -0700 Subject: [PATCH 07/25] Change unique_id to sha512 --- src/helm/proxy/clients/critique_client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index fcf9c0fba15..cf658c7eaa9 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from hashlib import sha512 import random import threading from typing import Dict, List, Union @@ -325,11 +326,14 @@ def create_scale_task() -> Dict[str, str]: # We create a unique_id for the task so that we can reuse it if it already exists # It contains the same information as the task itself (like the cache key) # This is redundant with the cache but it's a good safety net - unique_id: str = str({"project": project_name, "task": unstructure(template), "fields": fields}) + # NOTE: Technically, sha512 could have collisions but it's unlikely. + unique_id: str = sha512( + str({"project": project_name, "task": unstructure(template), "fields": fields}).encode() + ).hexdigest() instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( project=project_name, - # unique_id=unique_id, + unique_id=unique_id, instruction="The instructions are described in the attachments.", attachment_type="text", attachments=[ From 89546a0f672a8612c5b396577d10d6381f860522 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 11:02:51 -0700 Subject: [PATCH 08/25] Fixed interpolation of files (brackets not removed) --- src/helm/proxy/clients/critique_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index cf658c7eaa9..21b9dc6513a 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -297,7 +297,7 @@ def create_scale_project(): def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: for field_name, field_value in fields.items(): - text = text.replace(f"{{{field_name}}}", field_value) + text = text.replace("{{" + field_name + "}}", field_value) return text def _critique_question_to_scale_field(self, question: CritiqueQuestionTemplate, fields: Dict[str, str]): From 801b9bff8d813660606cc6d2ac0100e531d25aa4 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 11:06:07 -0700 Subject: [PATCH 09/25] Fix flake --- src/helm/proxy/clients/auto_client.py | 3 ++- src/helm/proxy/clients/critique_client.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index 7d6b36ff664..4d1799dd2f3 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -271,6 +271,7 @@ def get_critique_client(self) -> CritiqueClient: self.critique_client = ScaleCritiqueClient(scale_credentials, self._build_cache_config("scale")) else: raise ValueError( - "CritiqueClient is not configured; set critiqueType to 'mturk', 'mturk-sandbox', 'surgeai', 'scale' or 'random'" + "CritiqueClient is not configured; set critiqueType to 'mturk'," + "'mturk-sandbox', 'surgeai', 'scale' or 'random'" ) return self.critique_client diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 21b9dc6513a..214c7a36afc 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -272,7 +272,8 @@ def create_scale_project(): ) from err elif project.type != TaskType.TextCollection.value: raise RuntimeError( - f"Project {template.name} already exists with different task_type: '{project.type}' instead of '{TaskType.TextCollection.value}'" + f"Project {template.name} already exists with different task_type: " + f"'{project.type}' instead of '{TaskType.TextCollection.value}'" ) from err return {"name": project.name} From 9056fcbde207cfba72ae762836fed04b68de08be Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 11:34:26 -0700 Subject: [PATCH 10/25] Fix mypy --- src/helm/proxy/clients/critique_client.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 214c7a36afc..de67c6caed2 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -379,7 +379,7 @@ def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: if task.status != TaskStatus.Completed.value: return [] else: - annotations: Dict[List[str, str]] = task.response["annotations"] + annotations: Dict[str, List[str]] = task.response["annotations"] # The format of annotations is: # { @@ -387,8 +387,13 @@ def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: # answer_1_respondent_1, # answer_1_respondent_2, # ... - # ] - # ... + # ], + # "category_field_2": [ + # answer_2_respondent_1, + # answer_2_respondent_2, + # ... + # ], + # ... # } # We want to convert it to: # [ From 0efbd2043e0ebde987d87b4192dcd665ac3b5779 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 12:54:32 -0700 Subject: [PATCH 11/25] Add scaleapi to requirements --- requirements-freeze.txt | 1 + requirements.txt | 1 + setup.cfg | 1 + 3 files changed, 3 insertions(+) diff --git a/requirements-freeze.txt b/requirements-freeze.txt index 883abafa9bf..ff2d63d356e 100644 --- a/requirements-freeze.txt +++ b/requirements-freeze.txt @@ -133,6 +133,7 @@ rsa==4.9 s3transfer==0.6.0 sacrebleu==2.2.1 sacremoses==0.0.53 +scaleapi==2.13.0 scikit-learn==1.1.2 scipy==1.9.1 selenium==4.8.0 diff --git a/requirements.txt b/requirements.txt index 2a97e02b45b..bc18a7e4dca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -60,6 +60,7 @@ rouge-score~=0.1.2 pyext~=0.7 pytrec_eval==0.5 sacrebleu~=2.2.1 +scaleapi~=2.13.0 # Work around https://github.com/p-lambda/verified_calibration/issues/11 # TODO: Remove after this issue is resolved scikit-learn~=1.1.2 diff --git a/setup.cfg b/setup.cfg index fce3cfc8dd4..8c3473db56a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -68,6 +68,7 @@ install_requires= pyext~=0.7 pytrec_eval==0.5 sacrebleu~=2.2.1 + scaleapi~=2.13.0 # Work around https://github.com/p-lambda/verified_calibration/issues/11 # TODO: Remove after this issue is resolved scikit-learn~=1.1.2 From 8ecfdf0161a451b70be09049d748fd48509ab1d0 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 13:28:03 -0700 Subject: [PATCH 12/25] Changes requested: mainly including switching to Rapid --- src/helm/proxy/clients/critique_client.py | 100 ++++++++++++++-------- 1 file changed, 65 insertions(+), 35 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index de67c6caed2..8a1c3b87dab 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -1,8 +1,9 @@ from abc import ABC, abstractmethod from hashlib import sha512 +import json import random import threading -from typing import Dict, List, Union +from typing import Dict, List, Union, Tuple # Surge AI from cattrs import unstructure @@ -63,6 +64,7 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu class SurgeAICritiqueClient(CritiqueClient): + # TODO #1614: Move this to its own file """A CritiqueClient that creates tasks for workers on Surge AI. Surge AI concepts: @@ -220,6 +222,7 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu class ScaleCritiqueClient(CritiqueClient): + # TODO #1614: Move this to its own file """A CritiqueClient that creates tasks for workers on Scale. Scale AI concepts: @@ -242,40 +245,57 @@ class ScaleCritiqueClient(CritiqueClient): def __init__(self, api_key: str, cache_config: CacheConfig): self._cache = Cache(cache_config) - self.client = scaleapi.ScaleClient(api_key) + self._client = scaleapi.ScaleClient(api_key) - def _get_or_create_scale_project(self, template: CritiqueTaskTemplate) -> str: - """Get or create a project on Scale and return the Scale project name. + def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) -> Tuple[str, str]: + """Get or create a project and a btach associated to it on Scale and return the + Scale project name and batch name. + + This is using Scale Rapid. Attempt to find a Scale project for the template. If one exists, reuse that project. - Otherwise, create a new project using the template. Return the Scale project name.""" + Otherwise, create a new project using the template. Return the Scale project name. + Same for the batch.""" - def create_scale_project(): + def create_scale_project_and_batch(): try: - project = self.client.create_project( + project = self._client.create_project( project_name=template.name, task_type=TaskType.TextCollection, - params={ - "instructions": template.instructions, - }, + rapid=True, + params={}, ) except ScaleDuplicateResource as err: hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") - # Get the existing project and checks that it has the same instructions + # Get the existing project and checks that it has the same params # NOTE: This should not happen with the cache but in case the cache is deleted # we want to make sure we don't create a new project with the same name - project = self.client.get_project(template.name) - if project.params["instructions"] != template.instructions: - raise RuntimeError( - f"Project {template.name} already exists with different instructions: " - f"{project.params['instructions']}" - ) from err - elif project.type != TaskType.TextCollection.value: + project = self._client.get_project(template.name) + if project.type != TaskType.TextCollection.value: raise RuntimeError( f"Project {template.name} already exists with different task_type: " f"'{project.type}' instead of '{TaskType.TextCollection.value}'" ) from err - return {"name": project.name} + try: + batch_name: str = f"{project.name}-HELMBatch" + batch = self._client.create_batch( + project=project.name, + batch_name=batch_name, + calibration_batch=False, + ) + except ScaleDuplicateResource as err: + hlog( + f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: {project.name}. Error: {err.message}" + ) + # Get the existing batch and checks that it has the same project + # NOTE: This should not happen with the cache but in case the cache is deleted + # we want to make sure we don't create a new batch with the same name + batch = self._client.get_batch(batch_name) + if batch.project != project.name: + raise RuntimeError( + f"Batch {batch_name} already exists with different project: " f"{batch.project}" + ) from err + return {"project_name": project.name, "batch_name": batch.name} with _scale_cache_lock: # Example cache key: @@ -287,14 +307,15 @@ def create_scale_project(): # Example cache value: # {"name": "some_name"} project_response, is_cached = self._cache.get( - {"name": template.name, "instructions": template.instructions}, create_scale_project + {"name": template.name, "instructions": template.instructions}, create_scale_project_and_batch ) - project_name = project_response["name"] + project_name = project_response["project_name"] + batch_name = project_response["batch_name"] if is_cached: - hlog(f"Reusing existing Scale project: {project_name}") + hlog(f"Reusing existing Scale project: {project_name} and batch: {batch_name}") else: - hlog(f"Creating new Scale project: {project_name}") - return project_name + hlog(f"Creating new Scale project: {project_name} and batch: {batch_name}") + return project_name, batch_name def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: for field_name, field_value in fields.items(): @@ -309,15 +330,20 @@ def _critique_question_to_scale_field(self, question: CritiqueQuestionTemplate, "title": question.name, "description": self._interpolate_fields(question.text, fields), "choices": [{"label": option, "value": option} for option in question.options], + "min_choices": 1, + "max_choices": len(question.options) if question.question_type == "checkbox" else 1, } else: raise ValueError(f"Unsupported question type {question.question_type}") def _get_or_create_scale_task( - self, project_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] + self, project_name: str, batch_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] ) -> str: """Get or create a task on Scale and return the Scale task ID.""" + # Used both for the cache key and the task unique_id + cache_key = {"project": project_name, "batch": batch_name, "task": unstructure(template), "fields": fields} + def create_scale_task() -> Dict[str, str]: """ Creates a Scale Task (which is one single question from a CritiqueQuestionTemplate) @@ -328,14 +354,13 @@ def create_scale_task() -> Dict[str, str]: # It contains the same information as the task itself (like the cache key) # This is redundant with the cache but it's a good safety net # NOTE: Technically, sha512 could have collisions but it's unlikely. - unique_id: str = sha512( - str({"project": project_name, "task": unstructure(template), "fields": fields}).encode() - ).hexdigest() + unique_id: str = sha512(str(json.dumps(cache_key, sort_keys=True)).encode()).hexdigest() instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( project=project_name, + batch=batch_name, unique_id=unique_id, - instruction="The instructions are described in the attachments.", + instruction="Evaluate the AI model generated output following the instructions below", attachment_type="text", attachments=[ { @@ -348,14 +373,14 @@ def create_scale_task() -> Dict[str, str]: ) try: - task = self.client.create_task(TaskType.TextCollection, **payload) + task = self._client.create_task(TaskType.TextCollection, **payload) return {"id": task.id} except ScaleDuplicateResource as err: hlog(f"ScaleDuplicateResource when creating task: {unique_id}. Error: {err.message}") # Get the existing task and checks that it has the same content (attachments and fields) # NOTE: This should not happen with the cache but in case the cache is deleted # we want to make sure we don't create a new task with the same content - task = self.client.get_task(unique_id) + task = self._client.get_task(unique_id) if task.params["attachments"] != payload["attachments"]: raise RuntimeError( f"Task {unique_id} already exists with different attachments: " f"{task.params['attachments']}" @@ -365,7 +390,8 @@ def create_scale_task() -> Dict[str, str]: with _scale_cache_lock: task_response, is_cached = self._cache.get( - {"project": project_name, "task": unstructure(template), "fields": fields}, create_scale_task + cache_key, + create_scale_task, ) task_id: str = task_response["id"] if is_cached: @@ -374,8 +400,11 @@ def create_scale_task() -> Dict[str, str]: hlog(f"Creating new Scale task: {task_id}") return task_id + def _finalize_batch(self, batch_name: str): + self._client.finalize_batch(batch_name=batch_name) + def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: - task: scaleapi.tasks.Task = self.client.get_task(task_id) + task: scaleapi.tasks.Task = self._client.get_task(task_id) if task.status != TaskStatus.Completed.value: return [] else: @@ -455,7 +484,8 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu the cache to avoid unnecessary API calls and to not depend on Scale AI side. Note that worker responses are currently not cached. """ - project_name: str = self._get_or_create_scale_project(request.template) - task_id: str = self._get_or_create_scale_task(project_name, request.template, request.fields) + project_name, batch_name = self._get_or_create_scale_project_and_batch(request.template) + task_id: str = self._get_or_create_scale_task(project_name, batch_name, request.template, request.fields) worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) + self._finalize_batch(batch_name) return CritiqueRequestResult(worker_responses) From 101966e30c808f021350d791ed55ad5c5f3e9b8b Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 15:02:24 -0700 Subject: [PATCH 13/25] Remove finalizing and set to self labeling --- src/helm/proxy/clients/critique_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 8a1c3b87dab..2ca886f4936 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -282,6 +282,7 @@ def create_scale_project_and_batch(): project=project.name, batch_name=batch_name, calibration_batch=False, + self_label_batch=True, ) except ScaleDuplicateResource as err: hlog( @@ -487,5 +488,5 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu project_name, batch_name = self._get_or_create_scale_project_and_batch(request.template) task_id: str = self._get_or_create_scale_task(project_name, batch_name, request.template, request.fields) worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) - self._finalize_batch(batch_name) + # self._finalize_batch(batch_name) return CritiqueRequestResult(worker_responses) From 9eb9708e20cc82f035686c93ec75a4c916e9ee0f Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 17:21:17 -0700 Subject: [PATCH 14/25] Add finalize script --- scripts/scale/finalize_batch.py | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 scripts/scale/finalize_batch.py diff --git a/scripts/scale/finalize_batch.py b/scripts/scale/finalize_batch.py new file mode 100644 index 00000000000..aa49e0c2732 --- /dev/null +++ b/scripts/scale/finalize_batch.py @@ -0,0 +1,35 @@ +from typing import Dict +import argparse +from scaleapi import ScaleClient +import os + +parser = argparse.ArgumentParser() +parser.add_argument("--batch_name", type=str, help="Name of the batch to finalize") +args = parser.parse_args() + + +def get_credentials(path: str) -> Dict[str, str]: + # Reads the credentials from the given path + with open(path, "r") as f: + # Read line by line, replaces the spaces, splits on the first ":" + # The first part is the key, the second part contians the value in between quotes + credentials = {} + for line in f.readlines(): + elt = line.replace(" ", "").replace("\n", "").split(":") + if len(elt) == 2: + credentials[elt[0]] = elt[1].split('"')[1] + return credentials + + +relative_credentials_path = "../../prod_env/credentials.conf" +credentials_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), relative_credentials_path) +print(f"Reading credentials from {credentials_path}") +credentials = get_credentials(credentials_path) + +# Check that scaleApiKey is set +if "scaleApiKey" not in credentials: + raise Exception("scaleApiKey not found in credentials.conf") + +# Get scale client +client = ScaleClient(credentials["scaleApiKey"]) +client.finalize_batch(args.batch_name) From 56bcc3f02cf3e201505e6b675811a6b7a605e4e7 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:07:08 -0700 Subject: [PATCH 15/25] Add script to create project --- scripts/scale/create_and_setup_project.py | 143 ++++++++++++++++++++++ scripts/scale/finalize_batch.py | 30 +---- scripts/scale/scale_utils.py | 31 +++++ 3 files changed, 176 insertions(+), 28 deletions(-) create mode 100644 scripts/scale/create_and_setup_project.py create mode 100644 scripts/scale/scale_utils.py diff --git a/scripts/scale/create_and_setup_project.py b/scripts/scale/create_and_setup_project.py new file mode 100644 index 00000000000..d545a76a5d7 --- /dev/null +++ b/scripts/scale/create_and_setup_project.py @@ -0,0 +1,143 @@ +import argparse +from scale_utils import get_scale_client +from scaleapi.tasks import TaskType +from scaleapi.exceptions import ScaleDuplicateResource + +parser = argparse.ArgumentParser() +parser.add_argument("--project_name", type=str, help="Name of the project to create") +args = parser.parse_args() +project_name = args.project_name +client = get_scale_client() + +print("\nGetting project...") +try: + print(f"Trying to create project {project_name} ...") + project = client.create_project( + project_name=project_name, + task_type=TaskType.TextCollection, + rapid=True, + params={}, + ) + print("Project created.") +except ScaleDuplicateResource as err: + print(f"Project {project_name} already exists. Using existing project.") + project = client.get_project(project_name) + + +# Create a calibration batch +print("\nCreating calibration batch...") +try: + calib_batch_name = project_name + "_calibration" + batch = client.create_batch( + project=project_name, + batch_name=calib_batch_name, + calibration_batch=True, + ) + print("Calibration batch created.") + # Create 10 tasks in the calibration batch + for i in range(10): + payload = dict( + project=project_name, + batch=calib_batch_name, + instruction="This is a fake calibration task to bypass the API. Please simply answer Yes.", + attachment_type="text", + attachments=[ + { + "type": "text", + "content": "This is a placeholder for the calibration task. Please simply answer Yes.", + } + ], + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Continue to the next task?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], + ) + client.create_task(TaskType.TextCollection, **payload) + print(f" Calibration task {i} created.") + print("Finalizing calibration batch...") + client.finalize_batch(calib_batch_name) + print("Calibration batch finalized.") +except ScaleDuplicateResource as err: + print(f"Calibration batch {calib_batch_name} already exists. It will not be recreated.") + + +# Create evaluation tasks +expected_response = { + "annotations": {"answer_reasonable": {"type": "category", "field_id": "answer", "response": [["no"]]}} +} +initial_response = { + "annotations": {"answer_reasonable": {"type": "category", "field_id": "answer", "response": [["yes"]]}} +} +attachments = [ + { + "type": "text", + "content": "Please Answer Yes to this question. This is simply a way to bypass the need for evaluation tasks.", + }, +] +payload = dict( + project=project_name, + rapid=True, + attachments=attachments, + initial_response=initial_response, + expected_response=expected_response, + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Continue to the next task?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], +) +print("\nCreating evaluation tasks...") +for i in range(10): + evaluation_task = client.create_evaluation_task(TaskType.TextCollection, **payload) + print(f" Evaluation task {i} created.") +print("Evaluation tasks created.") + +# Create a test batch +print("\nCreating test batch...") +try: + test_batch_name = project_name + "_test" + batch = client.create_batch( + project=project_name, + batch_name=test_batch_name, + calibration_batch=False, + ) + print("Test batch created.") +except ScaleDuplicateResource as err: + print(f"Test batch {test_batch_name} already exists. It will not be recreated.") +# Try to create a single task in the test batch +payload = dict( + project=project_name, + batch=test_batch_name, + instruction="This is a test task to check that we can create tasks. If you are a worker please simply answer Yes.", + attachment_type="text", + attachments=[ + { + "type": "text", + "content": "This is a placeholder for the test task. If you are a worker please simply answer Yes.", + } + ], + fields=[ + { + "type": "category", + "field_id": "answer", + "title": "Finish?", + "choices": [{"label": "Yes", "value": "yes"}, {"label": "No", "value": "no"}], + } + ], +) +print("Creating test task...") +client.create_task(TaskType.TextCollection, **payload) +print("Test task created.") +print("The test batch is not going to be finalized so that it does not get sent to workers.") + +# If we are here, it means that the project is ready. +# Print the project_name and a success message. +print(f"\n\nProject {project_name} is ready.") +print("Please go to https://app.scale.com/projects to check that the project is ready.") diff --git a/scripts/scale/finalize_batch.py b/scripts/scale/finalize_batch.py index aa49e0c2732..56c4179cd47 100644 --- a/scripts/scale/finalize_batch.py +++ b/scripts/scale/finalize_batch.py @@ -1,35 +1,9 @@ -from typing import Dict import argparse -from scaleapi import ScaleClient -import os +from scale_utils import get_scale_client parser = argparse.ArgumentParser() parser.add_argument("--batch_name", type=str, help="Name of the batch to finalize") args = parser.parse_args() - -def get_credentials(path: str) -> Dict[str, str]: - # Reads the credentials from the given path - with open(path, "r") as f: - # Read line by line, replaces the spaces, splits on the first ":" - # The first part is the key, the second part contians the value in between quotes - credentials = {} - for line in f.readlines(): - elt = line.replace(" ", "").replace("\n", "").split(":") - if len(elt) == 2: - credentials[elt[0]] = elt[1].split('"')[1] - return credentials - - -relative_credentials_path = "../../prod_env/credentials.conf" -credentials_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), relative_credentials_path) -print(f"Reading credentials from {credentials_path}") -credentials = get_credentials(credentials_path) - -# Check that scaleApiKey is set -if "scaleApiKey" not in credentials: - raise Exception("scaleApiKey not found in credentials.conf") - -# Get scale client -client = ScaleClient(credentials["scaleApiKey"]) +client = get_scale_client() client.finalize_batch(args.batch_name) diff --git a/scripts/scale/scale_utils.py b/scripts/scale/scale_utils.py new file mode 100644 index 00000000000..e12ac8ed597 --- /dev/null +++ b/scripts/scale/scale_utils.py @@ -0,0 +1,31 @@ +import os +from typing import Dict +from scaleapi import ScaleClient + + +def get_credentials(path: str) -> Dict[str, str]: + # Reads the credentials from the given path + with open(path, "r") as f: + # Read line by line, replaces the spaces, splits on the first ":" + # The first part is the key, the second part contians the value in between quotes + credentials = {} + for line in f.readlines(): + elt = line.replace(" ", "").replace("\n", "").split(":") + if len(elt) == 2: + credentials[elt[0]] = elt[1].split('"')[1] + return credentials + + +def get_scale_client() -> ScaleClient: + relative_credentials_path = "../../prod_env/credentials.conf" + credentials_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), relative_credentials_path) + print(f"Reading credentials from {credentials_path}") + credentials = get_credentials(credentials_path) + + # Check that scaleApiKey is set + if "scaleApiKey" not in credentials: + raise Exception("scaleApiKey not found in credentials.conf") + + # Get scale client + client = ScaleClient(credentials["scaleApiKey"]) + return client From 8a6dbcd3868b9629326dbfeeea9bd42f3167a396 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:07:49 -0700 Subject: [PATCH 16/25] Allow the user to specify the project and batch name in credentials.conf --- src/helm/proxy/clients/auto_client.py | 6 +- src/helm/proxy/clients/critique_client.py | 121 +++++++++++++++++----- 2 files changed, 101 insertions(+), 26 deletions(-) diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index 4d1799dd2f3..daa40af4111 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -266,9 +266,13 @@ def get_critique_client(self) -> CritiqueClient: elif critique_type == "scale": scale_credentials = self.credentials.get("scaleApiKey") + scale_project = self.credentials.get("scaleProject", None) + scale_batch = self.credentials.get("scaleBatch", None) if not scale_credentials: raise ValueError("scaleApiKey credentials are required for ScaleCritiqueClient") - self.critique_client = ScaleCritiqueClient(scale_credentials, self._build_cache_config("scale")) + self.critique_client = ScaleCritiqueClient( + scale_credentials, self._build_cache_config("scale"), scale_project, scale_batch + ) else: raise ValueError( "CritiqueClient is not configured; set critiqueType to 'mturk'," diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 2ca886f4936..2cab1ece437 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -3,7 +3,7 @@ import json import random import threading -from typing import Dict, List, Union, Tuple +from typing import Dict, List, Union, Tuple, Optional # Surge AI from cattrs import unstructure @@ -25,6 +25,7 @@ CritiqueResponse, QuestionType, ) +from gymnasium.vector.utils.spaces import batch_space _surge_cache_lock = threading.Lock() @@ -243,21 +244,42 @@ class ScaleCritiqueClient(CritiqueClient): - A `CritiqueResponse` maps to TODO """ - def __init__(self, api_key: str, cache_config: CacheConfig): + def __init__( + self, + api_key: str, + cache_config: CacheConfig, + project_name: Optional[str] = None, + batch_name: Optional[str] = None, + ): self._cache = Cache(cache_config) self._client = scaleapi.ScaleClient(api_key) + self._project_name = project_name + self._batch_name = batch_name def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) -> Tuple[str, str]: - """Get or create a project and a btach associated to it on Scale and return the + """Get or create a project and a batch associated to it on Scale and return the Scale project name and batch name. + If the project_name was specified in the credentials, use it (should already exist). + In order to send tasks, the project needs to already have a calibration batch. + Otherwise, create a new project using the template name. + If the project is created, there will be no calibration batch so the batch will be set + as self-labeling. + + If the batch_name was specified in the credentials, use it (does not necessarily need to exist). + Otherwise, create a new batch using the project name and the template name. + This is using Scale Rapid. Attempt to find a Scale project for the template. If one exists, reuse that project. Otherwise, create a new project using the template. Return the Scale project name. Same for the batch.""" - def create_scale_project_and_batch(): + project_was_created: bool = False + project_name: Optional[str] = None + batch_name: Optional[str] = None + + def create_scale_project(): try: project = self._client.create_project( project_name=template.name, @@ -265,6 +287,8 @@ def create_scale_project_and_batch(): rapid=True, params={}, ) + project_name = project.name + project_was_created = True except ScaleDuplicateResource as err: hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") # Get the existing project and checks that it has the same params @@ -276,17 +300,24 @@ def create_scale_project_and_batch(): f"Project {template.name} already exists with different task_type: " f"'{project.type}' instead of '{TaskType.TextCollection.value}'" ) from err + return {"project_name": project.name} + + def create_scale_batch(): + if project_name is None: + raise RuntimeError("Trying to create a batch without a project name.") try: - batch_name: str = f"{project.name}-HELMBatch" + batch_name: str = f"{project_name}-HELMBatch" batch = self._client.create_batch( - project=project.name, + project=project_name, batch_name=batch_name, calibration_batch=False, - self_label_batch=True, + # If the project was created, there is no calibration batch so we set it as self-labeling + # otherwise the API will return an error. + self_label_batch=project_was_created, ) except ScaleDuplicateResource as err: hlog( - f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: {project.name}. Error: {err.message}" + f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: {project_name}. Error: {err.message}" ) # Get the existing batch and checks that it has the same project # NOTE: This should not happen with the cache but in case the cache is deleted @@ -296,26 +327,66 @@ def create_scale_project_and_batch(): raise RuntimeError( f"Batch {batch_name} already exists with different project: " f"{batch.project}" ) from err - return {"project_name": project.name, "batch_name": batch.name} + return {"batch_name": batch.name} + + # Check if a project_name was specified in the credentials + if self._project_name is not None: + project_name = self._project_name + hlog(f"Using existing Scale project: {project_name} (defined in credentials)") + # Checks that the project exists + try: + project = self._client.get_project(project_name) + except Exception as err: + raise RuntimeError(f"Project {project_name} does not exist") from err + + # Check if a batch_name was specified in the credentials + if self._batch_name is not None: + batch_name = self._batch_name + hlog(f"Using existing Scale batch: {batch_name} (defined in credentials)") + # Checks that the batch exists + try: + batch = self._client.get_batch(batch_name) + except Exception as err: + raise RuntimeError(f"Batch {batch_name} does not exist") from err + # Checks that the batch is in the project + if batch.project != project_name: + raise RuntimeError( + f"Batch {batch_name} is not in project {project_name}. " + f"It is in project {batch.project} instead." + ) + return project_name, batch_name + + # If we reach here it means that a project_name was specified but not a batch_name + # Create a new batch using the template name + with _scale_cache_lock: + batch_response, is_cached = self._cache.get( + {"project": project_name, "template": template.name}, create_scale_batch + ) + batch_name = batch_response["batch_name"] + else: + # If we reach here it means that no project_name was specified + # Create a new project using the template name + # Create a new batch using the project name and the template name + + with _scale_cache_lock: + project_response, is_cached = self._cache.get({"template": template.name}, create_scale_project) + project_name = project_response["project_name"] + if is_cached: + hlog(f"Reusing existing Scale project: {project_name}") + else: + hlog(f"Creating new Scale project: {project_name}") + + with _scale_cache_lock: + batch_response, is_cached = self._cache.get( + {"project": project_name, "template": template.name}, create_scale_batch + ) + batch_name = batch_response["batch_name"] - with _scale_cache_lock: - # Example cache key: - # { - # "name": "some_name", - # "instructions": "some_instructions" - # } - # - # Example cache value: - # {"name": "some_name"} - project_response, is_cached = self._cache.get( - {"name": template.name, "instructions": template.instructions}, create_scale_project_and_batch - ) - project_name = project_response["project_name"] - batch_name = project_response["batch_name"] if is_cached: - hlog(f"Reusing existing Scale project: {project_name} and batch: {batch_name}") + hlog(f"Reusing existing Scale batch: {batch_name}") else: - hlog(f"Creating new Scale project: {project_name} and batch: {batch_name}") + hlog(f"Creating new Scale batch: {batch_name}") + return project_name, batch_name def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: From 9c8ef9bb6ce812ce4146330909ca9efb3b10a3d3 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:13:30 -0700 Subject: [PATCH 17/25] Fix flake --- scripts/scale/create_and_setup_project.py | 6 +++--- src/helm/proxy/clients/critique_client.py | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/scripts/scale/create_and_setup_project.py b/scripts/scale/create_and_setup_project.py index d545a76a5d7..bdd621470cd 100644 --- a/scripts/scale/create_and_setup_project.py +++ b/scripts/scale/create_and_setup_project.py @@ -20,7 +20,7 @@ ) print("Project created.") except ScaleDuplicateResource as err: - print(f"Project {project_name} already exists. Using existing project.") + print(f"Project {project_name} already exists. Using existing project. Error: {err}") project = client.get_project(project_name) @@ -62,7 +62,7 @@ client.finalize_batch(calib_batch_name) print("Calibration batch finalized.") except ScaleDuplicateResource as err: - print(f"Calibration batch {calib_batch_name} already exists. It will not be recreated.") + print(f"Calibration batch {calib_batch_name} already exists. It will not be recreated. Error: {err}") # Create evaluation tasks @@ -110,7 +110,7 @@ ) print("Test batch created.") except ScaleDuplicateResource as err: - print(f"Test batch {test_batch_name} already exists. It will not be recreated.") + print(f"Test batch {test_batch_name} already exists. It will not be recreated. Error: {err}") # Try to create a single task in the test batch payload = dict( project=project_name, diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 2cab1ece437..801cceb064a 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -25,7 +25,6 @@ CritiqueResponse, QuestionType, ) -from gymnasium.vector.utils.spaces import batch_space _surge_cache_lock = threading.Lock() @@ -275,7 +274,7 @@ def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) Otherwise, create a new project using the template. Return the Scale project name. Same for the batch.""" - project_was_created: bool = False + project_was_created: bool = self._project_name is None project_name: Optional[str] = None batch_name: Optional[str] = None @@ -287,8 +286,6 @@ def create_scale_project(): rapid=True, params={}, ) - project_name = project.name - project_was_created = True except ScaleDuplicateResource as err: hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") # Get the existing project and checks that it has the same params @@ -317,7 +314,8 @@ def create_scale_batch(): ) except ScaleDuplicateResource as err: hlog( - f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: {project_name}. Error: {err.message}" + f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: " + f"{project_name}. Error: {err.message}" ) # Get the existing batch and checks that it has the same project # NOTE: This should not happen with the cache but in case the cache is deleted @@ -336,6 +334,7 @@ def create_scale_batch(): # Checks that the project exists try: project = self._client.get_project(project_name) + project_name = project.name except Exception as err: raise RuntimeError(f"Project {project_name} does not exist") from err From 0d0ce1f087677353b4ecb96ea7a18b6202b9a40f Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:26:03 -0700 Subject: [PATCH 18/25] Simple changes to project creation script --- scripts/scale/create_and_setup_project.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/scale/create_and_setup_project.py b/scripts/scale/create_and_setup_project.py index bdd621470cd..c44a380a9ee 100644 --- a/scripts/scale/create_and_setup_project.py +++ b/scripts/scale/create_and_setup_project.py @@ -44,7 +44,9 @@ attachments=[ { "type": "text", - "content": "This is a placeholder for the calibration task. Please simply answer Yes.", + "content": "This is a fake calibration task to bypass the API. " + "We do not need calibration but would like to be able to send actual task. " + "In order to do this, we need to finish calibration. Please simply answer Yes.", } ], fields=[ From eb3d36c0639677edf9045c9438454666faf0c567 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:29:41 -0700 Subject: [PATCH 19/25] Set to self labeling for Tony --- src/helm/proxy/clients/critique_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 801cceb064a..0a7e632911f 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -310,7 +310,7 @@ def create_scale_batch(): calibration_batch=False, # If the project was created, there is no calibration batch so we set it as self-labeling # otherwise the API will return an error. - self_label_batch=project_was_created, + self_label_batch=True, # project_was_created, ) except ScaleDuplicateResource as err: hlog( From 08ae8d03f72c87c6eb21c28739979583cb404922 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Thu, 25 May 2023 18:32:31 -0700 Subject: [PATCH 20/25] Add testing to the cache key for Tony --- src/helm/proxy/clients/critique_client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 0a7e632911f..97c1c371478 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -359,7 +359,7 @@ def create_scale_batch(): # Create a new batch using the template name with _scale_cache_lock: batch_response, is_cached = self._cache.get( - {"project": project_name, "template": template.name}, create_scale_batch + {"testing": True, "project": project_name, "template": template.name}, create_scale_batch ) batch_name = batch_response["batch_name"] else: @@ -377,7 +377,7 @@ def create_scale_batch(): with _scale_cache_lock: batch_response, is_cached = self._cache.get( - {"project": project_name, "template": template.name}, create_scale_batch + {"testing": True, "project": project_name, "template": template.name}, create_scale_batch ) batch_name = batch_response["batch_name"] @@ -413,7 +413,13 @@ def _get_or_create_scale_task( """Get or create a task on Scale and return the Scale task ID.""" # Used both for the cache key and the task unique_id - cache_key = {"project": project_name, "batch": batch_name, "task": unstructure(template), "fields": fields} + cache_key = { + "testing": True, + "project": project_name, + "batch": batch_name, + "task": unstructure(template), + "fields": fields, + } def create_scale_task() -> Dict[str, str]: """ From bb1746a8db12b40adff6271778bec2206820d2cc Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Tue, 30 May 2023 11:32:52 -0700 Subject: [PATCH 21/25] Changes requested --- src/helm/proxy/clients/auto_client.py | 3 +- src/helm/proxy/clients/critique_client.py | 289 +++++++++++----------- 2 files changed, 144 insertions(+), 148 deletions(-) diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index daa40af4111..e71469a7837 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -266,12 +266,11 @@ def get_critique_client(self) -> CritiqueClient: elif critique_type == "scale": scale_credentials = self.credentials.get("scaleApiKey") - scale_project = self.credentials.get("scaleProject", None) scale_batch = self.credentials.get("scaleBatch", None) if not scale_credentials: raise ValueError("scaleApiKey credentials are required for ScaleCritiqueClient") self.critique_client = ScaleCritiqueClient( - scale_credentials, self._build_cache_config("scale"), scale_project, scale_batch + scale_credentials, self._build_cache_config("scale"), scale_batch ) else: raise ValueError( diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 97c1c371478..de06f43e659 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -232,7 +232,7 @@ class ScaleCritiqueClient(CritiqueClient): It contains **attachments** which is the data to be annotated, and **fields** which are the instructions and questions to be displayed to the Tasker. A task has also a general **instruction** which is displayed before the fields. - - A **task response**: TODO + - A **task response**: represents for each question a list of answers from different workers. Mapping of HELM concepts to Scale AI concepts: @@ -240,153 +240,151 @@ class ScaleCritiqueClient(CritiqueClient): - `CritiqueRequest.template` indicates which **project** the task should be created in. - A `CritiqueTaskTemplate` maps to a **task** - A `CritiqueQuestionTemplate` maps to a **field** in a task. - - A `CritiqueResponse` maps to TODO + - A `CritiqueResponse` maps to a **task response**. """ def __init__( self, api_key: str, cache_config: CacheConfig, - project_name: Optional[str] = None, batch_name: Optional[str] = None, ): self._cache = Cache(cache_config) self._client = scaleapi.ScaleClient(api_key) - self._project_name = project_name self._batch_name = batch_name - def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) -> Tuple[str, str]: - """Get or create a project and a batch associated to it on Scale and return the - Scale project name and batch name. - - If the project_name was specified in the credentials, use it (should already exist). - In order to send tasks, the project needs to already have a calibration batch. - Otherwise, create a new project using the template name. - If the project is created, there will be no calibration batch so the batch will be set - as self-labeling. - - If the batch_name was specified in the credentials, use it (does not necessarily need to exist). - Otherwise, create a new batch using the project name and the template name. - - This is using Scale Rapid. - - Attempt to find a Scale project for the template. If one exists, reuse that project. - Otherwise, create a new project using the template. Return the Scale project name. - Same for the batch.""" - - project_was_created: bool = self._project_name is None - project_name: Optional[str] = None - batch_name: Optional[str] = None - - def create_scale_project(): - try: - project = self._client.create_project( - project_name=template.name, - task_type=TaskType.TextCollection, - rapid=True, - params={}, - ) - except ScaleDuplicateResource as err: - hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") - # Get the existing project and checks that it has the same params - # NOTE: This should not happen with the cache but in case the cache is deleted - # we want to make sure we don't create a new project with the same name - project = self._client.get_project(template.name) - if project.type != TaskType.TextCollection.value: - raise RuntimeError( - f"Project {template.name} already exists with different task_type: " - f"'{project.type}' instead of '{TaskType.TextCollection.value}'" - ) from err - return {"project_name": project.name} - - def create_scale_batch(): - if project_name is None: - raise RuntimeError("Trying to create a batch without a project name.") - try: - batch_name: str = f"{project_name}-HELMBatch" - batch = self._client.create_batch( - project=project_name, - batch_name=batch_name, - calibration_batch=False, - # If the project was created, there is no calibration batch so we set it as self-labeling - # otherwise the API will return an error. - self_label_batch=True, # project_was_created, - ) - except ScaleDuplicateResource as err: - hlog( - f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: " - f"{project_name}. Error: {err.message}" - ) - # Get the existing batch and checks that it has the same project - # NOTE: This should not happen with the cache but in case the cache is deleted - # we want to make sure we don't create a new batch with the same name - batch = self._client.get_batch(batch_name) - if batch.project != project.name: - raise RuntimeError( - f"Batch {batch_name} already exists with different project: " f"{batch.project}" - ) from err - return {"batch_name": batch.name} - - # Check if a project_name was specified in the credentials - if self._project_name is not None: - project_name = self._project_name - hlog(f"Using existing Scale project: {project_name} (defined in credentials)") - # Checks that the project exists - try: - project = self._client.get_project(project_name) - project_name = project.name - except Exception as err: - raise RuntimeError(f"Project {project_name} does not exist") from err - - # Check if a batch_name was specified in the credentials - if self._batch_name is not None: - batch_name = self._batch_name - hlog(f"Using existing Scale batch: {batch_name} (defined in credentials)") - # Checks that the batch exists - try: - batch = self._client.get_batch(batch_name) - except Exception as err: - raise RuntimeError(f"Batch {batch_name} does not exist") from err - # Checks that the batch is in the project - if batch.project != project_name: - raise RuntimeError( - f"Batch {batch_name} is not in project {project_name}. " - f"It is in project {batch.project} instead." - ) - return project_name, batch_name - - # If we reach here it means that a project_name was specified but not a batch_name - # Create a new batch using the template name - with _scale_cache_lock: - batch_response, is_cached = self._cache.get( - {"testing": True, "project": project_name, "template": template.name}, create_scale_batch - ) - batch_name = batch_response["batch_name"] - else: - # If we reach here it means that no project_name was specified - # Create a new project using the template name - # Create a new batch using the project name and the template name - - with _scale_cache_lock: - project_response, is_cached = self._cache.get({"template": template.name}, create_scale_project) - project_name = project_response["project_name"] - if is_cached: - hlog(f"Reusing existing Scale project: {project_name}") - else: - hlog(f"Creating new Scale project: {project_name}") - - with _scale_cache_lock: - batch_response, is_cached = self._cache.get( - {"testing": True, "project": project_name, "template": template.name}, create_scale_batch - ) - batch_name = batch_response["batch_name"] - - if is_cached: - hlog(f"Reusing existing Scale batch: {batch_name}") - else: - hlog(f"Creating new Scale batch: {batch_name}") - - return project_name, batch_name + # def _get_or_create_scale_project_and_batch(self, template: CritiqueTaskTemplate) -> Tuple[str, str]: + # """Get or create a project and a batch associated to it on Scale and return the + # Scale project name and batch name. + + # If the project_name was specified in the credentials, use it (should already exist). + # In order to send tasks, the project needs to already have a calibration batch. + # Otherwise, create a new project using the template name. + # If the project is created, there will be no calibration batch so the batch will be set + # as self-labeling. + + # If the batch_name was specified in the credentials, use it (does not necessarily need to exist). + # Otherwise, create a new batch using the project name and the template name. + + # This is using Scale Rapid. + + # Attempt to find a Scale project for the template. If one exists, reuse that project. + # Otherwise, create a new project using the template. Return the Scale project name. + # Same for the batch.""" + + # project_was_created: bool = self._project_name is None + # project_name: Optional[str] = None + # batch_name: Optional[str] = None + + # def create_scale_project(): + # try: + # project = self._client.create_project( + # project_name=template.name, + # task_type=TaskType.TextCollection, + # rapid=True, + # params={}, + # ) + # except ScaleDuplicateResource as err: + # hlog(f"ScaleDuplicateResource when creating project: {template.name}. Error: {err.message}") + # # Get the existing project and checks that it has the same params + # # NOTE: This should not happen with the cache but in case the cache is deleted + # # we want to make sure we don't create a new project with the same name + # project = self._client.get_project(template.name) + # if project.type != TaskType.TextCollection.value: + # raise RuntimeError( + # f"Project {template.name} already exists with different task_type: " + # f"'{project.type}' instead of '{TaskType.TextCollection.value}'" + # ) from err + # return {"project_name": project.name} + + # def create_scale_batch(): + # if project_name is None: + # raise RuntimeError("Trying to create a batch without a project name.") + # try: + # batch_name: str = f"{project_name}-HELMBatch" + # batch = self._client.create_batch( + # project=project_name, + # batch_name=batch_name, + # calibration_batch=False, + # # If the project was created, there is no calibration batch so we set it as self-labeling + # # otherwise the API will return an error. + # self_label_batch=True, # project_was_created, + # ) + # except ScaleDuplicateResource as err: + # hlog( + # f"ScaleDuplicateResource when creating batch: '{batch_name}' in project: " + # f"{project_name}. Error: {err.message}" + # ) + # # Get the existing batch and checks that it has the same project + # # NOTE: This should not happen with the cache but in case the cache is deleted + # # we want to make sure we don't create a new batch with the same name + # batch = self._client.get_batch(batch_name) + # if batch.project != project.name: + # raise RuntimeError( + # f"Batch {batch_name} already exists with different project: " f"{batch.project}" + # ) from err + # return {"batch_name": batch.name} + + # # Check if a project_name was specified in the credentials + # if self._project_name is not None: + # project_name = self._project_name + # hlog(f"Using existing Scale project: {project_name} (defined in credentials)") + # # Checks that the project exists + # try: + # project = self._client.get_project(project_name) + # project_name = project.name + # except Exception as err: + # raise RuntimeError(f"Project {project_name} does not exist") from err + + # # Check if a batch_name was specified in the credentials + # if self._batch_name is not None: + # batch_name = self._batch_name + # hlog(f"Using existing Scale batch: {batch_name} (defined in credentials)") + # # Checks that the batch exists + # try: + # batch = self._client.get_batch(batch_name) + # except Exception as err: + # raise RuntimeError(f"Batch {batch_name} does not exist") from err + # # Checks that the batch is in the project + # if batch.project != project_name: + # raise RuntimeError( + # f"Batch {batch_name} is not in project {project_name}. " + # f"It is in project {batch.project} instead." + # ) + # return project_name, batch_name + + # # If we reach here it means that a project_name was specified but not a batch_name + # # Create a new batch using the template name + # with _scale_cache_lock: + # batch_response, is_cached = self._cache.get( + # {"testing": True, "project": project_name, "template": template.name}, create_scale_batch + # ) + # batch_name = batch_response["batch_name"] + # else: + # # If we reach here it means that no project_name was specified + # # Create a new project using the template name + # # Create a new batch using the project name and the template name + + # with _scale_cache_lock: + # project_response, is_cached = self._cache.get({"template": template.name}, create_scale_project) + # project_name = project_response["project_name"] + # if is_cached: + # hlog(f"Reusing existing Scale project: {project_name}") + # else: + # hlog(f"Creating new Scale project: {project_name}") + + # with _scale_cache_lock: + # batch_response, is_cached = self._cache.get( + # {"testing": True, "project": project_name, "template": template.name}, create_scale_batch + # ) + # batch_name = batch_response["batch_name"] + + # if is_cached: + # hlog(f"Reusing existing Scale batch: {batch_name}") + # else: + # hlog(f"Creating new Scale batch: {batch_name}") + + # return project_name, batch_name def _interpolate_fields(self, text: str, fields: Dict[str, str]) -> str: for field_name, field_value in fields.items(): @@ -401,21 +399,18 @@ def _critique_question_to_scale_field(self, question: CritiqueQuestionTemplate, "title": question.name, "description": self._interpolate_fields(question.text, fields), "choices": [{"label": option, "value": option} for option in question.options], - "min_choices": 1, + "min_choices": 0 if question.question_type == "checkbox" else 1, "max_choices": len(question.options) if question.question_type == "checkbox" else 1, } else: raise ValueError(f"Unsupported question type {question.question_type}") - def _get_or_create_scale_task( - self, project_name: str, batch_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str] - ) -> str: + def _get_or_create_scale_task(self, batch_name: str, template: CritiqueTaskTemplate, fields: Dict[str, str]) -> str: """Get or create a task on Scale and return the Scale task ID.""" # Used both for the cache key and the task unique_id cache_key = { "testing": True, - "project": project_name, "batch": batch_name, "task": unstructure(template), "fields": fields, @@ -434,7 +429,6 @@ def create_scale_task() -> Dict[str, str]: unique_id: str = sha512(str(json.dumps(cache_key, sort_keys=True)).encode()).hexdigest() instructions: str = self._interpolate_fields(template.instructions, fields) payload = dict( - project=project_name, batch=batch_name, unique_id=unique_id, instruction="Evaluate the AI model generated output following the instructions below", @@ -477,7 +471,7 @@ def create_scale_task() -> Dict[str, str]: hlog(f"Creating new Scale task: {task_id}") return task_id - def _finalize_batch(self, batch_name: str): + def finalize_batch(self, batch_name: str): self._client.finalize_batch(batch_name=batch_name) def _get_worker_responses(self, task_id: str) -> List[CritiqueResponse]: @@ -561,8 +555,11 @@ def make_critique_request(self, request: CritiqueRequest) -> CritiqueRequestResu the cache to avoid unnecessary API calls and to not depend on Scale AI side. Note that worker responses are currently not cached. """ - project_name, batch_name = self._get_or_create_scale_project_and_batch(request.template) - task_id: str = self._get_or_create_scale_task(project_name, batch_name, request.template, request.fields) + # TODO: Remove/fix _get_or_create_scale_project_and_batch(). + # For now we are forcing the user to provide a batch_name in the credentials. + + # _, batch_name = self._get_or_create_scale_project_and_batch(request.template) + # self._batch_name = batch_name + task_id: str = self._get_or_create_scale_task(self._batch_name, request.template, request.fields) worker_responses: List[CritiqueResponse] = self._get_worker_responses(task_id) - # self._finalize_batch(batch_name) return CritiqueRequestResult(worker_responses) From d385c230ae4d480c770726f3b3063afded717e97 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Tue, 30 May 2023 11:35:22 -0700 Subject: [PATCH 22/25] Set credentials paths as a flag for scale scripts --- scripts/scale/create_and_setup_project.py | 5 ++++- scripts/scale/finalize_batch.py | 5 ++++- scripts/scale/scale_utils.py | 3 +-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/scripts/scale/create_and_setup_project.py b/scripts/scale/create_and_setup_project.py index c44a380a9ee..cddcc0d3f32 100644 --- a/scripts/scale/create_and_setup_project.py +++ b/scripts/scale/create_and_setup_project.py @@ -5,9 +5,12 @@ parser = argparse.ArgumentParser() parser.add_argument("--project_name", type=str, help="Name of the project to create") +parser.add_argument( + "--credentials_path", type=str, default="prod_env/credentials.conf", help="Path to the credentials file" +) args = parser.parse_args() project_name = args.project_name -client = get_scale_client() +client = get_scale_client(args.credentials_path) print("\nGetting project...") try: diff --git a/scripts/scale/finalize_batch.py b/scripts/scale/finalize_batch.py index 56c4179cd47..b17b4e32ae2 100644 --- a/scripts/scale/finalize_batch.py +++ b/scripts/scale/finalize_batch.py @@ -3,7 +3,10 @@ parser = argparse.ArgumentParser() parser.add_argument("--batch_name", type=str, help="Name of the batch to finalize") +parser.add_argument( + "--credentials_path", type=str, default="prod_env/credentials.conf", help="Path to the credentials file" +) args = parser.parse_args() -client = get_scale_client() +client = get_scale_client(args.credentials_path) client.finalize_batch(args.batch_name) diff --git a/scripts/scale/scale_utils.py b/scripts/scale/scale_utils.py index e12ac8ed597..6bf2655de69 100644 --- a/scripts/scale/scale_utils.py +++ b/scripts/scale/scale_utils.py @@ -16,8 +16,7 @@ def get_credentials(path: str) -> Dict[str, str]: return credentials -def get_scale_client() -> ScaleClient: - relative_credentials_path = "../../prod_env/credentials.conf" +def get_scale_client(relative_credentials_path: str) -> ScaleClient: credentials_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), relative_credentials_path) print(f"Reading credentials from {credentials_path}") credentials = get_credentials(credentials_path) From 093e5d44dfb47cfa29c815e873fe19589bca52ad Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Tue, 30 May 2023 11:35:48 -0700 Subject: [PATCH 23/25] Fix flake --- src/helm/proxy/clients/critique_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index de06f43e659..31497c338d6 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -3,7 +3,7 @@ import json import random import threading -from typing import Dict, List, Union, Tuple, Optional +from typing import Dict, List, Union, Optional # Surge AI from cattrs import unstructure From 8be04df16b99ca6974e358270a4d99b0a0f27e79 Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Tue, 30 May 2023 17:54:17 -0700 Subject: [PATCH 24/25] Make batch name mandatory --- src/helm/proxy/clients/auto_client.py | 2 ++ src/helm/proxy/clients/critique_client.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/auto_client.py b/src/helm/proxy/clients/auto_client.py index e71469a7837..86073816904 100644 --- a/src/helm/proxy/clients/auto_client.py +++ b/src/helm/proxy/clients/auto_client.py @@ -267,6 +267,8 @@ def get_critique_client(self) -> CritiqueClient: elif critique_type == "scale": scale_credentials = self.credentials.get("scaleApiKey") scale_batch = self.credentials.get("scaleBatch", None) + if scale_batch is None: + raise ValueError("scaleBatch is required for ScaleCritiqueClient for now.") if not scale_credentials: raise ValueError("scaleApiKey credentials are required for ScaleCritiqueClient") self.critique_client = ScaleCritiqueClient( diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index 31497c338d6..f07d4765e24 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -247,7 +247,7 @@ def __init__( self, api_key: str, cache_config: CacheConfig, - batch_name: Optional[str] = None, + batch_name: str, ): self._cache = Cache(cache_config) self._client = scaleapi.ScaleClient(api_key) From 72c79c58e68fa453e3e6d12bdef474b6d50829ae Mon Sep 17 00:00:00 2001 From: JosselinSomervilleRoberts Date: Tue, 30 May 2023 17:55:20 -0700 Subject: [PATCH 25/25] Fix flake --- src/helm/proxy/clients/critique_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helm/proxy/clients/critique_client.py b/src/helm/proxy/clients/critique_client.py index f07d4765e24..7238877e907 100644 --- a/src/helm/proxy/clients/critique_client.py +++ b/src/helm/proxy/clients/critique_client.py @@ -3,7 +3,7 @@ import json import random import threading -from typing import Dict, List, Union, Optional +from typing import Dict, List, Union # Surge AI from cattrs import unstructure