-
Notifications
You must be signed in to change notification settings - Fork 0
/
shared.py
169 lines (127 loc) · 4.13 KB
/
shared.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import queue
import os
from threading import Lock, RLock
import utils
from telethon.tl.types import MessageEntityBotCommand
import replacements
# Length limit consumed by truncate_message. Read from the MAX_MESSAGE_LENGTH
# environment variable at import time; os.getenv returns None when the
# variable is unset, in which case int() raises TypeError.
_max_message_length = int(os.getenv("MAX_MESSAGE_LENGTH"))
# Read from DEFAULT_ELEMS_FETCHED at import time; insert_message uses it as
# the maximum number of entries kept in _all_messages.
_default_elems_fetched = int(os.getenv("DEFAULT_ELEMS_FETCHED"))
# Guards every read and write of _was_started in this module.
_was_started_mutex = Lock()
# Set to True by on_successful_login(); reported by is_logged_in().
_was_started = False
def on_successful_login():
    """Mark the module-level started flag as True.

    The assignment happens while holding ``_was_started_mutex``.
    """
    global _was_started
    with _was_started_mutex:
        _was_started = True
def is_logged_in():
    """Return the current value of the started flag.

    The flag is read while holding ``_was_started_mutex``; it is False until
    ``on_successful_login`` has been called.
    """
    with _was_started_mutex:
        started = _was_started
    return started
# Reentrant lock for _all_messages; handed out by get_all_messages_mutex().
_all_messages_mutex = RLock()
# Cached message objects. insert_message places entries through
# utils.insert_sorted_list, keyed on the message id.
_all_messages = []
def get_all_messages_mutex():
    """Return the module-level ``RLock`` associated with the message list.

    This is the same lock object that the locking accessors in this module
    (e.g. ``get_message_by_id``) acquire.
    """
    return _all_messages_mutex
def get_all_messages():
    """Return the message list object itself (not a copy).

    No lock is taken here.
    """
    return _all_messages
def get_amount_of_messages():
    """Return how many entries the message list currently holds.

    No lock is taken here.
    """
    count = len(_all_messages)
    return count
def get_nth_message(n):
    """Return the entry at index ``n`` of the message list.

    The lookup is a plain subscript on the list, so an out-of-range ``n``
    propagates whatever error the subscript raises. No lock is taken here.
    """
    entry = _all_messages[n]
    return entry
def get_most_recent_message():
    """Return the first entry of the message list, or None when it is empty.

    The list is inspected while holding ``_all_messages_mutex``.
    """
    with _all_messages_mutex:
        if len(_all_messages) == 0:
            return None
        return _all_messages[0]
def insert_message(message):
    """Annotate ``message`` with its replaced content and add it to the list.

    The displayable content from ``get_message_content`` is passed through
    ``replacements.replace_message`` and stored on the message as
    ``replaced_message_content``. The message is then placed into the list via
    ``utils.insert_sorted_list`` (keyed on ``id``) under the list mutex.
    """
    message.replaced_message_content = replacements.replace_message(
        get_message_content(message)
    )
    with _all_messages_mutex:
        utils.insert_sorted_list(_all_messages, message, lambda entry: entry.id)
        # Keep the cache bounded: drop the last entry once the limit is exceeded.
        over_limit = len(_all_messages) > _default_elems_fetched
        if over_limit:
            _all_messages.pop()
def get_replaced_message_content(message):
    """Return ``message.replaced_message_content``, or "" when message is None."""
    if message is not None:
        return message.replaced_message_content
    return ""
def get_message_by_id(id):
    """Return the first listed message whose ``id`` equals ``id``, else None.

    The scan runs while holding ``_all_messages_mutex``.
    """
    with _all_messages_mutex:
        for candidate in _all_messages:
            if candidate.id == id:
                return candidate
        return None
def remove_message_by_id(id):
    """Rebind the message list to the result of ``utils.filter_list``.

    The predicate handed to ``utils.filter_list`` is true for messages whose
    ``id`` equals ``id``. The rebinding happens under ``_all_messages_mutex``.

    NOTE(review): whether ``utils.filter_list`` keeps or drops the entries the
    predicate matches is defined in ``utils`` and not visible here -- confirm
    that matching entries are the ones removed.
    """
    global _all_messages

    def has_target_id(msg):
        return msg.id == id

    with _all_messages_mutex:
        _all_messages = utils.filter_list(_all_messages, has_target_id)
def get_reply_to_msg(msg):
    """Return the listed message that ``msg`` replies to, or None.

    None is returned when ``msg.reply_to`` is None, and also when
    ``get_message_by_id`` finds no message with the referenced id.
    """
    reply_header = msg.reply_to
    if reply_header is None:
        return None
    return get_message_by_id(reply_header.reply_to_msg_id)
def get_replies(msg):
    """Return a new list of the listed messages that reply to ``msg``.

    A message counts as a reply when its ``reply_to`` is set and its
    ``reply_to.reply_to_msg_id`` equals ``msg.id``. List order is preserved.

    NOTE(review): the list is iterated without acquiring
    ``_all_messages_mutex`` -- confirm callers hold it where needed.
    """
    return [
        candidate
        for candidate in _all_messages
        if candidate.reply_to is not None
        and candidate.reply_to.reply_to_msg_id == msg.id
    ]
def has_message_replies(msg):
    """Return True when a listed message replies to ``msg``.

    The list is scanned from the front and the scan ends as soon as the entry
    with ``msg``'s own id is reached; entries behind it are never examined.
    No lock is taken here.
    """
    for candidate in _all_messages:
        header = candidate.reply_to
        if header is not None and header.reply_to_msg_id == msg.id:
            return True
        # A reply cannot have been sent before the message it answers, so
        # nothing past the message itself needs to be checked.
        if candidate.id == msg.id:
            return False
    return False
