-
Notifications
You must be signed in to change notification settings - Fork 1
/
telegram.py
174 lines (144 loc) · 6.51 KB
/
telegram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from io import BytesIO
import requests
from translation_functions import translate_ru_to_ua
from settings import settings, LOG, bot
import re
from html.parser import HTMLParser
class MyHTMLParser(HTMLParser):
    """Track HTML tags that have been opened but not yet closed.

    After text has been fed to the parser, ``tags`` holds the names of the
    tags that are still open, outermost first.
    """

    # Void elements never take a closing tag. Pushing them onto the stack
    # would report them as unclosed and, because only the innermost tag can
    # be popped, would also hide the closing of every enclosing tag.
    VOID_ELEMENTS = frozenset({
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'source', 'track', 'wbr',
    })

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tags = []

    def handle_starttag(self, tag, attrs):
        """Push every non-void opening tag onto the open-tag stack."""
        if tag not in self.VOID_ELEMENTS:
            self.tags.append(tag)

    def handle_endtag(self, tag):
        """Pop the stack when ``tag`` closes the innermost open tag."""
        # Stray or mis-nested closing tags are ignored on purpose: the
        # stack then keeps reporting the tags that were left open.
        if self.tags and self.tags[-1] == tag:
            self.tags.pop()
def check_html_tags(text):
    """Return the tags a fresh ``MyHTMLParser`` still holds after parsing ``text``.

    The returned list is the parser's own ``tags`` attribute; an empty list
    means the parser has no tag left on its stack.
    """
    tag_tracker = MyHTMLParser()
    tag_tracker.feed(text)
    return tag_tracker.tags
def _append_chunk(chunks, chunk):
    """Append a non-empty ``chunk`` to ``chunks``, warning about unclosed HTML tags."""
    if not chunk:
        # An empty chunk would later be sent as an empty message.
        return
    unclosed_tags = check_html_tags(chunk)
    if unclosed_tags:
        print(f'Warning: found unclosed HTML tags in text: {unclosed_tags}')
    chunks.append(chunk)


def split_text_by_length(text, length):
    """Split ``text`` on line boundaries into chunks of at most ``length`` characters.

    Lines are accumulated (joined with newlines) until adding the next line
    would exceed ``length``; the accumulated chunk is then emitted and a new
    one is started with that line. A warning is printed for every chunk in
    which ``check_html_tags`` reports unclosed tags.

    Lines are never broken apart, so a single line longer than ``length`` is
    emitted as its own oversized chunk. Empty chunks are never emitted.

    Returns:
        The list of chunks, in order; empty when ``text`` has no content.
    """
    split_texts = []
    current_text = ''
    for line in text.split('\n'):
        # The +1 accounts for the newline that would join the two pieces.
        if len(current_text) + len(line) + 1 > length:
            _append_chunk(split_texts, current_text)
            current_text = line
        else:
            current_text = current_text + '\n' + line if current_text else line
    # Flush whatever is left after the last line.
    _append_chunk(split_texts, current_text)
    return split_texts
import requests
from io import BytesIO
import threading
# Image used in place of the requested one when its download times out.
_FALLBACK_IMAGE_URL = 'https://static.komputronik.pl/product-picture/11/NINTENDOSWITCHRB19-1.jpg'


def _fetch_image(image_url, timeout):
    """GET ``image_url`` and return its body as ``BytesIO``, or ``None`` on a non-200 status."""
    response = requests.get(image_url, timeout=timeout)
    if response.status_code == 200:
        return BytesIO(response.content)
    print(f"Failed to download image from {image_url}")
    return None


def download_image(image_url, timeout=10):
    """Download an image into memory, falling back to a stock image on timeout.

    Args:
        image_url: URL of the image to download.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A ``BytesIO`` holding the image bytes, or ``None`` when the download
        (and, after a timeout, the fallback download) did not succeed.
        Errors are printed, never raised.
    """
    try:
        print("Downloading image...")
        return _fetch_image(image_url, timeout)
    except requests.exceptions.Timeout:
        print("Image download timed out. Changing the URL...")
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

    # The fallback runs outside the handler above, with its own timeout and
    # error handling, so it can neither hang forever nor raise to the caller.
    try:
        return _fetch_image(_FALLBACK_IMAGE_URL, timeout)
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
def _send_text(chat_id, topic_id, text):
    """Send ``text`` as HTML, split into several messages when longer than 4000 characters."""
    if len(text) > 4000:
        for i, part in enumerate(split_text_by_length(text, 4000)):
            if not part:
                # Telegram rejects empty messages.
                continue
            if LOG:
                print(f"message_parts[{i}]:\n", part)
            bot.send_message(chat_id=chat_id, message_thread_id=topic_id, text=part, parse_mode="HTML")
    else:
        bot.send_message(chat_id=chat_id, message_thread_id=topic_id, text=text, parse_mode="HTML")


