Skip to content

Commit

Permalink
<all>[refactor]type check
Browse files Browse the repository at this point in the history
  • Loading branch information
HornCopper committed Aug 23, 2024
1 parent fe7acd8 commit 30b3faf
Show file tree
Hide file tree
Showing 103 changed files with 932 additions and 1,378 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ affections.json
population.json

# Byte-compiled / optimized / DLL files
.vscode/
__pycache__/
*.py[cod]
*$py.class
Expand Down
66 changes: 32 additions & 34 deletions src/constant/jx3/skilldatalib.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from urllib.error import HTTPError
from pathlib import Path
from typing import Union, Literal, Optional

from nonebot.adapters.onebot.v11 import MessageSegment as ms

from src.tools.utils import get_url, get_status, get_content, get_api
from src.tools.utils import get_url, get_status, get_content
from src.tools.file import read, write
from src.tools.utils.path import CONSTANT, ASSETS

Expand All @@ -17,41 +17,41 @@
force_list = json.load(school)


def kftosh(kf: str) -> str:
def kungfu_to_school(kungfu_name: str) -> Union[str, Literal[False]]:
with open(CONSTANT + "/jx3/school.json", "r", encoding="utf-8") as f:
kf_dict = json.load(f)
for k, v_list in kf_dict.items():
if kf in v_list:
return k
elif kf == k:
return k # 如果传入参数本身就是键,直接返回该键
for school_name, school_aliases in kf_dict.items():
if kungfu_name in school_aliases:
return school_name
elif kungfu_name == school_name:
return school_name # 如果传入参数本身就是键,直接返回该键
return False


def aliases(SkillName: str) -> str:
def school_name_aliases(SkillName: str) -> Union[str, Literal[False]]:
with open(CONSTANT + "/jx3/kungfu.json", "r", encoding="utf-8") as f:
aliases_dict = json.load(f)

for k, v in aliases_dict.items():
if SkillName in v:
return k
for kungfu, kungfu_aliases in aliases_dict.items():
if SkillName in kungfu_aliases:
return kungfu
return False


async def getTalents():
async def get_talents():
"""
获取所有门派的奇穴。
数据来源:`JX3BOX` & `JX3API`
"""
data_list = list(force_list)
for i in data_list:
if await get_status(url=f"https://data.jx3box.com/talent/{i}.json") != 404:
info = await get_url(url=f"https://data.jx3box.com/bps/std/{i}/talent.json")
status_code = await get_status(f"https://data.jx3box.com/talent/{i}.json")
if status_code != 404:
info = await get_url(f"https://data.jx3box.com/bps/std/{i}/talent.json")
data = json.loads(info)
for a in data:
write(ASSETS + "/jx3/talents/" +
a["kungfu"] + ".json", json.dumps(a, ensure_ascii=False))
write(ASSETS + "/jx3/talents/" + a["kungfu"] + ".json", json.dumps(a, ensure_ascii=False))


