Skip to content

Commit

Permalink
✨ feta:添加娱乐功能
Browse files Browse the repository at this point in the history
  • Loading branch information
yzyyz1387 committed Sep 25, 2023
1 parent 424e739 commit 91ed16b
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 6 deletions.
2 changes: 2 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
recursive-include cqsat/cqsat_resource/bank/imgs *.jpg *.png
recursive-include cqsat/cqsat_resource/bank *.json
recursive-include cqsat/cqsat_resource/voices/prefix *.mp3
recursive-include cqsat/cqsat_resource/voices/suffix *.mp3
recursive-include cqsat/pre *.txt
5 changes: 4 additions & 1 deletion cqsat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
# @Software: PyCharm
from . import (
exercise,
entertainment,
mgsl,
sat,
config,
log,
path,
utils
utils,
media_utils
)
from nonebot.plugin import PluginMetadata

Expand Down
20 changes: 20 additions & 0 deletions cqsat/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# python3
# -*- coding: utf-8 -*-
# @Time : 2023-09-24 23:44
# @Author : yzyyz
# @Email : youzyyz1384@qq.com
# @File : config.py
# @Software: PyCharm
from typing import Optional

from nonebot import get_driver
from pydantic import BaseModel, Extra


class Config(BaseModel, extra=Extra.ignore):
nonebot_plugin_go_cqhttp: Optional[bool] = False
go_cqhttp_path: Optional[str] = './'


global_config = get_driver().config
plugin_config = Config.parse_obj(get_driver().config.dict())
Binary file not shown.
Binary file added cqsat/cqsat_resource/voices/suffix/mdc1200.mp3
Binary file not shown.
Binary file added cqsat/cqsat_resource/voices/suffix/mdc1200_2.mp3
Binary file not shown.
63 changes: 63 additions & 0 deletions cqsat/entertainment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# python3
# -*- coding: utf-8 -*-
# @Time : 2023-09-24 21:36
# @Author : yzyyz
# @Email : youzyyz1384@qq.com
# @File : __init__.py
# @Software: PyCharm
import random

from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageSegment, MessageEvent, Event, GroupMessageEvent, PrivateMessageEvent, \
Bot, Message
from nonebot.internal.params import ArgStr, ArgPlainText
from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State

from ..media_utils import convert_to_silk_wav, MergeTwoMp3
from ..config import *
from ..utils import *
from ..path import *

RESOURCE = Path(__file__).parent.parent / "cqsat_resource"
VOICES = RESOURCE / "voices"

voice_synthesis = on_command("/v", block=True, aliases={"/合成", "/vt"})


@voice_synthesis.handle()
async def _(event: Event, state: T_State, args: Message = CommandArg()):
if args and "-p" in str(args).strip():
state["prefix"] = True
if "-n" in str(args):
try:
msg = MsgText(event.json()).replace(' ', '').replace('禁', '')
noise = int(''.join(map(str, list(map(lambda x: int(x), filter(lambda x: x.isdigit(), msg))))))
except:
noise = 0
state["noise"] = noise


# @itsevin
# from https://github.com/itsevin/nonebot_plugin_record/blob/main/nonebot_plugin_record/__init__.py
@voice_synthesis.got("recording", prompt="请发送语音")
async def _(bot: Bot, event: MessageEvent, state: T_State):
mdc_list = [item for item in (VOICES / 'suffix').iterdir() if item.is_file()]
prefixes_path = (VOICES / 'prefix' / 'unknown_pre.mp3').absolute() if state.get("prefix", False) else None
if event.get_message()[0].type == "record":
if plugin_config.nonebot_plugin_go_cqhttp is True:
path_amr = "./accounts/" + bot.self_id + "/data/voices/" + event.get_message()[0].data["file"]
else:
path_amr = plugin_config.go_cqhttp_path + "data/voices/" + event.get_message()[0].data["file"]

