-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathstickermanager.py
394 lines (337 loc) · 15 KB
/
stickermanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
import os
import json
import asyncio
from time import time
from io import BytesIO
from pathlib import Path
from typing import Union, NamedTuple, Callable, Optional, Tuple
from PIL import Image
from telethon.tl.custom import Message, Button
from telethon.tl.types import (
InputStickerSetID, InputStickerSetShortName, InputStickerSetItem,
InputDocument, InputMediaUploadedDocument, InputPeerSelf
)
from telethon.tl.types.messages import StickerSet
from telethon.tl.functions.stickers import CreateStickerSetRequest, AddStickerToSetRequest
from telethon.tl.functions.messages import UploadMediaRequest, GetStickerSetRequest
from telethon.tl.functions.channels import GetParticipantRequest
from telethon.errors import StickersetInvalidError, UserNotParticipantError
from telethon import events, utils, TelegramClient, errors
# HTML text of an open poll message. The placeholders are filled with
# ``str.format_map(get_template_data())``.
POLL_TEMPLATE = (
'<a href="tg://user?id={sender_id}">{sender_name}</a> has suggested this '
'sticker be added to <a href="{pack_link}">the group\'s sticker pack</a> '
'with the emoji {emoji}.\n\n'
'<strong>Current result: {score}</strong>\n'
'For: {yes}\n'
'Against: {no}'
)
# HTML text the poll message is edited to once the poll has finished.
# ``{result}`` is one of the RESULT_* strings below, already formatted.
POLL_FINISHED_TEMPLATE = (
'<strong>This sticker poll finished with a final score of {score}. {result}</strong>\n\n'
'<a href="tg://user?id={sender_id}">{sender_name}</a> had suggested this '
'sticker be added to the group\'s sticker pack with the emoji {emoji}.\n\n'
'For: {yes}\n'
'Against: {no}'
)
# Sent to the chat when editing the poll message fails because the message
# no longer exists.
POLL_DELETED_ANGER = (
'Wait… who the fu*ck deleted my sticker poll? 3:<'
)
# One voter entry in the "For" / "Against" lists of the templates above.
VOTE_TEMPLATE = '<a href="tg://user?id={uid}">{displayname}</a> ({weight})'
# Final verdict sentences inserted into POLL_FINISHED_TEMPLATE as ``{result}``.
RESULT_ADDED = 'The sticker has been added to <a href="{pack_link}">the pack</a>.'
RESULT_REJECTED = 'The sticker was rejected from <a href="{pack_link}">the pack</a>.'
# Labels of the two inline vote buttons (up / down triangle emoji).
UP = '\U0001f53c'
DOWN = '\U0001f53d'
# Callback payloads carried by the vote buttons.
UP_DAT = b'addsticker/+'
DOWN_DAT = b'addsticker/-'
# Both durations are 24 hours expressed in seconds: how long a poll stays
# open, and how long after an accepted sticker no new poll may be started.
POLL_TIMEOUT = ADD_COOLDOWN = 24 * 60 * 60
# Directory containing this module; the config and cache files live next to it.
BASE_DIR = Path(__file__).resolve().parent
# Tab-separated configuration (per-user vote weights plus named settings).
CONFIG_FILE = BASE_DIR / 'stickermanager.tsv'
# JSON file in which the pack reference, last acceptance time and the
# ongoing poll are persisted.
CACHE_FILE = BASE_DIR / 'stickermanager.json'
# File name used for the downloaded sticker candidate; ``ts`` is the unix
# timestamp at which the poll was started.
DATA_FILE_FORMAT: str = 'stickermanager.{ts}.dat'
# A single vote: the signed weight (negative means "against") and the name
# shown for the voter.
VoteData = NamedTuple('VoteData', weight=int, displayname=str)
Number = Union[int, float]
# Aggregated poll result: net score, total "for" weight, total "against"
# weight, and the number of voters on each side.
Scores = NamedTuple('Scores', sum=Number, yes=Number, no=Number, yes_count=int, no_count=int)
# State of the ongoing poll as a plain dict, or None when no poll is running.
current_vote: Optional[dict] = None
# Serialises every mutation of ``current_vote``.
current_vote_lock: asyncio.Lock = asyncio.Lock()
# Set to wake up the timeout waiter early, i.e. when the poll was decided by
# votes or the poll message turned out to be deleted.
current_vote_status: asyncio.Event = asyncio.Event()
# Unix timestamp at which the last sticker was accepted (0 if never).
last_accepted: int = 0
# Reference to the managed sticker pack, or None until it has been resolved.
sticker_pack = None
# Parse the tab-separated configuration file. Every non-blank line that does
# not start with '#' has three columns: ``key <TAB> weight <TAB> name``.
# A purely numeric key is a user ID and is stored as ``int``; any other key is
# a named setting. A '-' in the weight column means 0 and a '-' in the name
# column means "no name" (None).
#
# The encoding is given explicitly so that display names containing non-ASCII
# characters are read the same way regardless of the platform's locale.
with open(CONFIG_FILE, encoding='utf-8') as f:
    WEIGHTS = {
        int(uid) if uid.strip().isnumeric() else uid:
            (0 if weight == '-' else float(weight), None if name == '-' else name)
        for uid, weight, name
        in (line.strip().split('\t', 2)
            for line in f
            if line.strip() and not line.startswith('#'))
    }