async def getSkills():
Expand All @@ -62,15 +62,16 @@ async def getSkills():
"""
data_list = list(force_list)
for i in data_list:
if await get_status(url=f"https://data.jx3box.com/bps/std/{i}/skill.json") != 404:
info = await get_url(url=f"https://data.jx3box.com/bps/std/{i}/skill.json")
status_code = await get_status(f"https://data.jx3box.com/bps/std/{i}/skill.json")
if status_code != 404:
info = await get_url(f"https://data.jx3box.com/bps/std/{i}/skill.json")
data = json.loads(info)
for a in data:
write(ASSETS + "/jx3/skills/" +
a["kungfuName"] + ".json", json.dumps(a, ensure_ascii=False))


async def get_icon(skillName: str, type_: str, api_icon: str = None, kungfu: str = None) -> str:
async def get_icon(skillName: str, type_: str, api_icon: str = "", kungfu: Optional[str] = "") -> Union[str, ms]:
if kungfu is None:
raise ValueError("Key value `kungfu` was not found.")
final_path = ASSETS + "/jx3/icons/" + kungfu + "_" + skillName + ".png"
Expand All @@ -81,10 +82,7 @@ async def get_icon(skillName: str, type_: str, api_icon: str = None, kungfu: str
return ms.image(Path(final_path).as_uri())
else:
api_icon_url = api_icon
try:
icon = await get_content(api_icon_url)
except:
raise HTTPError(msg="Can't connect to " + api_icon_url + ".")
icon = await get_content(api_icon_url)
cache = open(ASSETS + "/jx3/icons/" + kungfu +
"_" + skillName + ".png", mode="wb")
cache.write(icon)
Expand All @@ -94,23 +92,23 @@ async def get_icon(skillName: str, type_: str, api_icon: str = None, kungfu: str
else:
return ms.image(Path(final_path).as_uri())

async def getSingleSkill(kungfu: str, skillName: str):
kungfu = aliases(kungfu)
if kungfu == "隐龙诀":
kungfu == "隐龙决" # 由于`JX3Box`的`API`的数据错误问题,目前只能这样适配,等到数据纠正后删除这块代码。其实是推栏的代码错了笑死。
if not kungfu:
async def getSingleSkill(kungfu_name: str, skillName: str):
final_kungfu_name = school_name_aliases(kungfu_name)
if final_kungfu_name == "隐龙诀":
final_kungfu_name = "隐龙决"
if not final_kungfu_name:
return False
try:
data = json.loads(read(ASSETS + "/jx3/skills/" + kungfu + ".json"))
data = json.loads(read(ASSETS + "/jx3/skills/" + final_kungfu_name + ".json"))
except Exception:
await getSkills()
await getSingleSkill(kungfu, skillName)
await getSingleSkill(final_kungfu_name, skillName)
moreInfo = data["remarks"]
msg = ""
for i in moreInfo:
for x in i["forceSkills"]:
if x["skillName"] == skillName:
image = await get_icon(x["skillName"], "ms", x["icon"]["FileName"], kungfu)
image = await get_icon(x["skillName"], "ms", x["icon"]["FileName"], final_kungfu_name)
releaseType = x["releaseType"] # 释放类型
if releaseType != "瞬间释放":
releaseType = releaseType + "释放"
Expand Down Expand Up @@ -141,14 +139,14 @@ async def getSingleSkill(kungfu: str, skillName: str):


async def getSingleTalent(Kungfu: str, TalentName: str):
kungfuname = aliases(Kungfu)
kungfuname = school_name_aliases(Kungfu)
if not kungfuname:
return "此心法不存在哦,请检查后重试~"
try:
data = json.loads(
read(ASSETS + "/jx3/talents/" + kungfuname + ".json"))
except Exception:
await getTalents()
await get_talents()
await getSingleTalent(kungfuname, TalentName)
detail = data["kungfuLevel"]
correct = {str(i["level"]): i for i in detail}
Expand Down
34 changes: 18 additions & 16 deletions src/plugins/ban/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Union, Any, List, Dict

from nonebot import on_command
from nonebot.adapters import Message, Event, Bot
from nonebot.adapters import Message, Bot
from nonebot.matcher import Matcher
from nonebot.adapters.onebot.v11 import GroupMessageEvent
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
from nonebot.params import CommandArg, Arg

from src.tools.config import Config
Expand All @@ -18,11 +20,11 @@

leave_msg = leave_msg + "\n" + add_

def banned(qq: str):
banned: BannedList = group_db.where_one(BannedList(), default=BannedList())
banned = banned.banned_list
def banned(user_id: str) -> bool:
banned_data: Union[BannedList, Any] = group_db.where_one(BannedList(), default=BannedList())
banned: List[Dict[str, str]] = banned_data.banned_list
for one in banned:
if one["uid"] == qq:
if one["uid"] == user_id:
return True
return False

Expand All @@ -31,7 +33,7 @@ def banned(qq: str):


@ban.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: MessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() == "":
return
if not checker(str(event.user_id), 10):
Expand All @@ -49,7 +51,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
elif banned(sb):
return ban.finish("唔……全域封禁失败,这个人已经被封禁了。")
else:
current_data: BannedList = group_db.where_one(BannedList(), default=BannedList())
current_data: Union[BannedList, Any] = group_db.where_one(BannedList(), default=BannedList())
current = current_data.banned_list
current.append({
"uid": sb,
Expand All @@ -65,7 +67,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):


@unban.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: MessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() == "":
return
if not checker(str(event.user_id), 10):
Expand All @@ -77,7 +79,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
await unban.finish("您输入了什么?")
if banned(sb) is False:
await unban.finish("全域解封失败,并没有封禁此人哦~")
current_data: BannedList = group_db.where_one(BannedList(), default=BannedList())
current_data: Union[BannedList, Any] = group_db.where_one(BannedList(), default=BannedList())
current = current_data.banned_list
for one in current:
if one["uid"] == sb:
Expand All @@ -88,8 +90,8 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):


@preprocess.handle()
async def _(matcher: Matcher, event: Event):
current: BannedList = group_db.where_one(BannedList(), default=BannedList())
async def _(matcher: Matcher, event: MessageEvent):
current: Union[BannedList, Any] = group_db.where_one(BannedList(), default=BannedList())
for i in current.banned_list:
if str(event.user_id) == i["uid"] and not checker(str(event.user_id),10):
matcher.stop_propagation()
Expand All @@ -99,7 +101,7 @@ async def _(matcher: Matcher, event: Event):


@dismiss.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() != "":
return
personal_data = await bot.call_api("get_group_member_info", group_id=event.group_id, user_id=event.user_id, no_cache=True)
Expand All @@ -111,7 +113,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):


@dismiss.got("confirm")
async def _(bot: Bot, event: Event, confirm: Message = Arg()):
async def _(bot: Bot, event: GroupMessageEvent, confirm: Message = Arg()):
u_input = confirm.extract_plain_text()
if u_input == "移除音卡":
await dismiss.send(leave_msg)
Expand All @@ -122,7 +124,7 @@ async def _(bot: Bot, event: Event, confirm: Message = Arg()):
recovery = on_command("recovery", aliases={"重置音卡"}, force_whitespace=True, priority=5)

@recovery.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() != "":
return
personal_data = await bot.call_api("get_group_member_info", group_id=event.group_id, user_id=event.user_id, no_cache=True)
Expand All @@ -137,7 +139,7 @@ async def _(bot: Bot, event: GroupMessageEvent, confirm: Message = Arg()):
u_input = confirm.extract_plain_text()
if u_input == "重置音卡":
group_id = str(event.group_id)
group_settings: GroupSettings = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=None)
group_settings: Union[GroupSettings, Any] = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=None)
group_settings = GroupSettings(id=group_settings.id, group_id=group_settings.group_id)
group_db.save(group_settings)
await dismiss.send("重置成功!可以重新开始绑定本群数据了!")
6 changes: 4 additions & 2 deletions src/plugins/banword/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Union, Any

from nonebot import on_command
from nonebot.adapters import Message
from nonebot.adapters.onebot.v11 import GroupMessageEvent
Expand All @@ -18,7 +20,7 @@ async def _(event: GroupMessageEvent, args: Message = CommandArg()): # 违禁
if not checker(str(event.user_id), 10):
await banword.finish(error(5))
if bw:
current_data: BannedWordList = group_db.where_one(BannedWordList(), default=BannedWordList())
current_data: Union[BannedWordList, Any] = group_db.where_one(BannedWordList(), default=BannedWordList())
current_list = current_data.banned_word_list
if bw in current_list:
await banword.finish("唔……封禁失败,已经封禁过了。")
Expand All @@ -40,7 +42,7 @@ async def _(event: GroupMessageEvent, args: Message = CommandArg()):
await unbanword.finish(error(5))
bw = args.extract_plain_text()
if bw:
current_data: BannedWordList = group_db.where_one(BannedWordList(), default=BannedWordList())
current_data: Union[BannedWordList, Any] = group_db.where_one(BannedWordList(), default=BannedWordList())
current_list = current_data.banned_word_list
if bw in current_list:
current_list.remove(bw)
Expand Down
19 changes: 11 additions & 8 deletions src/plugins/blacklist/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from pathlib import Path
from typing import Union, Any

from nonebot import on_command
from nonebot.adapters import Message, Bot, Event
from nonebot.adapters import Message, Bot
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageSegment as ms
from nonebot.params import CommandArg

Expand All @@ -15,7 +16,7 @@


@block.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() == "":
return
arg = args.extract_plain_text().split(" ")
Expand All @@ -30,7 +31,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
reason = arg[1]
new = {"ban": sb, "reason": reason}
group_id = str(event.group_id)
current_data: GroupSettings = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings())
current_data: Union[GroupSettings, Any] = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings())
current_blacklist = current_data.blacklist
for i in current_blacklist:
if i["ban"] == sb:
Expand All @@ -43,7 +44,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
unblock = on_command("unblock", aliases={"移出黑名单"}, force_whitespace=True, priority=5) # 解除避雷

@unblock.handle()
async def _(bot: Bot, event: Event, args: Message = CommandArg()):
async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() == "":
return
arg = args.extract_plain_text().split(" ")
Expand All @@ -56,7 +57,7 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
await unblock.finish("参数仅为玩家名,请勿附带任何信息!")
sb = arg[0]
group_id = str(event.group_id)
current_data: GroupSettings = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_data: Union[GroupSettings, Any] = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_blacklist = current_data.blacklist
for i in current_blacklist:
if i["ban"] == sb:
Expand All @@ -69,15 +70,15 @@ async def _(bot: Bot, event: Event, args: Message = CommandArg()):
sblock = on_command("sblock", aliases={"查找黑名单"}, force_whitespace=True, priority=5) # 查询是否在避雷名单

@sblock.handle()
async def _(event: Event, args: Message = CommandArg()):
async def _(event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() == "":
return
arg = args.extract_plain_text().split(" ")
if len(arg) != 1:
await sblock.finish("参数仅为玩家名,请勿附带任何信息!")
sb = arg[0]
group_id = str(event.group_id)
current_data: GroupSettings = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_data: Union[GroupSettings, Any] = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_blacklist = current_data.blacklist
for i in current_blacklist:
if i["ban"] == sb:
Expand All @@ -101,7 +102,7 @@ async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
if args.extract_plain_text() != "":
return
group_id = str(event.group_id)
current_data: GroupSettings = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_data: Union[GroupSettings, Any] = group_db.where_one(GroupSettings(), "group_id = ?", group_id, default=GroupSettings(group_id=str(event.group_id)))
current_blacklist = current_data.blacklist
table = []
if len(current_blacklist) == 0:
Expand All @@ -116,5 +117,7 @@ async def _(bot: Bot, event: GroupMessageEvent, args: Message = CommandArg()):
final_html = CACHE + "/" + get_uuid() + ".html"
write(final_html, html)
final_path = await generate(final_html, False, "table", False)
if not isinstance(final_path, str):
return
send_path = Path(final_path).as_uri()
await lblock.finish(ms.image(send_path))
4 changes: 4 additions & 0 deletions src/plugins/bulletin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ async def _(event: GroupMessageEvent, args: Message = CommandArg()):
await bulletin_glad.finish("字数请控制在20字以内!")
else:
img = await get_bulletinG(msg, "G")
if not isinstance(img, str):
return
await bulletin_glad.finish(ms.image(img))

bulletin_sad = on_command("悲报", force_whitespace=True, priority=5)
Expand All @@ -33,4 +35,6 @@ async def _(event: GroupMessageEvent, args: Message = CommandArg()):
await bulletin_sad.finish("字数请控制在20字以内!")
else:
img = await get_bulletinG(msg, "S")
if not isinstance(img, str):
return
await bulletin_sad.finish(ms.image(img))
Loading

0 comments on commit 30b3faf

Please sign in to comment.