Skip to content

Commit

Permalink
fix: hung
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyoucao577 committed Oct 23, 2024
1 parent e08a07c commit 1a91ba5
Showing 1 changed file with 105 additions and 40 deletions.
145 changes: 105 additions & 40 deletions agents/ten_packages/extension/minimax_v2v_python/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import base64
import requests
import json
import httpx

from queue import Queue
from typing import Iterator, Any, List
Expand Down Expand Up @@ -248,50 +249,108 @@ def _complete_with_history(self, ts: datetime, buff: bytearray) -> Iterator[byte
"top_p": 0.95
}

response = requests.post(url, headers=headers, json=payload, stream=True)
logger.info(f"Get response, trace-id: {response.headers.get('Trace-Id')}")
start_time = datetime.now()
logger.info(f"start request, data len {len(buff)}")
# response = requests.post(url, headers=headers, json=payload, stream=True, timeout=5)
# logger.info(f"Get response, trace-id: {response.headers.get('Trace-Id')}, cost_time {self._duration_in_ms_since(start_time)}ms")
self.transcript = ""
i = 0
for line in response.iter_lines(decode_unicode=True):
if self._need_interrupt(ts):
logger.warning("interrupted")
with httpx.Client(timeout=httpx.Timeout(5)) as client:
try:
# 发送 POST 请求
with client.stream("POST", url, headers=headers, json=payload) as response:
response.raise_for_status() # 检查响应状态
for line in response.iter_lines():
# logger.info(f"-> line {line}")
if self._need_interrupt(ts):
logger.warning("interrupted")
if self.transcript:
self.transcript += "[interrupted]"
self._append_message("assistant", self.transcript)
self._send_transcript("", "assistant", True)
break

if not line.startswith("data:"):
logger.warning(f"ignore line {len(line)}")
continue

i+=1

resp = json.loads(line.strip("data:"))
if resp.get("choices") and resp["choices"][0].get("delta"):
delta = resp["choices"][0]["delta"]
if delta.get("role") == "assistant":
if delta.get("content"):
content = delta['content']
self.transcript += content
logger.info(f"[sse] data chunck-{i} get assistant transcript {content}")
self._send_transcript(content, "assistant", False)
if delta.get("audio_content") and delta["audio_content"] != "":
logger.info(f"[sse] data chunck-{i} get audio_content")
base64_str = delta["audio_content"]
# with open(f"minimax_v2v_data_{i}.txt", "a") as f:
# f.write(base64_str)
buff = base64.b64decode(base64_str)
self._send_audio_out(buff)
if delta.get("tool_calls"):
logger.info(f"ignore tool call {delta}")
continue
if delta.get("role") == "user":
self._send_transcript(delta['content'], "user", True)

except httpx.TimeoutException:
logger.warning("timeout")
except httpx.HTTPStatusError as e:
logger.warning(f"http status error: {e}")
except httpx.RequestError as e:
logger.warning(f"request error: {e}")
finally:
logger.info(f"http loop done, cost_time {self._duration_in_ms_since(start_time)}ms")
if self.transcript:
self.transcript += "[interrupted]"
self._append_message("assistant", self.transcript)
self._send_transcript("", "assistant", True)
break

if not line.startswith("data:"):
logger.warning(f"ignore line {len(line)}")
continue

i+=1

resp = json.loads(line.strip("data:"))
if resp.get("choices") and resp["choices"][0].get("delta"):
delta = resp["choices"][0]["delta"]
if delta.get("role") == "assistant":
if delta.get("content"):
content = delta['content']
self.transcript += content
logger.info(f"[sse] data chunck-{i} get assistant transcript {content}")
self._send_transcript(content, "assistant", False)
if delta.get("audio_content") and delta["audio_content"] != "":
logger.info(f"[sse] data chunck-{i} get audio_content")
base64_str = delta["audio_content"]
# with open(f"minimax_v2v_data_{i}.txt", "a") as f:
# f.write(base64_str)
buff = base64.b64decode(base64_str)
self._send_audio_out(buff)
if delta.get("tool_calls"):
logger.info(f"ignore tool call {delta}")
continue
elif delta.get("role") == "user":
self._send_transcript(delta['content'], "user", True)

if self.transcript:
self._append_message("assistant", self.transcript)
self._send_transcript("", "assistant", True)

# for line in response.iter_lines(decode_unicode=True):
# if self._need_interrupt(ts):
# logger.warning("interrupted")
# if self.transcript:
# self.transcript += "[interrupted]"
# self._append_message("assistant", self.transcript)
# self._send_transcript("", "assistant", True)
# break

# if not line.startswith("data:"):
# logger.warning(f"ignore line {len(line)}")
# continue

# i+=1

# resp = json.loads(line.strip("data:"))
# if resp.get("choices") and resp["choices"][0].get("delta"):
# delta = resp["choices"][0]["delta"]
# if delta.get("role") == "assistant":
# if delta.get("content"):
# content = delta['content']
# self.transcript += content
# logger.info(f"[sse] data chunck-{i} get assistant transcript {content}")
# self._send_transcript(content, "assistant", False)
# if delta.get("audio_content") and delta["audio_content"] != "":
# logger.info(f"[sse] data chunck-{i} get audio_content")
# base64_str = delta["audio_content"]
# # with open(f"minimax_v2v_data_{i}.txt", "a") as f:
# # f.write(base64_str)
# buff = base64.b64decode(base64_str)
# self._send_audio_out(buff)
# if delta.get("tool_calls"):
# logger.info(f"ignore tool call {delta}")
# continue
# if delta.get("role") == "user":
# self._send_transcript(delta['content'], "user", True)

# logger.info(f"Get response loop done, cost_time {self._duration_in_ms_since(start_time)}ms")
# if self.transcript:
# self._append_message("assistant", self.transcript)
# self._send_transcript("", "assistant", True)

def _get_messages(self) -> List[Any]:
messages = []
Expand Down Expand Up @@ -353,4 +412,10 @@ def _dump_audio_if_need(self, buf: bytearray, suffix: str) -> None:
return

with open("{}_{}.pcm".format("minimax_v2v", suffix), "ab") as dump_file:
dump_file.write(buf)
dump_file.write(buf)

def _duration_in_ms(self, start: datetime, end: datetime) -> int:
return int((end - start).total_seconds() * 1000)

def _duration_in_ms_since(self, start: datetime) -> int:
return self._duration_in_ms(start, datetime.now())

0 comments on commit 1a91ba5

Please sign in to comment.