# The named settings share the table with the per-user weights; pop them so
# that WEIGHTS only contains user entries afterwards.
VOTES_REQUIRED = WEIGHTS.pop('votes required')[0]
DEFAULT_WEIGHT = WEIGHTS.pop('default weight')[0]
STICKER_PACK_TITLE = WEIGHTS.pop('sticker title')[1]
STICKER_PACK_SHORT_NAME = WEIGHTS.pop('sticker short')[1]
ADMIN_USER_ID = int(WEIGHTS.pop('sticker owner')[1])
# ``gid.strip()`` skips empty as well as whitespace-only entries, so a
# trailing or doubled comma no longer ends up in ``int('')``.
ALLOWED_CHATS = [int(gid)
                 for gid in WEIGHTS.pop('allowed chats')[1].split(',')
                 if gid.strip()]
def load_cache() -> None:
    """
    Restore ``sticker_pack``, ``current_vote`` and ``last_accepted`` from
    the JSON cache file.

    If the cache file cannot be opened or read (``OSError``), the globals
    are left as they are.
    """
    global sticker_pack, current_vote, last_accepted
    try:
        with open(CACHE_FILE) as cache_fp:
            cached = json.load(cache_fp)

        pack_info = cached['sticker_pack']
        if pack_info:
            sticker_pack = InputStickerSetID(id=pack_info['id'],
                                             access_hash=pack_info['access_hash'])

        vote_info = cached['current_vote']
        if vote_info:
            current_vote = vote_info
            # JSON object keys are always strings and each vote was stored
            # as a list, so turn them back into int user IDs and VoteData.
            restored_votes = {}
            for raw_uid, raw_vote in vote_info['votes'].items():
                restored_votes[int(raw_uid)] = VoteData(*raw_vote)
            current_vote['votes'] = restored_votes

        last_accepted = cached['last_accepted'] or 0
    except OSError:
        return
def save_cache() -> None:
    """
    Persist ``sticker_pack``, ``last_accepted`` and ``current_vote`` to the
    JSON cache file.

    The state is serialised before any file is touched and then written to
    a temporary sibling file which atomically replaces the cache. A failed
    or interrupted save therefore leaves the previous cache intact instead
    of a truncated file that can no longer be parsed on the next start.
    """
    payload = json.dumps({
        'sticker_pack': {
            'id': sticker_pack.id,
            'access_hash': sticker_pack.access_hash
        } if sticker_pack else None,
        'last_accepted': last_accepted,
        'current_vote': current_vote,
    })
    tmp_path = f'{CACHE_FILE}.tmp'
    with open(tmp_path, 'w') as file:
        file.write(payload)
    os.replace(tmp_path, CACHE_FILE)
async def create_sticker_pack(bot: TelegramClient, item: InputStickerSetItem
                              ) -> Tuple[bool, Optional[StickerSet]]:
    """
    Resolve the configured sticker pack, creating it if it does not exist.

    The pack is first looked up by its short name. Only when that lookup
    fails with ``StickersetInvalidError`` is a new pack created, with
    ``item`` as its first sticker. In both cases the global ``sticker_pack``
    reference is updated and the cache is saved.

    :param bot: the client used to send the requests.
    :param item: the sticker used as the first one of a newly created pack.
    :return: ``(created, stickerset)`` where ``created`` tells whether the
             pack had to be created (and thus already contains ``item``).
    """
    global sticker_pack

    try:
        lookup = GetStickerSetRequest(InputStickerSetShortName(STICKER_PACK_SHORT_NAME))
        stickerset = await bot(lookup)
    except StickersetInvalidError:
        creation = CreateStickerSetRequest(
            user_id=ADMIN_USER_ID,
            title=STICKER_PACK_TITLE,
            short_name=STICKER_PACK_SHORT_NAME,
            stickers=[item]
        )
        stickerset = await bot(creation)
        created = True
    else:
        created = False

    found_set = stickerset.set
    sticker_pack = InputStickerSetID(id=found_set.id, access_hash=found_set.access_hash)
    save_cache()
    return created, stickerset
