Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 工作流表单节点 #1608

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions apps/application/flow/i_step_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
@desc:
"""
import time
import uuid
from abc import abstractmethod
from typing import Type, Dict, List

Expand All @@ -31,7 +32,7 @@ def write_context(step_variable: Dict, global_variable: Dict, node, workflow):
if workflow.is_result(node, NodeResult(step_variable, global_variable)) and 'answer' in step_variable:
answer = step_variable['answer']
yield answer
workflow.answer += answer
workflow.append_answer(answer)
if global_variable is not None:
for key in global_variable:
workflow.context[key] = global_variable[key]
Expand All @@ -54,15 +55,27 @@ def handler(self, chat_id,
'message_tokens' in row and row.get('message_tokens') is not None])
answer_tokens = sum([row.get('answer_tokens') for row in details.values() if
'answer_tokens' in row and row.get('answer_tokens') is not None])
chat_record = ChatRecord(id=chat_record_id,
chat_id=chat_id,
problem_text=question,
answer_text=answer,
details=details,
message_tokens=message_tokens,
answer_tokens=answer_tokens,
run_time=time.time() - workflow.context['start_time'],
index=0)
answer_text_list = workflow.get_answer_text_list()
answer_text = '\n\n'.join(answer_text_list)
if workflow.chat_record is not None:
chat_record = workflow.chat_record
chat_record.answer_text = answer_text
chat_record.details = details
chat_record.message_tokens = message_tokens
chat_record.answer_tokens = answer_tokens
chat_record.answer_text_list = answer_text_list
chat_record.run_time = time.time() - workflow.context['start_time']
else:
chat_record = ChatRecord(id=chat_record_id,
chat_id=chat_id,
problem_text=question,
answer_text=answer_text,
details=details,
message_tokens=message_tokens,
answer_tokens=answer_tokens,
answer_text_list=answer_text_list,
run_time=time.time() - workflow.context['start_time'],
index=0)
self.chat_info.append_chat_record(chat_record, self.client_id)
# 重新设置缓存
chat_cache.set(chat_id,
Expand Down Expand Up @@ -118,7 +131,15 @@ class FlowParamsSerializer(serializers.Serializer):


class INode:
def __init__(self, node, workflow_params, workflow_manage):

@abstractmethod
def save_context(self, details, workflow_manage):
pass

def get_answer_text(self):
return self.answer_text

def __init__(self, node, workflow_params, workflow_manage, runtime_node_id=None):
# 当前步骤上下文,用于存储当前步骤信息
self.status = 200
self.err_message = ''
Expand All @@ -129,7 +150,12 @@ def __init__(self, node, workflow_params, workflow_manage):
self.node_params_serializer = None
self.flow_params_serializer = None
self.context = {}
self.answer_text = None
self.id = node.id
if runtime_node_id is None:
self.runtime_node_id = str(uuid.uuid1())
else:
self.runtime_node_id = runtime_node_id

def valid_args(self, node_params, flow_params):
flow_params_serializer_class = self.get_flow_params_serializer_class()
Expand Down
14 changes: 9 additions & 5 deletions apps/application/flow/step_node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,23 @@
from .ai_chat_step_node import *
from .application_node import BaseApplicationNode
from .condition_node import *
from .question_node import *
from .search_dataset_node import *
from .start_node import *
from .direct_reply_node import *
from .form_node import *
from .function_lib_node import *
from .function_node import *
from .question_node import *
from .reranker_node import *

from .document_extract_node import *
from .image_understand_step_node import *

from .search_dataset_node import *
from .start_node import *

node_list = [BaseStartStepNode, BaseChatNode, BaseSearchDatasetNode, BaseQuestionNode, BaseConditionNode, BaseReplyNode,
BaseFunctionNodeNode, BaseFunctionLibNodeNode, BaseRerankerNode, BaseApplicationNode, BaseDocumentExtractNode,
BaseImageUnderstandNode]
BaseFunctionNodeNode, BaseFunctionLibNodeNode, BaseRerankerNode, BaseApplicationNode,
BaseDocumentExtractNode,
BaseImageUnderstandNode, BaseFormNode]


def get_node(node_type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wo
node.context['question'] = node_variable['question']
node.context['run_time'] = time.time() - node.context['start_time']
if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
workflow.answer += answer
node.answer_text = answer


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
Expand Down Expand Up @@ -73,6 +73,11 @@ def get_default_model_params_setting(model_id):


class BaseChatNode(IChatNode):
def save_context(self, details, workflow_manage):
self.context['answer'] = details.get('answer')
self.context['question'] = details.get('question')
self.answer_text = details.get('answer')

def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream, chat_id, chat_record_id,
model_params_setting=None,
**kwargs) -> NodeResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wo
node.context['question'] = node_variable['question']
node.context['run_time'] = time.time() - node.context['start_time']
if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
workflow.answer += answer
node.answer_text = answer


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
Expand Down Expand Up @@ -64,6 +64,12 @@ def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wor

class BaseApplicationNode(IApplicationNode):

def save_context(self, details, workflow_manage):
self.context['answer'] = details.get('answer')
self.context['question'] = details.get('question')
self.context['type'] = details.get('type')
self.answer_text = details.get('answer')

def execute(self, application_id, message, chat_id, chat_record_id, stream, re_chat, client_id, client_type,
**kwargs) -> NodeResult:
from application.serializers.chat_message_serializers import ChatMessageSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@


class BaseConditionNode(IConditionNode):
def save_context(self, details, workflow_manage):
self.context['branch_id'] = details.get('branch_id')
self.context['branch_name'] = details.get('branch_name')

def execute(self, **kwargs) -> NodeResult:
branch_list = self.node_params_serializer.data['branch']
branch = self._execute(branch_list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@


class BaseReplyNode(IReplyNode):
def save_context(self, details, workflow_manage):
self.context['answer'] = details.get('answer')
self.answer_text = details.get('answer')
def execute(self, reply_type, stream, fields=None, content=None, **kwargs) -> NodeResult:
if reply_type == 'referencing':
result = self.get_reference_content(fields)
Expand Down
9 changes: 9 additions & 0 deletions apps/application/flow/step_node/form_node/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: __init__.py.py
@date:2024/11/4 14:48
@desc:
"""
from .impl import *
32 changes: 32 additions & 0 deletions apps/application/flow/step_node/form_node/i_form_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: i_form_node.py
@date:2024/11/4 14:48
@desc:
"""
from typing import Type

from rest_framework import serializers

from application.flow.i_step_node import INode, NodeResult
from common.util.field_message import ErrMessage


class FormNodeParamsSerializer(serializers.Serializer):
form_field_list = serializers.ListField(required=True, error_messages=ErrMessage.list("表单配置"))
form_content_format = serializers.CharField(required=True, error_messages=ErrMessage.char('表单输出内容'))


class IFormNode(INode):
type = 'form-node'

def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
return FormNodeParamsSerializer

def _run(self):
return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

def execute(self, form_field_list, form_content_format, **kwargs) -> NodeResult:
pass
9 changes: 9 additions & 0 deletions apps/application/flow/step_node/form_node/impl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: __init__.py.py
@date:2024/11/4 14:49
@desc:
"""
from .base_form_node import BaseFormNode
84 changes: 84 additions & 0 deletions apps/application/flow/step_node/form_node/impl/base_form_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: base_form_node.py
@date:2024/11/4 14:52
@desc:
"""
import json
import time
from typing import Dict

from langchain_core.prompts import PromptTemplate

from application.flow.i_step_node import NodeResult
from application.flow.step_node.form_node.i_form_node import IFormNode


def write_context(step_variable: Dict, global_variable: Dict, node, workflow):
if step_variable is not None:
for key in step_variable:
node.context[key] = step_variable[key]
if workflow.is_result(node, NodeResult(step_variable, global_variable)) and 'result' in step_variable:
result = step_variable['result']
yield result
node.answer_text = result
node.context['run_time'] = time.time() - node.context['start_time']


class BaseFormNode(IFormNode):
def save_context(self, details, workflow_manage):
self.context['result'] = details.get('result')
self.context['form_content_format'] = details.get('form_content_format')
self.context['form_field_list'] = details.get('form_field_list')
self.context['run_time'] = details.get('run_time')
self.context['start_time'] = details.get('start_time')
self.answer_text = details.get('result')

def execute(self, form_field_list, form_content_format, **kwargs) -> NodeResult:
form_setting = {"form_field_list": form_field_list, "runtime_node_id": self.runtime_node_id,
"chat_record_id": self.flow_params_serializer.data.get("chat_record_id"),
"is_submit": self.context.get("is_submit", False)}
form = f'<form_rander>{json.dumps(form_setting)}</form_rander>'
prompt_template = PromptTemplate.from_template(form_content_format, template_format='jinja2')
value = prompt_template.format(form=form)
return NodeResult(
{'result': value, 'form_field_list': form_field_list, 'form_content_format': form_content_format}, {},
_write_context=write_context)

def get_answer_text(self):
form_content_format = self.context.get('form_content_format')
form_field_list = self.context.get('form_field_list')
form_setting = {"form_field_list": form_field_list, "runtime_node_id": self.runtime_node_id,
"chat_record_id": self.flow_params_serializer.data.get("chat_record_id"),
'form_data': self.context.get('form_data', {}),
"is_submit": self.context.get("is_submit", False)}
form = f'<form_rander>{json.dumps(form_setting)}</form_rander>'
prompt_template = PromptTemplate.from_template(form_content_format, template_format='jinja2')
value = prompt_template.format(form=form)
return value

def get_details(self, index: int, **kwargs):
form_content_format = self.context.get('form_content_format')
form_field_list = self.context.get('form_field_list')
form_setting = {"form_field_list": form_field_list, "runtime_node_id": self.runtime_node_id,
"chat_record_id": self.flow_params_serializer.data.get("chat_record_id"),
'form_data': self.context.get('form_data', {}),
"is_submit": self.context.get("is_submit", False)}
form = f'<form_rander>{json.dumps(form_setting)}</form_rander>'
prompt_template = PromptTemplate.from_template(form_content_format, template_format='jinja2')
value = prompt_template.format(form=form)
return {
'name': self.node.properties.get('stepName'),
"index": index,
"result": value,
"form_content_format": self.context.get('form_content_format'),
"form_field_list": self.context.get('form_field_list'),
'form_data': self.context.get('form_data'),
'start_time': self.context.get('start_time'),
'run_time': self.context.get('run_time'),
'type': self.node.type,
'status': self.status,
'err_message': self.err_message
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个类和方法没有明显的错误或规范性问题;然而在业务层面上,请确保参数化设置与你的实际应用一致。

project_name: MyProject
author: MyUserName
created_at: 2024/11/6 17:39

base_form_node:

Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def convert_value(name: str, value, _type, is_required, source, node):


class BaseFunctionLibNodeNode(IFunctionLibNode):
def save_context(self, details, workflow_manage):
self.context['result'] = details.get('result')
self.answer_text = details.get('result')
def execute(self, function_lib_id, input_field_list, **kwargs) -> NodeResult:
function_lib = QuerySet(FunctionLib).filter(id=function_lib_id).first()
if not function_lib.is_active:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ def convert_value(name: str, value, _type, is_required, source, node):


class BaseFunctionNodeNode(IFunctionNode):
def save_context(self, details, workflow_manage):
self.context['result'] = details.get('result')
self.answer_text = details.get('result')

def execute(self, input_field_list, code, **kwargs) -> NodeResult:
params = {field.get('name'): convert_value(field.get('name'), field.get('value'), field.get('type'),
field.get('is_required'), field.get('source'), self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wor


class BaseImageUnderstandNode(IImageUnderstandNode):
def save_context(self, details, workflow_manage):
self.context['answer'] = details.get('answer')
self.context['question'] = details.get('question')
self.answer_text = details.get('answer')

def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream, chat_id, chat_record_id,
image,
**kwargs) -> NodeResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wo
node.context['question'] = node_variable['question']
node.context['run_time'] = time.time() - node.context['start_time']
if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
workflow.answer += answer
node.answer_text = answer


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
Expand Down Expand Up @@ -73,6 +73,14 @@ def get_default_model_params_setting(model_id):


class BaseQuestionNode(IQuestionNode):
def save_context(self, details, workflow_manage):
self.context['run_time'] = details.get('run_time')
self.context['question'] = details.get('question')
self.context['answer'] = details.get('answer')
self.context['message_tokens'] = details.get('message_tokens')
self.context['answer_tokens'] = details.get('answer_tokens')
self.answer_text = details.get('answer')

def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream, chat_id, chat_record_id,
model_params_setting=None,
**kwargs) -> NodeResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def filter_result(document_list: List[Document], max_paragraph_char_number, top_


class BaseRerankerNode(IRerankerNode):
def save_context(self, details, workflow_manage):
self.context['document_list'] = details.get('document_list', [])
self.context['question'] = details.get('question')
self.context['run_time'] = details.get('run_time')
self.context['result_list'] = details.get('result_list')
self.context['result'] = details.get('result')

def execute(self, question, reranker_setting, reranker_list, reranker_model_id,
**kwargs) -> NodeResult:
documents = merge_reranker_list(reranker_list)
Expand Down
Loading
Loading