def send_to_telegram(title_with_link, image_url, magnet_link, description):
    """Post a release announcement to every group in ``settings['GROUPS']``.

    The message is built from the title, magnet link and description. Groups
    whose ``language`` is ``"UA"`` receive the text passed through
    ``translate_ru_to_ua``; all other groups receive it unchanged. When the
    image can be downloaded it is sent as a photo with the text as caption
    (long texts are split into a caption plus follow-up messages); otherwise
    the text is sent as plain messages.

    Args:
        title_with_link: Title line, already formatted as HTML.
        image_url: URL of the image to attach.
        magnet_link: Magnet link shown in a ``<code>`` block.
        description: Description body, already formatted as HTML.

    Raises:
        Whatever the bot, the translator or the settings lookups raise;
        nothing is caught here.
    """
    base_text = f"{title_with_link}\n\n<b>Скачать</b>: <code>{magnet_link}</code>\n\n{description}"
    if LOG:
        print("message_text:\n", base_text)
    image = download_image(image_url)
    translated_text = None  # UA translation, computed at most once

    for group in settings['GROUPS']:
        chat_id = group['chat_id']
        topic_id = group['topic_id']
        group_name = group['group_name']
        group_lang = group['language']
        print(f"Obtaining message to {group_name}... in {group_lang} language")

        # Every group starts from the untranslated text, so a UA group does
        # not change what the groups after it receive.
        message_text = base_text
        if group_lang == "UA":
            if translated_text is None:
                translated_text = translate_ru_to_ua(base_text)
            message_text = translated_text
            print("Translated to UA")

        if not image:
            if LOG:
                print("message_text:\n", message_text)
            print(f"Sending message without photo to {group_name}...")
            _send_text(chat_id, topic_id, message_text)
            continue

        # The same buffer is reused for every group; rewind before each upload.
        image.seek(0)
        message_parts = split_text_for_telegram(message_text, group_lang)
        if len(message_parts) > 1:
            print(f"Sending photo with truncated caption to {group_name}...")
            print(f"Current chat_id: {chat_id}, topic_id: {topic_id}")
            if LOG:
                print("message_parts[0]:\n", message_parts[0])
            bot.send_photo(chat_id=chat_id, message_thread_id=topic_id, photo=image, caption=message_parts[0], parse_mode="HTML")
            print(f"Sending message with remaining text to {group_name}...")
            _send_text(chat_id, topic_id, message_parts[1])
        else:
            print(f"Sending message with photo to {group_name}...")
            if LOG:
                print("message_text:\n", message_text)
            bot.send_photo(chat_id=chat_id, message_thread_id=topic_id, photo=image, caption=message_text, parse_mode="HTML")
# Start of the description section, per group language.
_DESCRIPTION_MARKERS = {
    "RU": "<b>Описание",
    "UA": "<b>Опис",
}


def split_text_for_telegram(text, language):
    """Split a long message into a photo caption and a follow-up message.

    Texts of at most 900 characters are returned unchanged as a one-item
    list. Longer texts are cut where the description section starts for the
    given ``language``. When the language has no known marker, or the marker
    does not occur in the text, the text is cut at the last newline within
    the first 900 characters (or at 900 characters when there is none).

    Args:
        text: The full HTML message.
        language: Group language code, e.g. ``"RU"`` or ``"UA"``.

    Returns:
        ``[text]`` when no split is needed, otherwise
        ``[first_part, second_part]`` with surrounding whitespace stripped.
    """
    if len(text) <= 900:
        print("Text is too short")
        return [text]

    print("Text is long enough: " + str(len(text)))
    marker = _DESCRIPTION_MARKERS.get(language)
    split_index = text.find(marker) if marker is not None else -1
    if split_index >= 0:
        print("Found " + marker)
    else:
        # Without a marker, slicing at -1 would leave the whole text minus
        # one character as the caption; cut by length instead.
        print("Description marker not found; splitting by length")
        split_index = text.rfind('\n', 0, 900)
        if split_index <= 0:
            split_index = 900
    print("split_index:", split_index)

    first_message_text = text[:split_index].strip()
    second_message_text = text[split_index:].strip()
    if LOG:
        print("first_message_text:\n", first_message_text)
        print("second_message_text:\n", second_message_text)
    if not second_message_text:
        # Nothing but whitespace remained; avoid an empty follow-up message.
        return [first_message_text]
    return [first_message_text, second_message_text]
def send_message_to_telegram(message):
    """Send ``message`` to every chat listed in ``settings['ERROR_TG']``.

    The message is posted through the Bot API ``sendMessage`` endpoint with
    HTML parsing and link previews disabled. A failure for one chat is
    printed and does not stop delivery to the remaining chats.

    Args:
        message: Object to send; it is converted to its string form.
    """
    message_text = str(message)
    for error_tg in settings['ERROR_TG']:
        tg_url = f"https://api.telegram.org/bot{settings['TELEGRAM_BOT_TOKEN']}/sendMessage"
        data = {
            'chat_id': error_tg['chat_id'],
            'text': message_text,
            'parse_mode': 'HTML',
            'disable_web_page_preview': True,
        }
        try:
            # Without a timeout a stalled connection would block the caller
            # indefinitely; a timeout surfaces as a RequestException below.
            response = requests.post(tg_url, data=data, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Failed to send error message to Telegram group: {e}")
def send_error_to_telegram(error_message):
    """Pass ``error_message`` straight to ``send_message_to_telegram``."""
    send_message_to_telegram(error_message)