Skip to content

Commit

Permalink
add 知识库Milvus自动配置 (#36)
Browse files Browse the repository at this point in the history
1. 知识库更新
2. 默认配置更新
3. cicd 私有库更新
  • Loading branch information
yaojin3616 committed Sep 17, 2023
2 parents 45592d4 + cd0e4a7 commit fd24747
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 55 deletions.
30 changes: 21 additions & 9 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,29 @@ jobs:
cd ./src/bisheng-langchain
python setup.py bdist_wheel
repo="http://110.16.193.170:50083/repository/pypi-hosted/"
twine upload --verbose -u ${{ secrets.NEXUS_USER }} -p ${{ secrets.NEXUS_PASSWORD }} --repository-url $repo dist/*.whl
# 发布到
- name: Login
twine upload --verbose -u ${{ secrets.NEXUS_USER }} -p ${{ secrets.NEXUS_PASSWORD }} --repository-url $repo dist/*.whl
# 发布到 私有仓库
- name: set insecure registry
run: |
echo "{ \"insecure-registries\": [\"http://110.16.193.170:50083\"] }" | sudo tee /etc/docker/daemon.json
sudo service docker restart
- name: Set up QEMU
uses: docker/setup-qemu-action@v1

# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v1
# with:
# config-inline: |
# [registry."http://110.16.193.170:50083"]
# http = true
# insecure = true
- name: Login Nexus Container Registry
uses: docker/login-action@v2
with:
# GitHub Repo => Settings => Secrets 增加 docker hub 登录密钥信息
# DOCKERHUB_USERNAME 是 docker hub 账号名.
# DOCKERHUB_TOKEN: docker hub => Account Setting => Security 创建.
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
registry: http://110.16.193.170:50083
username: ${{ secrets.NEXUS_PUBLIC }}
password: ${{ secrets.NEXUS_PUBLIC_PASSWORD }}
# 构建 backend 并推送到 Docker hub
- name: Build backend and push
id: docker_build_backend
Expand Down
5 changes: 5 additions & 0 deletions docker/bisheng/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ admin:
user_name: "admin"
password: "1234"

# bisheng-rt服务地址
bisheng-rt:
name: "RT-Server"
server: "127.0.0.1:9001"

# 为知识库的embedding进行模型配置
knowledges:
embeddings:
Expand Down
2 changes: 1 addition & 1 deletion src/backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN python -m pip install --upgrade pip && \
RUN poetry config virtualenvs.create false
RUN poetry install --no-interaction --no-ansi --without dev

CMD ["uvicorn", "bisheng.main:app", "--workers", "20", "--host", "0.0.0.0", "--port", "7860"]
CMD ["uvicorn", "bisheng.main:app", "--workers", "2", "--host", "0.0.0.0", "--port", "7860"]
24 changes: 18 additions & 6 deletions src/backend/bisheng/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def remove_api_keys(flow: dict):
node_data = node.get('data').get('node')
template = node_data.get('template')
for value in template.values():
if (isinstance(value, dict) and has_api_terms(value['name']) and value.get('password')):
if (isinstance(value, dict) and has_api_terms(value['name']) and
value.get('password')):
value['value'] = None

return flow
Expand Down Expand Up @@ -76,7 +77,9 @@ def build_flow(graph_data: dict, artifacts, process_file=False, flow_id=None, ch
# 如果存在文件,当前不操作文件,避免重复操作
if not process_file:
template_dict = {
key: value for key, value in vertex.data['node']['template'].items() if isinstance(value, dict)
key: value
for key, value in vertex.data['node']['template'].items()
if isinstance(value, dict)
}
for key, value in template_dict.items():
if value.get('type') == 'file':
Expand All @@ -93,7 +96,8 @@ def build_flow(graph_data: dict, artifacts, process_file=False, flow_id=None, ch
vertex.build()
params = vertex._built_object_repr()
valid = True
logger.debug(f"Building node {str(params)[:50]}{'...' if len(str(params)) > 50 else ''}")
logger.debug(
f"Building node {str(params)[:50]}{'...' if len(str(params)) > 50 else ''}")
if vertex.artifacts:
# The artifacts will be prompt variables
# passed to build_input_keys_response
Expand Down Expand Up @@ -121,7 +125,11 @@ def build_flow(graph_data: dict, artifacts, process_file=False, flow_id=None, ch
return graph


def build_flow_no_yield(graph_data: dict, artifacts, process_file=False, flow_id=None, chat_id=None):
def build_flow_no_yield(graph_data: dict,
artifacts,
process_file=False,
flow_id=None,
chat_id=None):
try:
# Some error could happen when building the graph
graph = Graph.from_payload(graph_data)
Expand All @@ -134,7 +142,9 @@ def build_flow_no_yield(graph_data: dict, artifacts, process_file=False, flow_id
# 如果存在文件,当前不操作文件,避免重复操作
if not process_file:
template_dict = {
key: value for key, value in vertex.data['node']['template'].items() if isinstance(value, dict)
key: value
for key, value in vertex.data['node']['template'].items()
if isinstance(value, dict)
}
for key, value in template_dict.items():
if value.get('type') == 'file':
Expand All @@ -147,10 +157,12 @@ def build_flow_no_yield(graph_data: dict, artifacts, process_file=False, flow_id
if vertex.base_type == 'vectorstores':
if 'collection_name' in vertex.params and not vertex.params.get('collection_name'):
vertex.params['collection_name'] = f'tmp_{flow_id}_{chat_id}'
logger.info(f"rename_vector_col col={vertex.params['collection_name']}")

vertex.build()
params = vertex._built_object_repr()
logger.debug(f"Building node {str(params)[:50]}{'...' if len(str(params)) > 50 else ''}")
logger.debug(
f"Building node {str(params)[:50]}{'...' if len(str(params)) > 50 else ''}")
if vertex.artifacts:
# The artifacts will be prompt variables
# passed to build_input_keys_response
Expand Down
33 changes: 19 additions & 14 deletions src/backend/bisheng/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
from typing import List, Optional
from uuid import UUID

from bisheng.api.utils import (build_flow, build_flow_no_yield,
build_input_keys_response)
from bisheng.api.v1.schemas import (BuildStatus, BuiltResponse, ChatList,
InitResponse, StreamData)
from bisheng.api.utils import build_flow, build_flow_no_yield, build_input_keys_response
from bisheng.api.v1.schemas import BuildStatus, BuiltResponse, ChatList, InitResponse, StreamData
from bisheng.cache.redis import redis_client
from bisheng.chat.manager import ChatManager
from bisheng.database.base import get_session
from bisheng.database.models.flow import Flow
from bisheng.database.models.message import ChatMessage, ChatMessageRead
from bisheng.utils.logger import logger
from bisheng.utils.util import get_cache_key
from fastapi import (APIRouter, HTTPException, WebSocket, WebSocketException,
status)
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketException, status
from fastapi.encoders import jsonable_encoder
from fastapi.params import Depends
from fastapi.responses import StreamingResponse
Expand All @@ -40,7 +37,8 @@ def get_chatmessage(*,
payload = json.loads(Authorize.get_jwt_subject())
if not chat_id or not flow_id:
return {'code': 500, 'message': 'chat_id 和 flow_id 必传参数'}
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id, ChatMessage.chat_id == chat_id,
where = select(ChatMessage).where(ChatMessage.flow_id == flow_id,
ChatMessage.chat_id == chat_id,
ChatMessage.user_id == payload.get('user_id'))
if id:
where = where.where(ChatMessage.id < id)
Expand All @@ -53,11 +51,11 @@ def get_chatlist_list(*, session: Session = Depends(get_session), Authorize: Aut
Authorize.jwt_required()
payload = json.loads(Authorize.get_jwt_subject())

smt = (select(
ChatMessage.flow_id, ChatMessage.chat_id, ChatMessage.chat_id,
func.max(ChatMessage.create_time).label('create_time'),
func.max(ChatMessage.update_time).label('update_time')).where(ChatMessage.user_id == payload.get('user_id')).group_by(
ChatMessage.flow_id).order_by(func.max(ChatMessage.create_time).desc()))
smt = (select(ChatMessage.flow_id, ChatMessage.chat_id, ChatMessage.chat_id,
func.max(ChatMessage.create_time).label('create_time'),
func.max(ChatMessage.update_time).label('update_time')).where(
ChatMessage.user_id == payload.get('user_id')).group_by(
ChatMessage.flow_id).order_by(func.max(ChatMessage.create_time).desc()))
db_message = session.exec(smt).all()
flow_ids = [message.flow_id for message in db_message]
db_flow = session.exec(select(Flow).where(Flow.id.in_(flow_ids))).all()
Expand Down Expand Up @@ -212,7 +210,10 @@ async def event_stream(flow_id, chat_id: str):
except StopIteration as e:
graph = e.value
except Exception as e:
flow_data_store.hsetkey(flow_data_key, 'status', BuildStatus.FAILURE.value, expiration=expire)
flow_data_store.hsetkey(flow_data_key,
'status',
BuildStatus.FAILURE.value,
expiration=expire)
yield str(StreamData(event='error', data={'error': str(e)}))
return

Expand All @@ -231,7 +232,11 @@ async def event_stream(flow_id, chat_id: str):
input_keys_response['memory_keys'].extend(input_keys.get('memory_keys'))
input_keys_response['handle_keys'].extend(input_keys.get('handle_keys'))
elif ('fileNode' in node.output):
input_keys_response['input_keys'].append({'file_path': '', 'type': 'file', 'id': node.id})
input_keys_response['input_keys'].append({
'file_path': '',
'type': 'file',
'id': node.id
})

yield str(StreamData(event='message', data=input_keys_response))
# We need to reset the chat history
Expand Down
15 changes: 10 additions & 5 deletions src/backend/bisheng/api/v1/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import requests
from bisheng.database.base import get_session
from bisheng.database.models.model_deploy import (ModelDeploy, ModelDeployQuery, ModelDeployRead, ModelDeployUpdate)
from bisheng.database.models.model_deploy import (ModelDeploy, ModelDeployQuery, ModelDeployRead,
ModelDeployUpdate)
from bisheng.database.models.server import Server, ServerCreate, ServerRead
from bisheng.utils.logger import logger
from fastapi import APIRouter, Depends, HTTPException
Expand Down Expand Up @@ -96,7 +97,8 @@ async def load(*, session=Depends(get_session), deploy_id: dict):
session.commit()
session.refresh(db_deploy)
# 真正开始执行load
asyncio.get_event_loop().run_in_executor(thread_pool, load_model, url, data, deploy_id.get('deploy_id'))
asyncio.get_event_loop().run_in_executor(thread_pool, load_model, url, data,
deploy_id.get('deploy_id'))
return {'message': 'load success'}
except Exception as exc:
logger.error(f'Error load model: {exc}')
Expand Down Expand Up @@ -233,7 +235,7 @@ async def update_model(endpoint: str, server: str):
content = resp.text
except Exception as e:
logger.error(str(e))
return
return []

session = next(get_session())
db_deploy = session.exec(select(ModelDeploy).where(ModelDeploy.server == server)).all()
Expand All @@ -245,7 +247,9 @@ async def update_model(endpoint: str, server: str):
if model_name in model_dict:
db_model = model_dict.get(model_name)
else:
db_model = ModelDeploy(server=server, endpoint=f'http://{endpoint}/v2.1/models', model=model_name)
db_model = ModelDeploy(server=server,
endpoint=f'http://{endpoint}/v2.1/models',
model=model_name)

# 当前是上下线中,需要判断
origin_status = db_model.status
Expand All @@ -260,7 +264,8 @@ async def update_model(endpoint: str, server: str):
if not db_model.status:
db_model.status = '未上线'
logger.debug(
f'update_status={model_name} rt_status={status} db_status={origin_status} now_status={db_model.status}')
f'update_status={model_name} rt_status={status} db_status={origin_status} now_status={db_model.status}'
)
if not db_model.config:
# 初始化config
config_url = f'http://{endpoint}/v2/repository/models/{model_name}/config'
Expand Down
56 changes: 43 additions & 13 deletions src/backend/bisheng/chat/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ def add_message(self, client_id: str, chat_id: str, message: ChatMessage):
"""Add a message to the chat history."""

self.history[get_cache_key(client_id, chat_id)].append(message)
if (message.message or message.intermediate_steps or message.files) and message.type != 'stream':
if (message.message or message.intermediate_steps or
message.files) and message.type != 'stream':
with next(get_session()) as seesion:
from bisheng.database.models.message import ChatMessage
msg = message.copy()
msg.message = str(msg.message) if isinstance(msg.message, dict) else msg.message
files = json.dumps(msg.files) if msg.files else ''
msg.__dict__.pop('files')
db_message = ChatMessage(flow_id=client_id, chat_id=chat_id, files=files, **msg.__dict__)
db_message = ChatMessage(flow_id=client_id,
chat_id=chat_id,
files=files,
**msg.__dict__)
logger.info(f'chat={db_message}')
seesion.add(db_message)
seesion.commit()
Expand Down Expand Up @@ -97,8 +101,8 @@ def update(self):
data_type=self.last_cached_object_dict['type'],
)

self.chat_history.add_message(self.cache_manager.current_client_id, self.cache_manager.current_chat_id,
chat_response)
self.chat_history.add_message(self.cache_manager.current_client_id,
self.cache_manager.current_chat_id, chat_response)

async def connect(self, client_id: str, chat_id: str, websocket: WebSocket):
await websocket.accept()
Expand Down Expand Up @@ -130,7 +134,8 @@ async def close_connection(self, client_id: str, chat_id: str, code: int, reason
if 'after sending' in str(exc):
logger.error(exc)

async def process_file(self, client_id: str, chat_id: str, user_id: int, file_path: str, id: str):
async def process_file(self, client_id: str, chat_id: str, user_id: int, file_path: str,
id: str):
"""upload file to make flow work"""
db_flow = next(get_session()).get(Flow, client_id)
graph_data = db_flow.data
Expand All @@ -152,7 +157,11 @@ async def process_file(self, client_id: str, chat_id: str, user_id: int, file_pa
user_id=user_id)
self.chat_history.add_message(client_id, chat_id, file)
# graph_data = payload
start_resp = ChatResponse(message=None, type='begin', intermediate_steps='', category='system', user_id=user_id)
start_resp = ChatResponse(message=None,
type='begin',
intermediate_steps='',
category='system',
user_id=user_id)
await self.send_json(client_id, chat_id, start_resp)
start_resp.type = 'start'
await self.send_json(client_id, chat_id, start_resp)
Expand Down Expand Up @@ -220,9 +229,17 @@ async def process_file(self, client_id: str, chat_id: str, user_id: int, file_pa

start_resp.category = 'report'
await self.send_json(client_id, chat_id, start_resp)
response = ChatResponse(message='', type='end', intermediate_steps=report, category='report', user_id=user_id)
response = ChatResponse(message='',
type='end',
intermediate_steps=report,
category='report',
user_id=user_id)
await self.send_json(client_id, chat_id, response)
close_resp = ChatResponse(message=None, type='close', intermediate_steps='', category='system', user_id=user_id)
close_resp = ChatResponse(message=None,
type='close',
intermediate_steps='',
category='system',
user_id=user_id)
await self.send_json(client_id, chat_id, close_resp)

async def process_message(self,
Expand Down Expand Up @@ -252,9 +269,15 @@ async def process_message(self,
self.chat_history.add_message(client_id, chat_id, chat_inputs)
# graph_data = payload
if not is_bot:
start_resp = ChatResponse(message=None, type='begin', intermediate_steps='', user_id=user_id)
start_resp = ChatResponse(message=None,
type='begin',
intermediate_steps='',
user_id=user_id)
await self.send_json(client_id, chat_id, start_resp)
start_resp = ChatResponse(message=None, type='start', intermediate_steps='', user_id=user_id)
start_resp = ChatResponse(message=None,
type='start',
intermediate_steps='',
user_id=user_id)
await self.send_json(client_id, chat_id, start_resp)

# is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1
Expand All @@ -276,7 +299,10 @@ async def process_message(self,
category='processing',
user_id=user_id)
await self.send_json(client_id, chat_id, end_resp)
close_resp = ChatResponse(message=None, type='close', intermediate_steps='', user_id=user_id)
close_resp = ChatResponse(message=None,
type='close',
intermediate_steps='',
user_id=user_id)
await self.send_json(client_id, chat_id, close_resp)
return

Expand Down Expand Up @@ -346,7 +372,10 @@ async def process_message(self,
user_id=user_id)
await self.send_json(client_id, chat_id, response)
# 循环结束
close_resp = ChatResponse(message=None, type='close', intermediate_steps='', user_id=user_id)
close_resp = ChatResponse(message=None,
type='close',
intermediate_steps='',
user_id=user_id)
await self.send_json(client_id, chat_id, close_resp)
return result

Expand All @@ -358,7 +387,8 @@ def set_cache(self, client_id: str, langchain_object: Any) -> bool:
self.in_memory_cache.set(client_id, langchain_object)
return client_id in self.in_memory_cache

async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSocket, user_id: int):
async def handle_websocket(self, client_id: str, chat_id: str, websocket: WebSocket,
user_id: int):
await self.connect(client_id, chat_id, websocket)

try:
Expand Down
Loading

0 comments on commit fd24747

Please sign in to comment.