async def add_sticker_to_pack(bot: TelegramClient) -> Tuple[StickerSet, InputDocument]:
    """
    Upload the file of the current poll and add it to the sticker pack.

    Animated stickers are uploaded as they are. Any other image is scaled so
    that its longer side is 512 pixels and is re-encoded as PNG. The
    downloaded source file is deleted once its contents have been uploaded.

    :param bot: the client used to upload the file and send the requests.
    :return: the resulting sticker set and the input document of its last
             sticker.
    """
    global sticker_pack
    filepath = current_vote['filepath']
    if current_vote['animated']:
        file = await bot.upload_file(filepath)
        mime = 'application/x-tgsticker'
    else:
        # The context manager closes the image file before it is removed.
        with Image.open(filepath) as img:
            w, h = img.size
            # ``max(1, ...)`` keeps extremely elongated images from being
            # resized to a zero-pixel side, which Pillow rejects.
            if w > h:
                size = (512, max(1, int(h * (512 / w))))
            else:
                size = (max(1, int(w * (512 / h))), 512)
            dat = BytesIO()
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            img.resize(size, Image.LANCZOS).save(dat, format='PNG')
        file = await bot.upload_file(dat.getvalue())
        mime = 'image/png'
    os.remove(filepath)

    file = InputMediaUploadedDocument(file, mime, [])
    document = await bot(UploadMediaRequest(InputPeerSelf(), file))
    document = utils.get_input_document(document)
    item = InputStickerSetItem(document=document, emoji=current_vote['emoji'])

    pack: Optional[StickerSet] = None
    added = False
    # TODO add support for animated stickers
    if not sticker_pack:
        # Resolves the pack reference. ``added`` is True only if the pack had
        # to be created, in which case it already contains ``item``.
        added, pack = await create_sticker_pack(bot, item)
    if not added:
        pack = await bot(AddStickerToSetRequest(stickerset=sticker_pack, sticker=item))
    return pack, utils.get_input_document(pack.documents[-1])
def get_template_data() -> dict:
    """
    Build the mapping used to fill the poll message templates.

    The result contains every key of ``current_vote`` plus the formatted
    ``yes`` / ``no`` voter lists and the ``pack_link``. The raw ``votes``
    mapping is replaced by an empty string.

    The templates are sent with ``parse_mode='html'``, so free text that
    does not come from this module (voter display names, the sender name and
    the emoji text given with the command) is HTML-escaped here.
    NOTE(review): this assumes the display names stored in the votes are
    plain text, not already escaped -- confirm against ``get_display``.
    """
    from html import escape

    def as_text(value) -> str:
        # ``str()`` keeps a missing name rendering as 'None', as it did when
        # the value was formatted directly.
        return escape(str(value), quote=False)

    def format_votes(cond: Callable[[Number], bool]) -> str:
        return ', '.join(
            VOTE_TEMPLATE.format(uid=uid, displayname=as_text(displayname), weight=weight)
            for uid, (weight, displayname) in current_vote['votes'].items()
            if cond(weight))

    return {
        **current_vote,
        'sender_name': as_text(current_vote['sender_name']),
        'emoji': as_text(current_vote['emoji']),
        'votes': '',
        'yes': format_votes(lambda weight: weight > 0) or 'nobody',
        'no': format_votes(lambda weight: weight < 0) or 'nobody',
        'pack_link': f'https://t.me/addstickers/{STICKER_PACK_SHORT_NAME}',
    }
def calculate_scores() -> Scores:
    """
    Tally the votes of the current poll.

    Votes with a positive weight count in favour; every other vote counts
    against, contributing the negation of its weight to the "no" total.

    :return: the net score, both totals (each passed through
             ``fancy_round``) and the number of voters on each side.
    """
    favour_total, against_total = 0.0, 0.0
    favour_votes, against_votes = 0, 0
    for ballot in current_vote['votes'].values():
        weight = ballot.weight
        if not weight > 0:
            against_total -= weight
            against_votes += 1
            continue
        favour_total += weight
        favour_votes += 1
    return Scores(
        sum=fancy_round(favour_total - against_total),
        yes=fancy_round(favour_total),
        no=fancy_round(against_total),
        yes_count=favour_votes,
        no_count=against_votes,
    )
