Skip to content

Commit

Permalink
Update Annotation to English, And Update Operator.json
Browse files Browse the repository at this point in the history
  • Loading branch information
didiforgithub committed Oct 18, 2024
1 parent d99054a commit 2b788b2
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 52 deletions.
26 changes: 26 additions & 0 deletions examples/aflow/data/operator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str]) -> dict with key 'response' of type str"
},
"AnswerGenerate": {
"description": "Generate step by step based on the input. The step by step thought process is in the field of 'thought', and the final answer is in the field of 'answer'.",
"interface": "answer_generate(input: str) -> dict with key 'thought' of type str, 'answer' of type str"
},
"CustomCodeGenerate": {
"description": "Generates code based on customized input and instruction.",
"interface": "custom_code_generate(problem: str, entry_point: str, instruction: str) -> dict with key 'response' of type str"
},
"Test": {
"description": "Tests the solution using public test cases. If the solution fails, it reflects on the errors and attempts to modify the solution. Returns True and the solution if all tests pass after modifications. Returns False and the current solution if it still fails after modifications.",
"interface": "test(problem: str, solution: str, entry_point: str) -> dict with key 'result' of type bool and key 'solution' of type str"
},
"Programmer": {
"description": "Automatically writes, executes Python code, and returns the solution based on the provided problem description and analysis. The `output` only contains the final answer. If you want to see the detailed solution process, it's recommended to retrieve the `code`.",
"interface": "programmer(problem: str, analysis: str = 'None') -> dict with keys 'code' and 'output' of type str"
}
}
24 changes: 12 additions & 12 deletions examples/aflow/scripts/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,26 +309,26 @@ def __init__(self, llm: LLM, name: str = "Programmer"):
async def exec_code(code, timeout=180):
def run_code():
try:
# 创建一个新的全局命名空间
# Create a new global namespace
global_namespace = {}

# 使用exec执行代码
# Use exec to execute the code
exec(code, global_namespace)

# 假设代码中定义了一个名为'solve'的函数
# Assume the code defines a function named 'solve'
if 'solve' in global_namespace:
result = global_namespace['solve']()
return "Success", str(result)
else:
return "Error", "未找到'solve'函数"
return "Error", "Function 'solve' not found"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"执行错误: {str(e)}\n{''.join(tb_str)}"
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"

# 创建一个事件来标记任务完成
# Create an event to mark task completion
done_event = threading.Event()
result = ["Error", "执行无结果,子进程异常"]
result = ["Error", "Execution resulted in no output, subprocess exception"]

def wrapper():
nonlocal result
Expand All @@ -338,15 +338,15 @@ def wrapper():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wrapper)
try:
# 等待任务完成或超时
# Wait for task completion or timeout
if done_event.wait(timeout=timeout):
return result
else:
# 超时,尝试取消任务
# Timeout, attempt to cancel the task
future.cancel()
return "Error", "代码执行超时"
return "Error", "Code execution timed out"
finally:
# 确保线程池被正确关闭
# Ensure the thread pool is properly shut down
executor.shutdown(wait=False)