out_this_path = MDC_GENERATOR_PATH / path_amr.split("/")[-1].replace(".amr", "")
wav_file = await convert_to_silk_wav(path_amr, out_this_path)
output_voice_path = await MergeTwoMp3(
voice=Path(wav_file),
mdc=(random.choice(mdc_list)).absolute(),
prefix=prefixes_path,
snr=state.get("noise", 0))
await voice_synthesis.finish(MessageSegment.record(f"file:///{output_voice_path}"))
else:
await voice_synthesis.finish("请回复语音,操作已退出...")
104 changes: 104 additions & 0 deletions cqsat/media_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# python3
# -*- coding: utf-8 -*-
# @Time : 2023-09-25 2:21
# @Author : yzyyz
# @Email : youzyyz1384@qq.com
# @File : media_utils.py
# @Software: PyCharm
from pathlib import Path

import pysilk
import os, pilk
from pydub import AudioSegment
import numpy as np
import librosa
from os import walk
from os.path import join as pjoin
from sys import argv
from subprocess import run as prun
import soundfile as sf
import pilk
import av


# 为什么是24000?
async def to_pcm(in_path: str) -> tuple[str, int]:
"""任意媒体文件转 pcm"""
out_path = os.path.splitext(in_path)[0] + '.pcm'
with av.open(in_path) as in_container:
in_stream = in_container.streams.audio[0]
sample_rate = in_stream.codec_context.sample_rate
with av.open(out_path, 'w', 's16le') as out_container:
out_stream = out_container.add_stream(
'pcm_s16le',
# rate=sample_rate,
rate=24000,

layout='mono'
)
try:
for frame in in_container.decode(in_stream):
frame.pts = None
for packet in out_stream.encode(frame):
out_container.mux(packet)
except:
pass
return out_path, sample_rate


async def sil_to_wav(silk_path, wav_path, rate: int = 24000):
"""
silk 文件转 wav
"""
wav_data = pysilk.decode_file(silk_path, to_wav=True, sample_rate=rate)
with open(wav_path, "wb") as f:
f.write(wav_data)
return wav_path


async def convert_to_silk_wav(media_path: str, out_path: Path) -> str:
"""任意媒体文件转 wav, 返回路径"""
pcm_path, sample_rate = await to_pcm(media_path)
silk_path = os.path.splitext(pcm_path)[0] + '.silk'
pilk.encode(pcm_path, silk_path, pcm_rate=24000, tencent=True)
os.remove(pcm_path)
wav_path = (await sil_to_wav(silk_path, str(out_path.resolve()) + '.wav'))
return wav_path


async def MergeTwoMp3(voice: Path, mdc: Path, prefix: Path = None, snr: int = 0):
audio, rate = librosa.load(voice, sr=None)
no = await awgn(audio, snr) if snr else audio
sf.write(str(voice.resolve()), no, rate)

input_voice_2 = AudioSegment.from_mp3(voice)
input_voice_3 = AudioSegment.from_mp3(mdc)
if prefix:
input_voice_1 = AudioSegment.from_mp3(prefix)
output_voice = input_voice_1 + input_voice_2 + input_voice_3
else:
output_voice = input_voice_2 + input_voice_3

output_voice_path = str(voice.resolve()).replace('.wav', '.mp3')
output_voice.export(output_voice_path, format="mp3")
Path(voice).unlink()
return output_voice_path


async def awgn(audio, snr):
# 在audio y中 添加噪声 噪声强度SNR为int
audio_power = audio ** 2
audio_average_power = np.mean(audio_power)
audio_average_db = 50 * np.log10(audio_average_power)
noise_average_db = audio_average_db - snr
noise_average_power = 100 ** (noise_average_db / 10)
mean_noise = 1
noise = np.random.normal(mean_noise, np.sqrt(noise_average_power), len(audio))
return audio + noise


if __name__ == '__main__':
import asyncio