def fancy_round(val: Number) -> Number:
    """
    Round ``val`` to two decimals, returning an ``int`` for whole numbers.

    The value is rounded *before* checking whether it is whole, so a sum
    that is only a floating point error away from an integer (for example
    ``2.9999999999``) is returned as ``3`` instead of ``3.0``, and tiny
    negative values come out as ``0`` rather than ``-0.0``.
    """
    rounded = round(val, 2)
    if isinstance(rounded, float) and rounded.is_integer():
        return int(rounded)
    return rounded
async def init(bot: TelegramClient, modules: dict) -> None:
    """
    Register the sticker poll handlers on ``bot`` and resume a cached poll.

    Two handlers are registered for the allowed chats: ``#addsticker``
    (sent as a reply to a photo or static sticker) starts a poll, and the
    inline vote buttons record votes. A poll ends when the absolute score
    reaches ``VOTES_REQUIRED`` or when ``POLL_TIMEOUT`` elapses. After
    registering, the cache is loaded and a poll found in it is either
    finished right away (if it already timed out) or waited on for the
    remaining time.

    Functions prefixed with ``_locked_`` must only be called while holding
    ``current_vote_lock``.

    :param bot: the client to register the handlers on.
    :param modules: the loaded bot modules; ``modules['utils']`` must provide
                    ``get_display``.
    """
    # NOTE: inside ``init`` this shadows the ``utils`` imported from telethon.
    utils = modules['utils']

    @bot.on(events.NewMessage(pattern='#addsticker(?: (.+))?', chats=ALLOWED_CHATS))
    async def start_poll(event: Union[events.NewMessage.Event, Message]) -> None:
        if not event.is_reply:
            return
        if current_vote:
            poll = await bot.get_messages(current_vote['chat'], ids=current_vote['poll'])
            if poll is not None:
                await event.reply('There\'s already an ongoing sticker poll.')
                return
            # The poll message is gone. Finishing will attempt to edit the
            # poll and fail with anger so there's no need to send the anger
            # message here. The lock is taken because finishing mutates the
            # shared poll state.
            async with current_vote_lock:
                await _locked_finish_poll()

        async with current_vote_lock:
            await _locked_start_poll(event)
            save_cache()

    async def _locked_start_poll(event: Union[events.NewMessage.Event, Message]) -> None:
        global current_vote

        if current_vote:
            await event.reply('There\'s already an ongoing sticker poll.')
            return
        elif last_accepted + ADD_COOLDOWN > int(time()):
            await event.reply('Less than 24 hours have passed since the '
                              'previous sticker was added.')
            return

        emoji = event.pattern_match.group(1)
        try:
            # Only users listed in the config may start polls.
            _, sender_name = WEIGHTS[event.sender_id]
        except KeyError:
            await event.reply('Please upgrade to a Telethon OffTopic Premium '
                              'Membership to start sticker polls.')
            return

        orig_evt: Message = await event.get_reply_message()
        if orig_evt is None:
            # The message that was replied to could not be fetched, so there
            # is nothing to vote on.
            return
        # TODO add support for animated stickers
        if not orig_evt.photo and (not orig_evt.sticker or
                                   orig_evt.sticker.mime_type == 'application/x-tgsticker'):
            return

        if emoji is None:
            # Fall back to the emoji of the replied-to file, if it has one.
            if orig_evt.file.emoji:
                emoji = orig_evt.file.emoji
            else:
                return

        filename = Path(DATA_FILE_FORMAT.format(ts=int(time())))
        await orig_evt.download_media(filename)

        delete_task = asyncio.ensure_future(event.delete())
        current_vote_status.clear()
        current_vote = {
            'chat': event.chat_id,
            'started_at': int(time()),
            'sender_id': event.sender_id,
            'sender_name': sender_name,
            'score': 0,
            'emoji': emoji,
            'votes': {},
            'filepath': str(filename),
            'animated': orig_evt.sticker and orig_evt.sticker.mime_type == 'application/x-tgsticker'
        }
        reply_evt: Message = await orig_evt.reply(
            POLL_TEMPLATE.format_map(get_template_data()),
            buttons=[Button.inline(UP, UP_DAT), Button.inline(DOWN, DOWN_DAT)],
            parse_mode='html')
        pin_task = asyncio.ensure_future(reply_evt.pin())
        current_vote['poll'] = reply_evt.id
        asyncio.ensure_future(wait_for_poll())
        await asyncio.gather(delete_task, pin_task)

    async def wait_for_poll(timeout: int = POLL_TIMEOUT) -> None:
        try:
            # Returns normally when the status event is set (the poll was
            # already finished elsewhere); only a timeout finishes it here.
            await asyncio.wait_for(current_vote_status.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            async with current_vote_lock:
                await _locked_finish_poll()
                save_cache()

    async def _locked_finish_poll() -> bool:
        global current_vote, last_accepted

        if not current_vote:
            return False

        unpin_task = asyncio.ensure_future(bot.pin_message(current_vote['chat'], message=None))
        accepted = current_vote['score'] >= VOTES_REQUIRED
        result_tpl = RESULT_ADDED if accepted else RESULT_REJECTED
        current_vote['result'] = result_tpl.format_map(get_template_data())
        try:
            await bot.edit_message(current_vote['chat'], current_vote['poll'],
                                   POLL_FINISHED_TEMPLATE.format_map(get_template_data()),
                                   parse_mode='html')
        except errors.MessageIdInvalidError:
            # The poll was deleted, so cancel the related wait_for_poll task
            # If this isn't done, the task would eventually close the new poll too early.
            current_vote_status.set()
            await bot.send_message(current_vote['chat'], POLL_DELETED_ANGER)

        if accepted:
            _pack, document = await add_sticker_to_pack(bot)
            await bot.send_file(current_vote['chat'], file=document,
                                reply_to=current_vote['poll'])
            last_accepted = int(time())
        else:
            # The downloaded file is only consumed (and removed) when the
            # sticker is added; remove it here so rejected polls do not leave
            # their data files behind.
            leftover = current_vote.get('filepath')
            if leftover:
                try:
                    os.remove(leftover)
                except OSError:
                    pass  # best effort; the file may already be gone

        current_vote = None
        try:
            await unpin_task
        except errors.ChatNotModifiedError:
            pass  # either poll was deleted or pin was removed anyhow else
        return accepted

    @bot.on(events.CallbackQuery(chats=ALLOWED_CHATS, data=lambda data: data in (UP_DAT, DOWN_DAT)))
    async def vote_poll(event: events.CallbackQuery.Event) -> None:
        # Cheap check without the lock; repeated under the lock below since
        # the poll may finish while waiting for it.
        if not current_vote or current_vote['poll'] != event.message_id:
            await event.answer('That poll is closed.')
            return

        async with current_vote_lock:
            await _locked_vote_poll(event)
            save_cache()

    async def _locked_vote_poll(event: events.CallbackQuery.Event) -> None:
        if not current_vote or current_vote['poll'] != event.message_id:
            await event.answer('That poll is closed.')
            return

        try:
            await bot(GetParticipantRequest(channel=current_vote["chat"],
                                            user_id=event.input_sender))
        except UserNotParticipantError:
            await event.answer('You\'re not participating in the chat.')
            return

        weight, displayname = WEIGHTS.get(event.sender_id, (DEFAULT_WEIGHT, None))
        if weight == 0:
            await event.answer('You don\'t have the permission to vote.')
            return
        if event.data == DOWN_DAT:
            weight = -weight

        displayname = displayname or utils.get_display(await event.get_sender())
        existing = current_vote['votes'].get(event.sender_id)
        if existing is not None and existing.weight == weight:
            await event.answer(f'You already voted {fancy_round(weight)}')
            return

        current_vote['votes'][event.sender_id] = VoteData(weight=weight, displayname=displayname)
        scores = calculate_scores()
        current_vote['score'] = scores.sum

        if abs(scores.sum) >= VOTES_REQUIRED:
            # Wake up the timeout waiter so it does not finish the poll again.
            current_vote_status.set()
            accepted = await _locked_finish_poll()
            res = 'accepted' if accepted else 'rejected'
            await event.answer(f'Successfully voted {fancy_round(weight)},'
                               f' which made the sticker be {res} \U0001f389')
        else:
            await bot.edit_message(current_vote['chat'], current_vote['poll'],
                                   POLL_TEMPLATE.format_map(get_template_data()),
                                   buttons=[Button.inline(f'{UP} ({scores.yes_count})', UP_DAT),
                                            Button.inline(f'{DOWN} ({scores.no_count})', DOWN_DAT)],
                                   parse_mode='html')
            await event.answer(f'Successfully voted {fancy_round(weight)}')

    load_cache()
    if current_vote:
        # A poll was running when the bot last stopped: finish it if it
        # timed out in the meantime, otherwise wait for the remaining time.
        remaining_time = POLL_TIMEOUT - (int(time()) - current_vote['started_at'])
        if remaining_time < 0:
            async with current_vote_lock:
                await _locked_finish_poll()
                save_cache()
        else:
            asyncio.ensure_future(wait_for_poll(remaining_time))