def is_reply_to_msg(msg):
    """Return True when ``msg.reply_to`` is set (i.e. is not None)."""
    has_reply_header = msg.reply_to is not None
    return has_reply_header
def contains_bot_command(msg):
    """Return True when any of ``msg.entities`` is a MessageEntityBotCommand.

    A message whose ``entities`` is None (or empty) yields False.
    """
    entities = msg.entities
    if entities is None:
        return False
    for entity in entities:
        if isinstance(entity, MessageEntityBotCommand):
            return True
    return False
def was_message_answered_by_admin(msg):
    """Return True when a listed reply to ``msg`` was sent by an admin.

    The list is scanned from the front; a hit requires the entry to reply to
    ``msg`` and ``was_message_sent_by_admin`` to accept it. The scan ends once
    the entry with ``msg``'s own id is reached. No lock is taken here.
    """
    for candidate in _all_messages:
        header = candidate.reply_to
        replies_to_msg = header is not None and header.reply_to_msg_id == msg.id
        if replies_to_msg and was_message_sent_by_admin(candidate):
            return True
        # A reply cannot have been sent before the message it answers, so
        # nothing past the message itself needs to be checked.
        if candidate.id == msg.id:
            return False
    return False
# User ids registered through insert_admin(); queried by is_admin().
_admin_user_ids = []
# Reentrant lock held by insert_admin() and is_admin() around _admin_user_ids.
_admin_user_ids_lock = RLock()
def insert_admin(user_id):
    """Append ``user_id`` to the admin id list.

    The append happens under ``_admin_user_ids_lock``; no duplicate check is
    performed.
    """
    with _admin_user_ids_lock:
        _admin_user_ids.append(user_id)
def is_admin(user_id):
    """Return True when ``user_id`` is contained in the admin id list.

    The membership test runs under ``_admin_user_ids_lock``.
    """
    with _admin_user_ids_lock:
        known = user_id in _admin_user_ids
    return known
def was_message_sent_by_admin(msg):
    """Return True when the sender of ``msg`` is a registered admin.

    The sender id is read as ``msg.from_id.user_id``. When ``from_id`` is
    missing/None or the object it holds has no ``user_id`` attribute, the
    sender cannot be matched against the admin ids, so False is returned
    instead of letting an AttributeError escape.
    """
    sender = getattr(msg, "from_id", None)
    user_id = getattr(sender, "user_id", None)
    if user_id is None:
        return False
    return is_admin(user_id)
# Module-level queue.Queue instance. Presumably carries login confirmation
# codes between threads -- TODO confirm against its producers/consumers.
confirmation_code_queue = queue.Queue()
def get_message_content(msg):
    """Return the displayable content of ``msg``.

    Messages classified by ``get_message_type`` as "voice" or "photo" yield a
    fixed German placeholder ("Sprachnachricht" = voice message, "Bild" =
    picture). Everything else yields ``msg.message`` passed through
    ``truncate_message`` (None when the attribute is absent or None).
    """
    kind = get_message_type(msg)
    text = getattr(msg, "message", None)
    placeholders = {"voice": "Sprachnachricht", "photo": "Bild"}
    if kind in placeholders:
        return placeholders[kind]
    return truncate_message(text)
def get_message_type(msg):
    """Classify ``msg`` as "voice", "photo" or "text".

    Only a message without text (``message`` attribute absent, None or of
    length zero) can be "voice" or "photo"; the voice check is made before the
    photo check. Any message with text is "text".
    """
    text = getattr(msg, "message", None)
    has_no_text = text is None or len(text) == 0
    if has_no_text:
        if is_voice_message(msg):
            return "voice"
        if is_photo(msg):
            return "photo"
    return "text"
def is_multimedia_message_without_content(msg):
    """Return True when ``msg`` is a voice or photo message without text.

    The decision is based on ``get_message_type``. Comparing the raw text with
    ``get_message_content`` is not reliable for this purpose, because that
    function truncates long texts and a truncated text message would then be
    reported as multimedia.
    """
    return get_message_type(msg) != "text"
def is_voice_message(msg):
    """Return True when ``msg.media.document`` exists and is not None.

    Missing ``media`` or ``document`` attributes are treated as None.

    NOTE(review): the check accepts any document, not only voice recordings --
    confirm that other document kinds cannot reach this function.
    """
    attachment = getattr(msg, "media", None)
    return getattr(attachment, "document", None) is not None
def is_photo(msg):
    """Return True when ``msg.media.photo`` exists and is not None.

    Missing ``media`` or ``photo`` attributes are treated as None.
    """
    attachment = getattr(msg, "media", None)
    return getattr(attachment, "photo", None) is not None
def truncate_message(msg, max_length=None):
    """Shorten ``msg`` to ``max_length`` characters followed by "...".

    Args:
        msg: The text to shorten, or None.
        max_length: Number of characters of ``msg`` to keep. Defaults to the
            module-level ``_max_message_length``.

    Returns:
        None when ``msg`` is None. ``msg`` unchanged when it is at most
        ``max_length + 3`` characters long. Otherwise the first ``max_length``
        characters plus "...", so the result is never longer than the input.
    """
    if msg is None:
        return None
    if max_length is None:
        max_length = _max_message_length
    suffix = "..."
    # The suffix itself costs three characters, so a message exceeding the
    # limit by no more than that is kept whole: cutting it would save nothing.
    if len(msg) > max_length + len(suffix):
        return msg[:max_length] + suffix
    return msg