loop = asyncio.get_event_loop()
loop.run_until_complete(convert_to_silk_wav('testimportant.amr'))
3 changes: 2 additions & 1 deletion cqsat/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
CACHE_CSS = TLE_CACHE_PATH / 'css.cache'
CACHE_OTHER = TLE_CACHE_PATH / 'other.cache'
SHOOTS_OUT_PATH = LOCAL / 'shoots_out'
MDC_GENERATOR_PATH = LOCAL / 'mdc_generator'


path_list = [LOCAL, EXAM_CACHE, EXAM_CACHE_A, EXAM_CACHE_B, EXAM_CACHE_C, TLE_CACHE_PATH, SHOOTS_OUT_PATH]
path_list = [LOCAL, EXAM_CACHE, EXAM_CACHE_A, EXAM_CACHE_B, EXAM_CACHE_C, TLE_CACHE_PATH, SHOOTS_OUT_PATH, MDC_GENERATOR_PATH]
driver = get_driver()
global_config = driver.config

Expand Down
6 changes: 3 additions & 3 deletions cqsat/sat/sat_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
bind_qth = on_command("绑定位置", aliases={"绑定QTH", "绑Qth", "绑定qth"}, block=True)


@bind_qth.got("QTH", prompt="请输入:\n 地名 \n或者输入:\n 经度 纬度 海拔\n\n参数用空格分隔\n")
@bind_qth.got("QTH", prompt="请输入:\n 地名 \n或者输入:\n 经度 纬度 海拔\n\n参数用空格分隔\n\n发送【取消】来取消操作")
async def _(
event: MessageEvent,
state: T_State,
Expand Down Expand Up @@ -71,7 +71,7 @@ async def _(
sub = on_command("订阅卫星", priority=2, block=True)


@sub.got("Sat", prompt="你要订阅那颗卫星?\n多颗卫星用空格分隔")
@sub.got("Sat", prompt="你要订阅那颗卫星?\n多颗卫星用空格分隔\n发送【取消】来取消操作")
async def _(
event: GroupMessageEvent,
state: T_State,
Expand Down Expand Up @@ -110,7 +110,7 @@ async def _(
await sub.finish("请先发送【绑定位置】来绑定QTH\n绑定后可以发送【订阅卫星】来订阅卫星")


@sub.got("E_angle", prompt="请输入最低仰角:")
@sub.got("E_angle", prompt="请输入最低仰角:\n发送【取消】来取消操作")
async def _(
event: GroupMessageEvent,
state: T_State,
Expand Down
19 changes: 19 additions & 0 deletions cqsat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ async def send_ex(
else:
await matcher.reject(state["user_notice"] + reply)
return state
# FIXME 此处return无法访问,但是能跑?


async def shoot_scr(url, locator="html", img_output="out.png"):
Expand All @@ -251,3 +252,21 @@ async def shoot_scr(url, locator="html", img_output="out.png"):
await page.goto(url)
await page.locator(locator).screenshot(path=img_output)
await browser.close()


def MsgText(data: str):
"""
返回消息文本段内容(即去除 cq 码后的内容)
:param data: event.json()
:return: str
"""
try:
data = json.loads(data)
# 过滤出类型为 text 的【并且过滤内容为空的】
msg_text_list = filter(lambda x: x['type'] == 'text' and x['data']['text'].replace(' ', '') != '',
data['message'])
# 拼接成字符串并且去除两端空格
msg_text = ' '.join(map(lambda x: x['data']['text'].strip(), msg_text_list)).strip()
return msg_text
except:
return ''
9 changes: 8 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,11 @@ nonebot2>=2.0.0-beta.4
pyephem==9.99
python_dateutil==2.8.2
PyYAML==6.0
nonebot-plugin-apscheduler
nonebot-plugin-apscheduler
av
pilk
pysilk
pysilk-mod
soundfile
librosa
pydub

0 comments on commit 91ed16b

Please sign in to comment.