# evaluator.py
import inspect
import json
import os
import warnings
from typing import List

from bigcode_eval import tasks
from bigcode_eval.generation import parallel_generations
_WARNING = """
################################################################################
!!!WARNING!!!
################################################################################
The "code_eval"/"apps_metric" metrics you are about to use execute untrusted
model-generated code in Python.
Although it is highly unlikely that model-generated code will do something
overtly malicious in response to this test suite, model-generated code may act
destructively due to a lack of model capability or alignment.
Users are strongly encouraged to sandbox this evaluation suite so that it
does not perform destructive actions on their host or network. For more
information on how OpenAI sandboxes its code, see the paper "Evaluating Large
Language Models Trained on Code" (https://arxiv.org/abs/2107.03374).
Once you have read this disclaimer and taken appropriate precautions, set the argument
"allow_code_execution" to True.
################################################################################\
"""


class Evaluator:
    def __init__(self, accelerator, model, tokenizer, args):
        self.accelerator = accelerator
        self.model = model
        self.tokenizer = tokenizer
        self.args = args

        # setup arguments
        self.metric_output_path = args.metric_output_path

        # code evaluation permission
        self.allow_code_execution = args.allow_code_execution

    def generate_text(self, task_name, intermediate_generations=None):
        task = tasks.get_task(task_name, self.args)
        dataset = task.get_dataset()
        # if args.limit is None, use all samples
        # if args.limit is used, make sure args.limit_start + args.limit <= len(dataset)
        n_tasks = min(self.args.limit, len(dataset) - self.args.limit_start) if self.args.limit else len(dataset)
        # when args.limit is None
        # adjust n_tasks by args.limit_start to prevent out of bounds issues
        if not self.args.limit:
            n_tasks -= self.args.limit_start
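        # Illustrative values: with limit_start=10 and limit=50 on a 200-sample
        # dataset, the evaluated window is samples 10..59 (n_tasks=50); with
        # limit=None the window is samples 10..199 (n_tasks=190).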
        references = [task.get_reference(dataset[i]) for i in range(self.args.limit_start, self.args.limit_start + n_tasks)]

        if self.args.check_references:
            if "get_solution" in inspect.signature(task.get_reference).parameters:
                solutions = [[task.get_reference(dataset[i], get_solution=True)] for i in range(self.args.limit_start, self.args.limit_start + n_tasks)]
            else:
                solutions = [[ref] for ref in references]
            return solutions, references
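
        # Resume support: any generations passed in via intermediate_generations
        # are reused as-is, and only the remaining tasks are generated below.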
        curr_generations = []  # list[list[str | None] | None]
        if intermediate_generations:
            curr_generations = [gen for gen in intermediate_generations if gen]
            n_tasks -= len(curr_generations)
        intermediate_save_generations_path = f"{os.path.splitext(self.args.save_generations_path)[0]}_{task_name}_intermediate.json"
        curr_sample_idx = len(curr_generations)

        generations = parallel_generations(
            task,
            dataset,
            self.accelerator,
            self.model,
            self.tokenizer,
            n_tasks=n_tasks,
            args=self.args,
            curr_sample_idx=curr_sample_idx,  # curr_sample_idx will be added to limit_start to fix indexing
            save_every_k_tasks=self.args.save_every_k_tasks,
            intermediate_generations=curr_generations,
            intermediate_save_generations_path=intermediate_save_generations_path,
        )

        if len(generations[0]) > self.args.n_samples:
            generations = [l[: self.args.n_samples] for l in generations]
            warnings.warn(
                f"Number of tasks wasn't proportional to the number of devices; extra predictions were removed to keep only n_samples={self.args.n_samples}"
            )
        return generations, references

    def evaluate(self, task_name, intermediate_generations=None):
        task = tasks.get_task(task_name, self.args)
        if task.requires_execution and not self.allow_code_execution:
            raise ValueError(_WARNING)

        generations, references = self.generate_text(task_name, intermediate_generations=intermediate_generations)

        if self.accelerator.is_main_process:
            if not self.args.load_generations_path:
                save_generations_path = f"{os.path.splitext(self.args.save_generations_path)[0]}_{task_name}.json"
                self.save_json_files(generations, references, save_generations_path, f"references_{task_name}.json")

            # make sure tokenizer plays nice with multiprocessing
            os.environ["TOKENIZERS_PARALLELISM"] = "false"
            if self.allow_code_execution and task.requires_execution:
                os.environ["HF_ALLOW_CODE_EVAL"] = "1"
            print("Evaluating generations...")
            results = task.process_results(generations, references)
            return results

    def save_json_files(
        self,
        generations: List[List[str]],
        references: List[str],
        save_generations_path: str,
        save_references_path: str,
    ) -> None:
        if self.args.save_generations:
            with open(save_generations_path, "w") as fp:
                json.dump(generations, fp)
                print(f"generations were saved at {save_generations_path}")
        if self.args.save_references:
            with open(save_references_path, "w") as fp:
                json.dump(references, fp)
                print(f"references were saved at {save_references_path}")
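

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original module: one plausible way
# to construct Evaluator with an argparse-style namespace. Only the attributes
# read directly by Evaluator above are filled in; the checkpoint name, the
# argument values, and the "humaneval" task id are assumptions, and a real run
# also needs the generation arguments consumed by tasks.get_task and
# parallel_generations (batch size, max generation length, sampling settings).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from argparse import Namespace

    from accelerate import Accelerator
    from transformers import AutoModelForCausalLM, AutoTokenizer

    checkpoint = "bigcode/santacoder"  # placeholder model id
    args = Namespace(
        metric_output_path="evaluation_results.json",
        allow_code_execution=False,  # keep False unless running in a sandbox
        limit=None,
        limit_start=0,
        n_samples=1,
        check_references=False,
        save_generations=True,
        save_generations_path="generations.json",
        save_references=False,
        save_every_k_tasks=-1,
        load_generations_path=None,
    )

    accelerator = Accelerator()
    model = AutoModelForCausalLM.from_pretrained(checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    evaluator = Evaluator(accelerator, model, tokenizer, args)

    # With the full argument namespace in place, a run would look like:
    #     generations, references = evaluator.generate_text("humaneval")
    #     results = evaluator.evaluate("humaneval")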