-
Notifications
You must be signed in to change notification settings - Fork 1
/
bingbot.py
106 lines (82 loc) · 3.58 KB
/
bingbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import asyncio
import re
from EdgeGPT import Chatbot
from telebot.async_telebot import AsyncTeleBot
# --------------------------------- EDIT BELOW THIS LINE ---------------------------------
# Insert your Bot token
BOT_TOKEN = '<INSERT YOUR TELEGRAM BOT TOKEN>'
bot = AsyncTeleBot(BOT_TOKEN)
# Update your cookies.json path here
cookies_path = '<INSERT YOUR COOKIES.JSON PATH>'
gbot = Chatbot(cookiePath=cookies_path)
# Add your telegram id to the list without @ symbol
authorized_id = ['<INSERT YOUR TELEGRAM USER NAME, NOT THE DISPLAY NAME>']
# --------------------------------- DO NOT EDIT BELOW THIS LINE ---------------------------------
def update_gbot():
global gbot
gbot = Chatbot(cookiePath=cookies_path)
async def bing_chat(prompt, is_ref=False):
response_dict = await gbot.ask(prompt=prompt)
if is_ref:
return response_dict['item']['messages'][1]["adaptiveCards"][0]["body"][0]["text"]
return re.sub(r'\[\^\d\^\]', '', response_dict['item']['messages'][1]['text'])
async def processing_message(message, p_msg=None, is_done=False):
if not is_done:
p_msg = await bot.send_message(message.chat.id, "🧠 Processing...")
await bot.send_chat_action(message.chat.id, 'typing')
return p_msg
else:
await bot.delete_message(message.chat.id, p_msg.message_id)
@bot.message_handler(commands=['ask'])
async def ask(message, is_ref=False, new_topic=False):
try:
username = message.from_user.username
if username not in authorized_id:
await bot.reply_to(message, "Not authorized to use this bot")
return
if new_topic:
update_gbot()
await bot.reply_to(message, "New topic started, start a new conversation with /ask <message>")
return
prompt = message.text.replace("/askref", "").replace("/ask", "")
print(f"Request received from {username} - {message.text}")
# Call processing_message() with p_msg=None to create a new message
pm = await processing_message(message, None)
if not prompt:
await bot.reply_to(message, "Empty query sent. Add your query /ask <message>")
else:
bot_response = await bing_chat(prompt, is_ref)
print(f"Response received - {bot_response}")
await bot.reply_to(message, bot_response.replace('?\n\n', ''))
# Call processing_message() with the message object to delete the message
await processing_message(message, pm, is_done=True)
except Exception as e:
print(f"Exception happened: {e}")
# Call processing_message() with the message object to delete the message
await processing_message(message, pm, is_done=True)
await bot.reply_to(message, "Bing is upset! Starting a new conversation...")
update_gbot()
@bot.message_handler(commands=['newtopic'])
async def newtopic(message):
await ask(message, new_topic=True)
@bot.message_handler(commands=['askref'])
async def askref(message):
await ask(message, is_ref=True)
@bot.message_handler(commands=['start'])
async def start(message):
try:
username = message.from_user.username
result = f"""
Welcome {username}!
Start chatting with the bot by sending /ask <message>
Chat with reference links by sending /askref <message>
Start a new topic by sending /newtopic
"""
await bot.send_message(message.chat.id, result)
except Exception as e:
print("Exception happened")
print(e)
async def main():
await bot.infinity_polling()
if __name__ == "__main__":
asyncio.run(main())