-
Notifications
You must be signed in to change notification settings - Fork 6
/
remind_me.py
350 lines (294 loc) · 13.9 KB
/
remind_me.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""Contains cog classes for any remind_me interactions."""
from collections.abc import Sequence
__all__: Sequence[str] = ("ClearRemindersBacklogTaskCog", "RemindMeCommandCog")
import datetime
import functools
import itertools
import logging
import re
from collections.abc import Set
from logging import Logger
from typing import TYPE_CHECKING, Final, override
import discord
import parsedatetime
from discord.ext import tasks
from django.core.exceptions import ValidationError
from django.utils import timezone
from db.core.models import DiscordMember, DiscordReminder
from utils import TeXBot, TeXBotApplicationContext, TeXBotAutocompleteContext, TeXBotBaseCog
if TYPE_CHECKING:
import time
from collections.abc import Iterator
logger: Final[Logger] = logging.getLogger("TeX-Bot")
class RemindMeCommandCog(TeXBotBaseCog):
"""Cog class that defines the "/remind-me" command and its call-back method."""
@staticmethod
async def autocomplete_get_delays(ctx: TeXBotAutocompleteContext) -> Set[discord.OptionChoice] | Set[str]: # noqa: C901, PLR0912, PLR0915, E501
"""
Autocomplete callable that generates the common delay input values.
The delay entered by a member in the "remind_me" slash-command must be within this set
of common delay input values.
"""
if not ctx.value:
return {
"in 5 minutes",
"1 hours time",
"1min",
"30 secs",
"2 days time",
"22/9/2040",
"5h",
}
SECONDS_CHOICES: Final[frozenset[str]] = frozenset({"s", "sec", "second"})
MINUTES_CHOICES: Final[frozenset[str]] = frozenset({"m", "min", "minute"})
HOURS_CHOICES: Final[frozenset[str]] = frozenset({"h", "hr", "hour"})
DAYS_CHOICES: Final[frozenset[str]] = frozenset({"d", "dy", "day"})
WEEKS_CHOICES: Final[frozenset[str]] = frozenset({"w", "wk", "week"})
YEARS_CHOICES: Final[frozenset[str]] = frozenset({"y", "yr", "year"})
TIME_CHOICES: Final[frozenset[str]] = (
SECONDS_CHOICES
| MINUTES_CHOICES
| HOURS_CHOICES
| DAYS_CHOICES
| WEEKS_CHOICES
| YEARS_CHOICES
)
delay_choices: set[str] = set()
if re.fullmatch(r"\Ain? ?\Z", ctx.value):
FORMATTED_TIME_NUMS: Final[Iterator[tuple[int, str, str]]] = itertools.product(
range(1, 150),
{"", " "},
{"", "s"},
)
time_num: int
joiner: str
has_s: str
for time_num, joiner, has_s in FORMATTED_TIME_NUMS:
delay_choices.update(
f"{time_num}{joiner}{time_choice}{has_s}"
for time_choice in TIME_CHOICES
if not (len(time_choice) <= 1 and has_s)
)
return {f"in {delay_choice}" for delay_choice in delay_choices}
match: re.Match[str] | None
if match := re.fullmatch(r"\Ain (?P<partial_date>\d{0,3})\Z", ctx.value):
for joiner, has_s in itertools.product({"", " "}, {"", "s"}):
delay_choices.update(
f"""{match.group("partial_date")}{joiner}{time_choice}{has_s}"""
for time_choice in TIME_CHOICES
if not (len(time_choice) <= 1 and has_s)
)
return {f"in {delay_choice}" for delay_choice in delay_choices}
current_year: int = discord.utils.utcnow().year
if re.fullmatch(r"\A\d{1,3}\Z", ctx.value):
for joiner, has_s in itertools.product({"", " "}, {"", "s"}):
delay_choices.update(
f"{joiner}{time_choice}{has_s}"
for time_choice in TIME_CHOICES
if not (len(time_choice) <= 1 and has_s)
)
if 1 <= int(ctx.value) <= 31:
FORMATTED_DAY_DATE_CHOICES: Final[Iterator[tuple[int, int, str]]] = (
itertools.product(
range(1, 12),
range(current_year, current_year + 40),
("/", " / ", "-", " - ", ".", " . "),
)
)
month: int
year: int
for month, year, joiner in FORMATTED_DAY_DATE_CHOICES:
delay_choices.add(f"{joiner}{month}{joiner}{year}")
if month < 10:
delay_choices.add(f"{joiner}0{month}{joiner}{year}")
elif match := re.fullmatch(r"\A\d{1,3}(?P<ctx_time_choice> ?[A-Za-z]*)\Z", ctx.value):
FORMATTED_TIME_CHOICES: Final[Iterator[tuple[str, str, str]]] = itertools.product(
{"", " "},
TIME_CHOICES,
{"", "s"},
)
time_choice: str
for joiner, time_choice, has_s in FORMATTED_TIME_CHOICES:
if has_s and len(time_choice) <= 1:
continue
formatted_time_choice: str = joiner + time_choice + has_s
slice_size: int
for slice_size in range(1, len(formatted_time_choice) + 1):
if match.group("ctx_time_choice").casefold() == formatted_time_choice[:slice_size]: # noqa: E501
delay_choices.add(formatted_time_choice[slice_size:])
elif match := re.fullmatch(r"\A(?P<date>\d{1,2}) ?[/\-.] ?\Z", ctx.value):
if 1 <= int(match.group("date")) <= 31:
FORMATTED_DAY_AND_JOINER_DATE_CHOICES: Final[Iterator[tuple[int, int, str]]] = ( # noqa: E501
itertools.product(
range(1, 12),
range(current_year, current_year + 40),
("/", " / ", "-", " - ", ".", " . "),
)
)
for month, year, joiner in FORMATTED_DAY_AND_JOINER_DATE_CHOICES:
delay_choices.add(f"{month}{joiner}{year}")
if month < 10:
delay_choices.add(f"0{month}{joiner}{year}")
elif match := re.fullmatch(r"\A(?P<date>\d{1,2}) ?[/\-.] ?(?P<month>\d{1,2})\Z", ctx.value): # noqa: E501
if 1 <= int(match.group("date")) <= 31 and 1 <= int(match.group("month")) <= 12:
for year in range(current_year, current_year + 40):
for joiner in ("/", " / ", "-", " - ", ".", " . "):
delay_choices.add(f"{joiner}{year}")
elif match := re.fullmatch(r"\A(?P<date>\d{1,2}) ?[/\-.] ?(?P<month>\d{1,2}) ?[/\-.] ?\Z", ctx.value): # noqa: E501
if 1 <= int(match.group("date")) <= 31 and 1 <= int(match.group("month")) <= 12:
for year in range(current_year, current_year + 40):
delay_choices.add(f"{year}")
elif match := re.fullmatch(r"\A(?P<date>\d{1,2}) ?[/\-.] ?(?P<month>\d{1,2}) ?[/\-.] ?(?P<partial_year>\d{1,3})\Z", ctx.value): # noqa: E501
if 1 <= int(match.group("date")) <= 31 and 1 <= int(match.group("month")) <= 12:
for year in range(current_year, current_year + 40):
delay_choices.add(f"{year}"[len(match.group("partial_year")):])
return {f"{ctx.value}{delay_choice}".casefold() for delay_choice in delay_choices}
@discord.slash_command( # type: ignore[no-untyped-call, misc]
name="remind-me",
description="Responds with the given message after the specified time.",
)
@discord.option( # type: ignore[no-untyped-call, misc]
name="delay",
input_type=str,
description="The amount of time to wait before reminding you.",
required=True,
autocomplete=discord.utils.basic_autocomplete(autocomplete_get_delays), # type: ignore[arg-type]
)
@discord.option( # type: ignore[no-untyped-call, misc]
name="message",
input_type=str,
description="The message you want to be reminded with.",
required=False,
)
async def remind_me(self, ctx: TeXBotApplicationContext, delay: str, message: str) -> None:
"""
Definition & callback response of the "remind_me" command.
The "remind_me" command responds with the given message after the specified time.
"""
# noinspection PyTypeChecker
parsed_time: tuple[time.struct_time, int] = parsedatetime.Calendar().parseDT(
delay,
tzinfo=timezone.get_current_timezone(),
)
if parsed_time[1] == 0:
await self.command_send_error(
ctx,
message=(
f"""The value provided in the {"delay"!r} argument was not a time/date."""
),
)
return
if message:
message = re.sub(r"<@[&#]?\d+>", "@...", message.strip())
try:
reminder: DiscordReminder = await DiscordReminder.objects.acreate(
discord_id=ctx.user.id,
message=message or "",
channel_id=ctx.channel_id,
send_datetime=parsed_time[0],
channel_type=ctx.channel.type,
)
except ValidationError as create_discord_reminder_error:
ERROR_IS_ALREADY_EXISTS: Final[bool] = bool(
"__all__" in create_discord_reminder_error.message_dict
and any(
"already exists" in error
for error in create_discord_reminder_error.message_dict["__all__"]
) # noqa: COM812
)
if not ERROR_IS_ALREADY_EXISTS:
await self.command_send_error(ctx, message="An unrecoverable error occurred.")
logger.critical(
"Error when creating DiscordReminder object: %s",
create_discord_reminder_error,
)
await self.bot.close()
await self.command_send_error(
ctx,
message="You already have a reminder with that message in this channel!",
)
return
await ctx.respond("Reminder set!", ephemeral=True)
await discord.utils.sleep_until(reminder.send_datetime)
user_mention: str | None = None
if ctx.guild:
user_mention = ctx.user.mention
await ctx.send_followup(reminder.get_formatted_message(user_mention))
await reminder.adelete()
class ClearRemindersBacklogTaskCog(TeXBotBaseCog):
"""Cog class that defines the clear_reminders_backlog task."""
@override
def __init__(self, bot: TeXBot) -> None:
"""Start all task managers when this cog is initialised."""
self.clear_reminders_backlog.start()
super().__init__(bot)
@override
def cog_unload(self) -> None:
"""
Unload hook that ends all running tasks whenever the tasks cog is unloaded.
This may be run dynamically or when the bot closes.
"""
self.clear_reminders_backlog.cancel()
@tasks.loop(minutes=15)
async def clear_reminders_backlog(self) -> None:
"""Recurring task to send any late Discord reminders still stored in the database."""
TEXTABLE_CHANNEL_TYPES: Final[frozenset[discord.ChannelType]] = frozenset(
{
discord.ChannelType.text,
discord.ChannelType.group,
discord.ChannelType.public_thread,
discord.ChannelType.private_thread,
},
)
reminder: DiscordReminder
async for reminder in DiscordReminder.objects.select_related("discord_member").all():
time_since_reminder_needed_to_be_sent: datetime.timedelta = (
discord.utils.utcnow() - reminder.send_datetime
)
if time_since_reminder_needed_to_be_sent > datetime.timedelta(minutes=15):
user: discord.User | None = discord.utils.find(
functools.partial(
lambda _user, _reminder: (
not _user.bot
and DiscordMember.hash_discord_id(_user.id) == _reminder.discord_member.hashed_discord_id # noqa: E501
),
_reminder=reminder,
),
self.bot.users,
)
if not user:
logger.warning(
"User with hashed user ID: %s no longer exists.",
reminder.discord_member.hashed_discord_id, # type: ignore[has-type]
)
await reminder.adelete()
continue
# noinspection PyUnresolvedReferences
channel: discord.PartialMessageable = self.bot.get_partial_messageable(
reminder.channel_id,
type=(
discord.ChannelType(reminder.channel_type.value)
if reminder.channel_type
else None
),
)
user_mention: str | None = None
if channel.type in TEXTABLE_CHANNEL_TYPES:
user_mention = user.mention
elif channel.type != discord.ChannelType.private:
logger.critical(
ValueError(
"Reminder's channel_id must refer to a valid text channel/DM.",
),
)
await self.bot.close()
await channel.send(
"**Sorry it's a bit late! "
"(I'm just catching up with some reminders I missed!)**\n\n"
f"{reminder.get_formatted_message(user_mention)}",
)
await reminder.adelete()
@clear_reminders_backlog.before_loop
async def before_tasks(self) -> None:
"""Pre-execution hook, preventing any tasks from executing before the bot is ready."""
await self.bot.wait_until_ready()