async def code_generate(self, problem, analysis, feedback, mode):
Expand All @@ -369,6 +369,6 @@ async def __call__(self, problem: str, analysis: str = "None"):
if status == "Success":
return {"code": code, "output": output}
else:
print(f"{i + 1}次执行错误,错误信息:{output}")
print(f"Execution error in attempt {i + 1}, error message: {output}")
feedback = f"\nThe result of the error from the code you wrote in the previous round:\nCode:{code}\n\nStatus:{status},{output}"
return {"code": code, "output": "error"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {problem}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""

PYTHON_CODE_VERIFIER_PROMPT = """
You are a professional Python programmer. Your task is to write complete, self-contained code based on a given mathematical problem and output the answer. The code should include all necessary imports and dependencies, and be ready to run without additional setup or environment configuration.
Problem description: {problem}
Other analysis: {analysis}
{feedback}
Your code should:
1. Implement the calculation steps described in the problem.
2. Define a function named `solve` that performs the calculation and returns the result. The `solve` function should not require any input parameters; instead, it should obtain all necessary inputs from within the function or from globally defined variables.
3. `solve` function return the final calculation result.
Please ensure your code is efficient, well-commented, and follows Python best practices. The output should be limited to basic data types such as strings, integers, and floats. It is prohibited to transmit images or other file formats. The code output is intended for a text-based language model.
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str], problem: str) -> dict with key 'response' of type str"
},
"Programmer": {
"description": "Automatically writes, executes Python code, and returns the solution based on the provided problem description and analysis. The `output` only contains the final answer. If you want to see the detailed solution process, it's recommended to retrieve the `code`.",
"interface": "programmer(problem: str, analysis: str = 'None') -> dict with keys 'code' and 'output' of type str"
}
}
163 changes: 163 additions & 0 deletions examples/aflow/scripts/optimized/GSM8K/workflows/template/operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import concurrent
import sys
import traceback
from typing import List

from tenacity import retry, stop_after_attempt, wait_fixed

from examples.aflow.scripts.optimized.GSM8K.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.GSM8K.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
import asyncio
import logging

class Operator:
def __init__(self, name, llm: LLM):
self.name = name
self.llm = llm

def __call__(self, *args, **kwargs):
raise NotImplementedError


class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)

async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
return response


def run_code(code):
try:
# 创建一个新的全局命名空间
global_namespace = {}

disallowed_imports = [
"os", "sys", "subprocess", "multiprocessing",
"matplotlib", "seaborn", "plotly", "bokeh", "ggplot",
"pylab", "tkinter", "PyQt5", "wx", "pyglet"
]

# 检查禁止导入的库
for lib in disallowed_imports:
if f"import {lib}" in code or f"from {lib}" in code:
logging.warning("检测到禁止导入的库: %s", lib)
return "Error", f"禁止导入的库: {lib} 以及绘图类功能"

# 使用 exec 执行代码
exec(code, global_namespace)
# 假设代码中定义了一个名为 'solve' 的函数
if 'solve' in global_namespace and callable(global_namespace['solve']):
result = global_namespace['solve']()
return "Success", str(result)
else:
return "Error", "未找到 'solve' 函数"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"执行错误: {str(e)}\n{''.join(tb_str)}"


class Programmer(Operator):
def __init__(self, llm: LLM, name: str = "Programmer"):
super().__init__(name, llm)

async def exec_code(self, code, timeout=30):
"""
异步执行代码,并在超时时返回错误。
"""
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
# 提交 run_code 任务到进程池
future = loop.run_in_executor(executor, run_code, code)
# 等待任务完成或超时
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
# 超时,尝试关闭进程池
executor.shutdown(wait=False, cancel_futures=True)
return "Error", "代码执行超时"
except Exception as e:
return "Error", f"未知错误: {str(e)}"

async def code_generate(self, problem, analysis, feedback, mode):
"""
生成代码的异步方法。
"""
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
problem=problem,
analysis=analysis,
feedback=feedback
)
fill_kwargs = {
"context": prompt,
"llm": self.llm,
"function_name": "solve"
}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(CodeGenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
return response

@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def __call__(self, problem: str, analysis: str = "None"):
"""
调用方法,生成代码并执行,最多重试 3 次。
"""
code = None
output = None
feedback = ""
for i in range(3):
code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
code = code_response.get("code")
if not code:
return {"code": code, "output": "未生成代码"}
status, output = await self.exec_code(code)
if status == "Success":
return {"code": code, "output": output}
else:
print(f"第{i + 1}次执行错误,错误信息:{output}")
feedback = (
f"\nThe result of the error from the code you wrote in the previous round:\n"
f"Code: {code}\n\nStatus: {status}, {output}"
)
return {"code": code, "output": output}


class ScEnsemble(Operator):
"""
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
Link: https://arxiv.org/abs/2203.11171
Paper: Universal Self-Consistency for Large Language Model Generation
Link: https://arxiv.org/abs/2311.17311
"""

def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)

async def __call__(self, solutions: List[str], problem: str):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"

prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, problem=problem)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()

answer = response.get("solution_letter", "")
answer = answer.strip().upper()

return {"response": solutions[answer_mapping[answer]]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator

from pydantic import BaseModel, Field


class GenerateOp(BaseModel):
response: str = Field(default="", description="Your solution for this problem")


class CodeGenerateOp(BaseModel):
code: str = Field(default="", description="Your complete code solution for this problem")


class ScEnsembleOp(BaseModel):
solution_letter: str = Field(default="", description="The letter of most consistent solution.")

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

from tenacity import retry, stop_after_attempt, wait_fixed

from examples.aflow.scripts.optimized.GSM8K.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.GSM8K.workflows.template.op_prompt import *
from examples.aflow.scripts.optimized.MATH.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.MATH.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
import asyncio
Expand Down
38 changes: 18 additions & 20 deletions examples/aflow/scripts/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from examples.aflow.scripts.optimizer_utils.convergence_utils import ConvergenceUtils

DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
QuestionType = Literal["math", "code", "quiz"]
QuestionType = Literal["math", "code", "qa"]
OptimizerType = Literal["Graph", "Test"]


Expand Down Expand Up @@ -82,25 +82,23 @@ def optimize(self, mode: OptimizerType = "Graph"):
retry_count = 0
max_retries = 1

score = loop.run_until_complete(self._optimize_graph())

# while retry_count < max_retries:
# try:
# score = loop.run_until_complete(self._optimize_graph())
# break
# except Exception as e:
# retry_count += 1
# print(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
# if retry_count == max_retries:
# print("Max retries reached. Moving to next round.")
# score = None

# wait_time = 5 * retry_count
# time.sleep(wait_time)

# if retry_count < max_retries:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
while retry_count < max_retries:
try:
score = loop.run_until_complete(self._optimize_graph())
break
except Exception as e:
retry_count += 1
print(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
if retry_count == max_retries:
print("Max retries reached. Moving to next round.")
score = None

wait_time = 5 * retry_count
time.sleep(wait_time)

if retry_count < max_retries:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.round += 1
print(f"Score for round {self.round}: {score}")

Expand Down
Loading

0 comments on commit 2b788b2

Please sign in to comment.