-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathds_bot.py
81 lines (66 loc) · 2.15 KB
/
ds_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
import os
import serial
import threading
import serial.tools.list_ports
from dotenv import load_dotenv
from discord import *
from discord.ext import commands
from functools import wraps, partial
# Pull settings (e.g. the bot token) from a local .env file into os.environ.
load_dotenv()

# Start from every gateway intent, then explicitly switch on the two flags
# below.  NOTE(review): these are presumably already set by Intents.all() --
# confirm before removing them.
intents = Intents.all()
intents.message_content = True
intents.members = True

# Color name -> (code byte sent over serial, emoji shown in chat).
# Judging by the table, the code is a 3-bit mask: bit 2 = red, bit 1 = green,
# bit 0 = blue.
# NOTE(review): "cylan" looks like a typo for "cyan"; it is a runtime key that
# users type, so it is left untouched here.
colors = {
    "white": (0b111, '⬜'),
    "red": (0b100, '🟥'),
    "yellow": (0b110, '🟨'),
    "green": (0b010, '🟩'),
    "cylan": (0b011, '🏞️'),
    "blue": (0b001, '🟦'),
    "magenta": (0b101, '🟪'),
    "black": (0b000, '⬛'),
}

# List the serial ports visible on this machine (diagnostic output only).
ports = list(serial.tools.list_ports.comports())
print([port_info.name for port_info in ports])

# Queue of pending code bytes: filled by the slash command, drained by t().
cmds = []

# Serial link to the board: fixed port name, 9600 baud.
ard = serial.Serial('COM3', 9600)
if not ard.is_open:
    ard.open()

bot = commands.Bot(command_prefix='!', case_insensitive=True, intents=intents)
def t(port):
    """Forward queued command bytes from ``cmds`` to ``port``, forever.

    Runs as the body of a background thread.  Each queued integer is sent as
    a single byte via ``port.write``.  When the queue is empty the loop sleeps
    briefly instead of spinning, so the thread does not burn a CPU core or
    starve other threads while idle.

    Args:
        port: Object with a ``write(bytes)`` method (the open serial port).
    """
    import time  # local import keeps this block self-contained

    idle_delay = 0.01  # seconds to wait before re-checking an empty queue

    while True:
        try:
            code = cmds.pop(0)
        except IndexError:
            # Nothing queued right now: yield the CPU, then poll again.
            time.sleep(idle_delay)
            continue
        port.write(bytes([code]))
def async_run(func):
    """Decorator: expose the blocking callable ``func`` as a coroutine.

    The returned coroutine function accepts the same positional and keyword
    arguments as ``func`` plus two keyword-only extras:

    * ``loop`` -- event loop to use; defaults to the currently running loop.
    * ``executor`` -- executor passed to ``run_in_executor``; ``None`` selects
      the loop's default executor.

    Awaiting it runs ``func(*args, **kwargs)`` in the executor and yields its
    return value.
    """

    @wraps(func)
    async def wrapper(*args, loop=None, executor=None, **kwargs):
        active_loop = asyncio.get_running_loop() if loop is None else loop
        # run_in_executor only forwards positional args, so bind everything
        # into a zero-argument callable first.
        bound_call = partial(func, *args, **kwargs)
        outcome = await active_loop.run_in_executor(executor, bound_call)
        return outcome

    return wrapper
@bot.event
async def on_ready():
    """Announce on stdout that the bot has logged in and is ready."""
    # Defined and registered exactly once: with two `on_ready` definitions the
    # later one replaces the earlier, so one set of prints would never run.
    print('Working...')
    print(f"{bot.user} is ready and online!")
async def get_colors(ctx: AutocompleteContext):
    """Build the autocomplete choices: one "<name> <emoji>" string per color.

    ``ctx`` is accepted for the autocomplete callback signature but not used.
    """
    choices = []
    for name, (_code, emoji) in colors.items():
        choices.append(f'{name} {emoji}')
    return choices
@bot.slash_command(name="set", description="Устаноить цвет светодиода")
async def set_color(ctx, c: Option(str, "Введите цвет", required=True, autocomplete=utils.basic_autocomplete(get_colors))):  # type: ignore
    """Queue the LED color named by ``c`` and report the outcome in chat.

    ``c`` is either a bare color name or an autocomplete choice of the form
    "<name> <emoji>"; only the first whitespace-separated word is used as the
    key into ``colors``.  An empty, blank or unknown name gets an "Error"
    reply instead of raising ``IndexError``/``KeyError``.

    After replying, the invoking message is deleted on a best-effort basis.
    """
    # NOTE(review): for slash commands the library normally expects an
    # interaction response (ctx.respond); ctx.send is kept as in the original
    # -- confirm which one is intended.
    words = c.split()
    name = words[0] if words else None
    entry = colors.get(name)

    if entry is None:
        await ctx.send("Error")
    else:
        code, emoji = entry
        await ctx.send(f"Color {name} {emoji} turned on")
        # Hand the code byte to the serial writer thread.
        cmds.append(code)

    try:
        await ctx.message.delete()
    except Exception:
        # Best effort only (there may be no message to delete, or no
        # permission).  `Exception` rather than a bare `except` so task
        # cancellation and KeyboardInterrupt still propagate.
        print("Delete failed")
# Fail early with a clear message instead of handing `None` to the library.
token = os.environ.get('ds_token')
if not token:
    raise RuntimeError("Environment variable 'ds_token' is not set")

event_loop = asyncio.get_event_loop()

# Daemon thread: the writer loop never returns, so a non-daemon thread would
# keep the process alive after the bot stops or crashes.
threading.Thread(target=t, args=(ard,), daemon=True).start()

# Run the bot on this loop until it disconnects.
event_loop.run_until_complete(bot.start(token=token))