-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtelegramexporter.py
869 lines (749 loc) · 43.3 KB
/
telegramexporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.errors import ChatAdminRequired
from datetime import datetime
from classes import classes
import time
import os
import json
import shutil
import zipfile
import hashlib
from colorama import init
init()
# Layout of one chat log line: four '§'-separated fields - sender (padded to 40 characters),
# timestamp (padded to 19 characters), message text or message kind, optional details
_FORMAT_LOG_STRING = "{:40}§{:19}§{}§{}"
# strftime pattern applied to message dates before they are written to the logs
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
# First line written to every chat log file
_ALL_CHATS_HEADER_STRING = "USERNAME§NAME§PHONE_NUMBER§TIMESTAMP§MESSAGE§DETAILS (OPTIONAL)"
# Path separator of the running platform, used to concatenate path pieces
_OS_SEP = os.sep
# CURRENT EXTRACTION FOLDER (empty until update_folders() computes it)
_EXTRACTION_FOLDER = ""
# PATH USED FOR THE EXTRACTION OF CHATS, MEDIA AND MEMBERS
# The first three values are the sub-folder names; the *_PATH values are the
# full paths and stay empty until update_folders() computes them
_CHATS = "chats"
_DOWNLOAD_MEDIA_PATH = "media"
_MEMBERS_FILE_SUFFIX = "members"
_CHAT_PATH = ""
_MEDIA_PATH = ""
_MEMBERS_PATH = ""
# EXTRACTION ZIP AND HASH FILE (empty until update_folders() computes them)
_EXTRACTION_ZIP = ""
_FILE_HASH = ""
def update_folders():
    """
    Recomputes all the extraction paths for a new extraction run.
    The run folder is named after the current local date and time; the
    module-level path globals are overwritten, nothing is created on disk.
    """
    global _EXTRACTION_FOLDER, _CHAT_PATH, _MEDIA_PATH, _MEMBERS_PATH, _EXTRACTION_ZIP, _FILE_HASH

    # Timestamp identifying this extraction run
    run_stamp = datetime.now().strftime("%d-%m-%Y %H-%M-%S")
    run_folder = _OS_SEP.join(("extraction", "Extraction_" + run_stamp))

    def inside_run_folder(leaf):
        # Path of an entry located directly inside the run folder
        return _OS_SEP.join((run_folder, leaf))

    _EXTRACTION_FOLDER = run_folder
    # Folders for chats, media and members
    _CHAT_PATH = inside_run_folder(_CHATS)
    _MEDIA_PATH = inside_run_folder(_DOWNLOAD_MEDIA_PATH)
    _MEMBERS_PATH = inside_run_folder(_MEMBERS_FILE_SUFFIX)
    # Archive of the extraction and file holding its hashes
    _EXTRACTION_ZIP = inside_run_folder("extraction.zip")
    _FILE_HASH = inside_run_folder("extraction_archive_hash.txt")
# Get all the messages exchanged in a given chat
def get_chat_logs_by_identifier(client_instance, chat_identifier, directory_name):
    """
    Iterates over all messages retrieved by the chat and generates the related logs;
    if medias are found and the configuration asks for it, it downloads them;
    generates the list with the ids of the chat participants.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
        chat_identifier: the ID of the chat (username or chat_id)
        directory_name: name of the directory (inside the media folder) into which medias are downloaded
    Returns:
        formatted_log: list with chat logs (each element of the list is a chat log line)
        partecipants_ids: list with the ids of the chat members plus the ids of every
            user who wrote at least one message into the chat
    """
    partecipants_ids = list()
    try:
        for member in client_instance.iter_chat_members(chat_identifier):
            partecipants_ids.append(member.user.id)
    except Exception as e:
        # Best effort: the extraction goes on even if the members list is not available
        if "ChatParticipantsForbidden" in str(e):
            print(f"[{classes.BColor.FAIL}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
                  f"Members can not be retrieved because it's a channel or an old private group. \nIn the latter case, "
                  f"Telegram denies the possibility to get the full list of members;\n it's possible to show only users"
                  f"who wrote at least one message into the chat." + "\n\n")
        else:
            # Any other failure is reported instead of being silently dropped
            print(f"[{classes.BColor.FAIL}get_chat_logs_by_identifier{classes.BColor.ENDC}] "
                  f"Members can not be retrieved. Reason: {e}")
    # Set view of the participants, to avoid scanning the list for every message
    known_partecipants = set(partecipants_ids)

    # Reads from the configuration whether medias have to be exported;
    # the context manager guarantees the file is closed
    with open("configuration.json", "r") as json_config:
        export_media = json.load(json_config)["export_media"]

    # Identifies the type of chat, to obtain the channel name in case of channel chats
    chat_obj = None
    while chat_obj is None:
        try:
            chat_obj = client_instance.get_chat(chat_identifier)
        except FloodWait:
            print(f"{classes.BColor.FAIL}[get_chat_logs_by_identifier] FloodWait exception may be fired by Telegram. "
                  f"Waiting 22s{classes.BColor.ENDC}")
            time.sleep(22)  # this value is specifically provided by Telegram,
            # relating to the particular API calling which caused the exception

    # Name logged as sender for messages without an author
    chat_title = ""
    if chat_obj.type == "channel":
        if chat_obj.username is not None:
            chat_title = chat_obj.username
        else:
            chat_title = chat_obj.title

    # The whole extraction is restarted every time Telegram raises a FloodWait
    while True:
        try:
            formatted_log = list()
            # Create a list with ALL messages of the chat
            chat = list(client_instance.iter_history(chat_identifier))
            # Iterate over the previously created list
            for msg in chat:
                # export media only if the JSON configuration value is 1
                if export_media == 1 and msg.media:
                    try:
                        os.makedirs(_MEDIA_PATH, exist_ok=True)
                        create_path = _MEDIA_PATH + _OS_SEP + directory_name + _OS_SEP
                        print(
                            f"[{classes.BColor.OKBLUE}get_chat_logs_by_identifier{classes.BColor.ENDC}] Downloading attached media...")
                        client_instance.download_media(msg, file_name=create_path)
                    except ValueError:
                        print(
                            f"[{classes.BColor.FAIL}get_chat_logs_by_identifier{classes.BColor.ENDC}] This media is not downloadable.")
                    except Exception as e:
                        # A failed download must not stop the extraction of the logs
                        print('Failed to download. Reason: {}'.format(e))

                # Creates the log first column
                if msg.from_user is not None:
                    sender = classes.User(msg.from_user).to_string()
                    if msg.from_user.id not in known_partecipants:
                        known_partecipants.add(msg.from_user.id)
                        partecipants_ids.append(msg.from_user.id)
                else:
                    sender = chat_title
                sent_at = datetime.utcfromtimestamp(msg.date).strftime(_TIME_FORMAT)
                formatted_log.append(_format_message_log(msg, sender, sent_at))
            return formatted_log, partecipants_ids
        except FloodWait:
            print(f"{classes.BColor.FAIL}[get_chat_logs_by_identifier] FloodWait exception may be fired by Telegram. "
                  f"Waiting 29s{classes.BColor.ENDC}")
            time.sleep(29)  # this value is specifically provided by Telegram,
            # relating to the particular API calling which caused the exception


# (message attribute, label written in the log, name of the wrapper class inside `classes`),
# listed in the order in which the attributes are probed
_MEDIA_LOG_KINDS = (
    ("audio", "Audio", "Audio"),
    ("document", "Document", "Document"),
    ("photo", "Photo", "Photo"),
    ("sticker", "Sticker", "Sticker"),
    ("animation", "Animation", "Animation"),
    ("game", "Game", "Game"),
    ("video", "Video", "Video"),
    ("voice", "Voice message", "Voice"),
    ("video_note", "Video note", "Videonote"),
    ("contact", "Contact", "Contact"),
    ("location", "Location", "Location"),
    ("venue", "Venue", "Venue"),
    ("web_page", "Web page", "WebPage"),
    ("poll", "Poll", "Poll"),
    ("dice", "Dice", "Dice"),
)


def _format_message_log(msg, sender, sent_at):
    """
    Builds the log line describing a single message.
    Args:
        msg: the message to describe
        sender: text for the first column of the log
        sent_at: already formatted date of the message
    Returns:
        the log line, formatted with _FORMAT_LOG_STRING
    """
    # Text messages are logged on a single line
    if msg.text is not None:
        return _FORMAT_LOG_STRING.format(sender, sent_at,
                                         msg.text.replace('\r', ' ').replace('\n', ' '), "")
    # The first attribute found decides the kind of message
    for attribute, label, wrapper_name in _MEDIA_LOG_KINDS:
        payload = getattr(msg, attribute)
        if payload is not None:
            details = getattr(classes, wrapper_name)(payload).to_string()
            return _FORMAT_LOG_STRING.format(sender, sent_at, label, details)
    if msg.service is not None:
        return _FORMAT_LOG_STRING.format(sender, sent_at, "Telegram service message", "")
    if msg.empty is not None:
        return _FORMAT_LOG_STRING.format(sender, sent_at, "Message was deleted", "")
    if msg.caption is not None:
        return _FORMAT_LOG_STRING.format(sender, sent_at, "Caption", msg.caption)
    return _FORMAT_LOG_STRING.format(sender, sent_at, "Not possible to find the type of message", "")
def get_contact(client_instance, targets=None):
    """
    Searches the dialogs for the chats matching the strings entered by the user.
    Private and bot chats are matched on full name, username or phone number;
    every other type of chat (group, supergroup or channel) is matched on its title.
    An empty search string matches every chat; deleted users are never returned.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
        targets: list of search strings (full name, username, phone number or chat title);
            they are compared with lower-cased chat data, so they are expected in lower case
    Returns:
        saved_contact: list of the matching users (private and bot chats), without duplicates
        non_contact_chat_dict: dictionary with chat_id as keys and chat title as values
        non_contact_type_dict: dictionary with chat_id as keys and chat type as values
    """
    saved_contact = list()
    non_contact_chat_dict = dict()
    non_contact_type_dict = dict()
    print(f"\n[{classes.BColor.OKBLUE}get_contact{classes.BColor.ENDC}] Retrieving all matching contacts\n")
    if not targets:
        return saved_contact, non_contact_chat_dict, non_contact_type_dict

    # Dialogs and users are retrieved from Telegram only once, not once per target
    dialogs = list(client_instance.iter_dialogs())
    users_by_chat_id = dict()
    # Ids of the users already added, so a user matching several targets is returned once
    saved_chat_ids = set()

    for target in targets:
        for dialog in dialogs:
            chat = dialog.chat
            # Users and bot are handled in the same way by Telegram
            if chat.type == 'private' or chat.type == 'bot':
                if chat.id not in users_by_chat_id:
                    users_by_chat_id[chat.id] = client_instance.get_users(chat.id)
                user = users_by_chat_id[chat.id]
                first_name = '' if user["first_name"] is None else str(user["first_name"]).lower()
                last_name = '' if user["last_name"] is None else str(user["last_name"]).lower()
                phone_number = '' if user["phone_number"] is None else str(user["phone_number"]).lower()
                username = '' if user["username"] is None else str(user["username"]).lower()
                full_name = first_name + " " + last_name
                is_present = target in full_name or target in username or target in phone_number
                # if user still exists and the user has specified a name to search or if he wants all users
                if (not user["is_deleted"]) and (target == "" or is_present):
                    print(f"[{classes.BColor.OKBLUE}get_contact{classes.BColor.ENDC}] "
                          f"Person chat match found{classes.BColor.ENDC}")
                    if chat.id not in saved_chat_ids:
                        saved_chat_ids.add(chat.id)
                        saved_contact.append(user)
            # in this case the chat is a "group", "supergroup" or "channel"
            else:
                title = chat.title
                # a chat without title can not be matched
                if title is not None and target in title.lower():
                    print(f"[{classes.BColor.OKBLUE}get_contact{classes.BColor.ENDC}] " +
                          chat.type +
                          " chat match found")
                    non_contact_chat_dict[chat.id] = title
                    non_contact_type_dict[chat.id] = chat.type
    return saved_contact, non_contact_chat_dict, non_contact_type_dict
def menu_get_contact(client_instance):
    """
    Asks the user for a search string, looks for the matching chats and,
    when more than one chat matches, lets the user pick one of them by number.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
    Returns:
        the id of the selected chat and its type
    Raises:
        Exception: if no chat matches the search string
    """
    query = input("You can enter one of the following information: "
                  "\n- Phone Book name \n- Telegram username \n- Channel name \n- Group name "
                  "\n- Phone number (in this case remember to indicate also the phone prefix): "
                  "\n- Or press enter if you want to see a list of the chats"
                  "\n Please enter your decision: ")
    # get_contact expects a list of search strings
    matched_users, matched_chats, matched_chat_types = get_contact(client_instance, [query.lower()])
    if not (matched_users or matched_chats):
        print(f"{classes.BColor.FAIL}No contacts found!{classes.BColor.ENDC}")
        raise Exception("No contacts found")

    user_count = len(matched_users)
    option_count = user_count + len(matched_chats)
    # With a single match there is nothing to choose: the first entry is taken
    choice = 0
    if option_count > 1:
        print(f"\n[{classes.BColor.OKBLUE}menu_get_contact{classes.BColor.ENDC}]"
              f"{classes.BColor.WARNING} There are multiple matching chats. "
              f"Which one do you want to choose?{classes.BColor.ENDC}\n")
        # Users are listed first, the other chats follow
        for position, user in enumerate(matched_users):
            labelled_values = (
                ("Username: {} ", user.username),
                ("First Name: {} ", user.first_name),
                ("Last Name: {} ", user.last_name),
                ("Telephone number: {} ", user.phone_number),
            )
            description = "".join(label.format(value) for label, value in labelled_values if value is not None)
            print(f"[{classes.BColor.OKBLUE}*{classes.BColor.ENDC}] " + str(position) + " " + description)
        for position, chat_id in enumerate(matched_chats, start=user_count):
            print(f"[{classes.BColor.OKBLUE}*{classes.BColor.ENDC}] " + str(position) + " " + matched_chats[chat_id] + " (" + str(matched_chat_types[chat_id]) + ")")

        # Keeps asking until a valid position is entered
        while True:
            print(f"[{classes.BColor.OKBLUE}menu_get_contact{classes.BColor.ENDC}] Select number please: ")
            try:
                choice = int(input())
            except ValueError:
                print(f"{classes.BColor.WARNING}[menu_get_contact] Please, insert a number.{classes.BColor.ENDC}")
                continue
            if 0 <= choice < option_count:
                break
            print(f"{classes.BColor.WARNING}[menu_get_contact] Invalid selection.{classes.BColor.ENDC}")

    # returns the chatId connected to the user/group/channel/etc.
    if choice < user_count:
        selected_user = matched_users[choice]
        return selected_user.id, client_instance.get_chat(selected_user.id).type
    selected_chat_id = list(matched_chats)[choice - user_count]
    return selected_chat_id, matched_chat_types[selected_chat_id]
def menu_get_multiple_contact(client_instance):
    """
    Reads user input (for multiple research), splits it by ";" and looks for the matching chats.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
    Returns:
        ids: list with the ids of the matching chats, without duplicates
        types_dict: dictionary with chat_id as keys and chat type as values
    Raises:
        Exception: if the input contains no ";" or no chat matches
    """
    target_name = str(input("User separator ';' to select multiple name.\n"
                            "Enter your decision: "))
    users = list()
    non_user_dict = dict()
    non_contact_type_dict = dict()
    if ";" in target_name:
        names = [name.lower().strip() for name in target_name.split(";")]
        # Empty names (e.g. produced by a trailing ';') are dropped, because an empty
        # search string matches every chat; dict.fromkeys removes the duplicates
        # keeping the order in which the names were typed
        search_names = list(dict.fromkeys(name for name in names if name))
        if not search_names:
            # only separators were typed: every chat is searched, as an empty name does
            search_names = [""]
        users, non_user_dict, non_contact_type_dict = get_contact(client_instance, search_names)
    else:
        print("Please, use ;")
    if not users and not non_user_dict:
        print(f"{classes.BColor.FAIL}No contacts found!{classes.BColor.ENDC}")
        raise Exception("No contacts found")

    key = 0
    ids = []
    types_dict = dict()
    for user in users:
        # a user matching several names is listed and returned only once
        if user.id in types_dict:
            continue
        chat_data_to_log = ""
        if user.username is not None:
            chat_data_to_log = chat_data_to_log + "Username: {} ".format(user.username)
        if user.first_name is not None:
            chat_data_to_log = chat_data_to_log + "First Name: {} ".format(user.first_name)
        if user.last_name is not None:
            chat_data_to_log = chat_data_to_log + "Last Name: {} ".format(user.last_name)
        if user.phone_number is not None:
            chat_data_to_log = chat_data_to_log + "Telephone number: {} ".format(user.phone_number)
        print(f"[{classes.BColor.OKBLUE}*{classes.BColor.ENDC}] " + str(key) + " " + chat_data_to_log)
        key += 1
        ids.append(user.id)
        types_dict[user.id] = client_instance.get_chat(user.id).type
    for chat_id in non_user_dict:
        print(f"[{classes.BColor.OKBLUE}*{classes.BColor.ENDC}] " + str(key) + " " + non_user_dict[chat_id] + " (" + str(non_contact_type_dict[chat_id]) + ")")
        key += 1
        ids.append(chat_id)
        types_dict[chat_id] = non_contact_type_dict[chat_id]
    return ids, types_dict
def get_multiple_chat_ids_by_dialogs(client_instance, multiple_ids_chats):
    """
    Collects the data of the chats whose ids are in the given list.
    Each chat is described by the first available piece of information among
    username, title and first name; ids without a matching dialog are ignored.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
        multiple_ids_chats: list of chats ids to analyze.
    Returns:
        chat_ids_list: list of all chat ids to analyze, in the order of multiple_ids_chats and without duplicates
        chat_id_usernames_dict: dictionary with chat_id as keys and usernames as values
        chat_id_title_dict: dictionary with chat_id as keys and chat title as values
        chat_id_full_name_dict: dictionary with chat_id as keys and full name (first name and last name) as values
        chat_id_phone_number_dict: dictionary with chat_id as keys and phone number as values
    """
    chat_ids_list = list()
    chat_id_usernames_dict = dict()
    chat_id_title_dict = dict()
    chat_id_full_name_dict = dict()
    chat_id_phone_number_dict = dict()

    requested_ids = list(multiple_ids_chats)
    if not requested_ids:
        return chat_ids_list, chat_id_usernames_dict, chat_id_title_dict, chat_id_full_name_dict, chat_id_phone_number_dict

    # The dialogs are retrieved from Telegram only once, not once per requested id
    wanted_ids = set(requested_ids)
    chats_by_id = dict()
    for dialog in client_instance.iter_dialogs():
        if dialog.chat.id in wanted_ids:
            chats_by_id.setdefault(dialog.chat.id, dialog.chat)

    def store_phone_number(chat_id):
        # Tries to get the person phone number retrieving his id;
        # it's necessary a single-item list for get_users()
        user_obj_list = client_instance.get_users([chat_id])
        if user_obj_list and user_obj_list[0].phone_number is not None:
            chat_id_phone_number_dict[chat_id] = user_obj_list[0].phone_number

    handled_ids = set()
    for requested_id in requested_ids:
        chat = chats_by_id.get(requested_id)
        # unknown chats and ids requested more than once are skipped
        if chat is None or requested_id in handled_ids:
            continue
        handled_ids.add(requested_id)

        # becomes True as soon as the chat is added to chat_ids_list
        registered = False
        if chat.username is not None:
            registered = True
            chat_ids_list.append(chat.id)
            chat_id_usernames_dict[chat.id] = chat.username
            store_phone_number(chat.id)
            print(f"\n{classes.BColor.OKBLUE}[get_chat_ids_by_dialogs]{classes.BColor.ENDC}" +
                  " Retrieved chat with username: {}".format(chat.username))
        if chat.title is not None and not registered:
            registered = True
            chat_ids_list.append(chat.id)
            chat_id_title_dict[chat.id] = chat.title
            print(f"\n{classes.BColor.OKBLUE}[get_chat_ids_by_dialogs]{classes.BColor.ENDC}" +
                  " Retrieved chat with title: {}".format(chat.title))
        if chat.first_name is not None and not registered:
            chat_ids_list.append(chat.id)
            # Identify the full name of the person who the chat relates to
            formatted_name = chat.first_name
            if chat.last_name is not None:
                formatted_name = formatted_name + " " + chat.last_name
            chat_id_full_name_dict[chat.id] = formatted_name
            store_phone_number(chat.id)
    return chat_ids_list, chat_id_usernames_dict, chat_id_title_dict, chat_id_full_name_dict, chat_id_phone_number_dict
def get_chat_ids_by_dialogs(client_instance, single_chat_id=None):
    """
    Collects the data of one specified chat or of all chats.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
        single_chat_id: if this param is None, all chats are retrieved; otherwise, only one chat is retrieved.
    Returns:
        chat_ids_list: list of all chat ids to analyze, without duplicates
        chat_id_usernames_dict: dictionary with chat_id as keys and usernames as values
        chat_id_title_dict: dictionary with chat_id as keys and chat title as values
        chat_id_full_name_dict: dictionary with chat_id as keys and full name (first name and last name) as values
        deleted_chat_ids: list of the ids of the chats without username, title and first name
        chat_id_phone_number_dict: dictionary with chat_id as keys and phone number as values
        chat_type_dict: dictionary with chat_id as keys and chat type as values
    """
    chat_ids_list = list()
    chat_id_usernames_dict = dict()
    chat_id_title_dict = dict()
    chat_id_full_name_dict = dict()
    chat_id_phone_number_dict = dict()
    deleted_chat_ids = list()
    chat_type_dict = dict()
    # Ids already stored in chat_ids_list: a chat can have username, title and
    # first name at the same time, but it has to be analyzed only once
    listed_ids = set()

    def add_chat_id(chat_id):
        if chat_id not in listed_ids:
            listed_ids.add(chat_id)
            chat_ids_list.append(chat_id)

    def store_phone_number(chat_id):
        # Tries to get the person phone number retrieving his id;
        # it's necessary a single-item list for get_users()
        user_obj_list = client_instance.get_users([chat_id])
        if user_obj_list and user_obj_list[0].phone_number is not None:
            chat_id_phone_number_dict[chat_id] = user_obj_list[0].phone_number

    for dialog in client_instance.iter_dialogs():
        chat = dialog.chat
        # If the user wants to extract a particular chat, every other chat is skipped
        if single_chat_id is not None and chat.id != single_chat_id:
            continue

        # the phone number is requested to Telegram at most once per chat
        phone_number_requested = False
        if chat.username is not None:
            add_chat_id(chat.id)
            chat_id_usernames_dict[chat.id] = chat.username
            store_phone_number(chat.id)
            phone_number_requested = True
            print(f"\n{classes.BColor.OKBLUE}[get_chat_ids_by_dialogs]{classes.BColor.ENDC}" +
                  " Retrieved chat with username: {}".format(chat.username))
        if chat.title is not None:
            add_chat_id(chat.id)
            chat_id_title_dict[chat.id] = chat.title
            print(f"\n{classes.BColor.OKBLUE}[get_chat_ids_by_dialogs]{classes.BColor.ENDC}" +
                  " Retrieved chat with title: {}".format(chat.title))
        if chat.first_name is not None:
            add_chat_id(chat.id)
            # Identify the full name of the person who the chat relates to
            formatted_name = chat.first_name
            if chat.last_name is not None:
                formatted_name = formatted_name + " " + chat.last_name
            chat_id_full_name_dict[chat.id] = formatted_name
            if not phone_number_requested:
                store_phone_number(chat.id)
        if chat.username is None and chat.title is None and chat.first_name is None:
            print("\n[get_chat_ids_by_dialogs] No info found for chat {}; "
                  "it means the other user deleted his account".format(chat.id))
            deleted_chat_ids.append(chat.id)
        chat_type_dict[chat.id] = chat.type
    return chat_ids_list, chat_id_usernames_dict, chat_id_title_dict, \
        chat_id_full_name_dict, deleted_chat_ids, chat_id_phone_number_dict, chat_type_dict
def write_all_chats_logs_file(client_instance, chat_ids_list, chat_id_usernames_dict, chat_id_title_dict,
                              chat_id_full_name_dict, deleted_chat_ids, chat_id_phone_number_dict, chat_type):
    """
    Writes the chat logs for all chats (also deleted chats) and, for every chat
    with known participants, the file listing its members.
    Args:
        client_instance: Pyrogram Client, the main means for interacting with Telegram.
        chat_ids_list: list of all chat ids to analyze
        chat_id_usernames_dict: dictionary with chat_id as keys and usernames as values
        chat_id_title_dict: dictionary with chat_id as keys and chat title as values
        chat_id_full_name_dict: dictionary with chat_id as keys and full name (first name and last name) as values
        deleted_chat_ids: list of deleted chats' ids
        chat_id_phone_number_dict: dictionary with chat_id as keys and phone number as values
        chat_type: the type shared by all the chats (string) or a dictionary with chat_id as keys
            and chat type as values
    """
    # Create logs file for every chat
    for chat_id in chat_ids_list:
        username = chat_id_usernames_dict.get(chat_id)
        title = chat_id_title_dict.get(chat_id)
        full_name = chat_id_full_name_dict.get(chat_id)
        phone_number = chat_id_phone_number_dict.get(chat_id)

        # Data shown on screen: only the available pieces of information are listed
        shown_values = [username if chat_id in chat_id_usernames_dict else None,
                        full_name if chat_id in chat_id_full_name_dict else None,
                        phone_number if chat_id in chat_id_phone_number_dict else None,
                        title if chat_id in chat_id_title_dict else None]
        present = [chat_id in chat_id_usernames_dict, chat_id in chat_id_full_name_dict,
                   chat_id in chat_id_phone_number_dict, chat_id in chat_id_title_dict]
        chat_data_to_log = "".join("{};".format(value) for value, found in zip(shown_values, present) if found)

        # creating file name: username, title, full name, phone number and type of the chat
        name_values = [shown_values[0], shown_values[3], shown_values[1], shown_values[2]]
        name_present = [present[0], present[3], present[1], present[2]]
        file_name_prefix = "".join("{}_".format(value) for value, found in zip(name_values, name_present) if found)
        if isinstance(chat_type, str):
            file_name_prefix = file_name_prefix + chat_type
        else:
            file_name_prefix = file_name_prefix + chat_type[chat_id]
        # Removing illegal characters from file name
        file_name_prefix = file_name_prefix.translate(_FILE_NAME_FORBIDDEN_CHARS)

        # The same name is used for the directory where to store medias
        directory_name = file_name_prefix
        file_name = _CHAT_PATH + _OS_SEP + file_name_prefix + ".csv"

        # Logs about existing chats
        print(f"[{classes.BColor.OKBLUE}write_all_chats_logs_file{classes.BColor.ENDC}]" +
              " Processing chat with {}".format(chat_data_to_log))
        log_lines, partecipants_ids = get_chat_logs_by_identifier(client_instance, chat_id, directory_name)
        _write_chat_log_file(file_name, log_lines)

        # Partecipants file
        if partecipants_ids:
            print(f"[{classes.BColor.OKBLUE}write_all_chats_logs_file{classes.BColor.ENDC}] "
                  f"Processing members chats \n\n")
            header = "USERNAME§NAME§PHONE NUMBER"
            # the members folder is created with its parents, if not already there
            os.makedirs(_MEMBERS_PATH, exist_ok=True)
            saving_file_path = _MEMBERS_PATH + _OS_SEP + file_name_prefix + ".csv"
            with open(saving_file_path, "w", encoding="UTF-16") as file:
                file.write(header + "\n")
                for user in client_instance.get_users(partecipants_ids):
                    file.write(classes.User(user).to_string() + "\n")
        else:
            print(f"[{classes.BColor.FAIL}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
                  f"Members can not be retrieved because it's a channel or an old private group. \nIn the latter case, "
                  f"Telegram denies the possibility to get the full list of members;\n it's possible to show only users"
                  f"who wrote at least one message into the chat." + "\n\n")

    # if there are deleted chats
    if deleted_chat_ids:
        # Logs about deleted chats
        print(f"[{classes.BColor.OKBLUE}write_all_chats_logs_file{classes.BColor.ENDC}] Processing deleted chats \n\n")
        for chat_id in deleted_chat_ids:
            directory_name = str(chat_id) + "_deleted"
            file_name = _CHAT_PATH + _OS_SEP + str(chat_id) + "_deleted.csv"
            print(f"[{classes.BColor.OKBLUE}write_all_chats_logs_file{classes.BColor.ENDC}] Processing "
                  + str(chat_id) + " deleted chat")
            # the participants of a deleted chat are not written
            log_lines, _ = get_chat_logs_by_identifier(client_instance, chat_id, directory_name)
            _write_chat_log_file(file_name, log_lines)


# Replaces with "_" the path separators and the other characters which are
# rejected in file names by Windows
_FILE_NAME_FORBIDDEN_CHARS = str.maketrans({char: "_" for char in '\\/:*?"<>|'})


def _write_chat_log_file(file_path, log_lines):
    """
    Writes a chat log file: the header followed by one line for each log.
    Args:
        file_path: path of the file to create (an existing file is overwritten)
        log_lines: list with the chat logs
    """
    # utf-16 encoding is necessary to correctly represent emojis
    with open(file_path, 'w', encoding='utf-16') as file:
        file.write("\n".join([_ALL_CHATS_HEADER_STRING, *log_lines]))
def write_group_chats_members(client_instance, chat_title_list):
    """
    Writes, for every given chat, the file with the partecipants of the chat
    (one member per line, after the "MEMBERS" header).
    Chats whose members can not be retrieved only produce a message on screen.
    Args:
        client_instance: client instance
        chat_title_list: dictionary with chat_id as keys and chat title as values
    """
    # Replaces with "_" the path separators and the other characters which are
    # rejected in file names by Windows
    forbidden_chars = str.maketrans({char: "_" for char in '\\/:*?"<>|'})

    for chat_id in chat_title_list:
        title = chat_title_list[chat_id]
        list_username = list()
        try:
            for member in client_instance.get_chat_members(chat_id):
                list_username.append(classes.User(member.user).to_string())
        except AttributeError:
            print(f"[{classes.BColor.FAIL}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
                  f"This operation is Forbidden \n\n")
        except ChatAdminRequired:
            print(f"[{classes.BColor.FAIL}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
                  f"This operation is allowed only by Admin \n\n")

        if not list_username:
            print(f"[{classes.BColor.FAIL}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
                  f"No members into chat " + title + "\n\n")
            continue

        print(f"[{classes.BColor.OKBLUE}write_all_members_channel_logs_file{classes.BColor.ENDC}] "
              f"Processing members chats \n\n")
        header = "MEMBERS"
        # Removing illegal characters from file name
        name_file = title.translate(forbidden_chars) + ".csv"
        # the members folder is created with its parents, if not already there
        os.makedirs(_MEMBERS_PATH, exist_ok=True)
        saved_file = _MEMBERS_PATH + _OS_SEP + name_file
        with open(saved_file, "w", encoding="UTF-16") as file:
            file.write(header + "\n")
            # every member goes on its own line
            for username in list_username:
                file.write(username + "\n")
def clean_extraction_folder():
    """
    Empties the "extraction" folder, removing what previous extractions left there.

    Regular files and symbolic links are unlinked, directories are removed
    recursively. When an entry cannot be deleted the reason is printed and
    the remaining entries are still processed.
    """
    target_dir = "extraction"
    print(f"[{classes.BColor.OKBLUE}clean_extraction_folder{classes.BColor.ENDC}] "
          f"Removing files from folder " + target_dir)
    for entry_name in os.listdir(target_dir):
        entry_path = os.path.join(target_dir, entry_name)
        try:
            # Pick the removal routine that matches the kind of entry
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                remove_entry = os.unlink
            elif os.path.isdir(entry_path):
                remove_entry = shutil.rmtree
            else:
                remove_entry = None
            if remove_entry is not None:
                remove_entry(entry_path)
        except Exception as error:
            print('Failed to delete %s. Reason: %s' % (entry_path, error))
    print(f"[{classes.BColor.OKBLUE}clean_extraction_folder{classes.BColor.ENDC}] Folder cleaned successfully\n")
def create_extraction_folders():
    """
    Makes sure the folders receiving the extracted chats, members and media exist.

    The folders are taken from _CHAT_PATH, _MEMBERS_PATH and _MEDIA_PATH and
    are created (together with missing parents) only when not already present.
    """
    print(f"[{classes.BColor.OKBLUE}create_extraction_folders{classes.BColor.ENDC}] Creating extraction folders")
    # Same order as the outputs are produced: chats, members, media
    for required_path in (_CHAT_PATH, _MEMBERS_PATH, _MEDIA_PATH):
        if not os.path.exists(required_path):
            os.makedirs(required_path)
    print(
        f"[{classes.BColor.OKBLUE}create_extraction_folders{classes.BColor.ENDC}] Extraction folders created successfully")
def compress_and_hash_extraction():
    """
    Creates a zip archive with the content of the current extraction
    and a txt file with the hashes of the archive in MD5 and SHA512.

    The archive is written to _EXTRACTION_ZIP and contains every file found
    under _CHAT_PATH, _MEDIA_PATH and _MEMBERS_PATH. The hashes are written to
    _FILE_HASH. Errors in either step are reported on standard output and do
    not propagate to the caller.
    """
    print(
        f"[{classes.BColor.OKBLUE}compress_and_hash_extraction{classes.BColor.ENDC}] Creating extraction zip archive...")
    try:
        # The context manager closes the archive even when adding a file fails
        with zipfile.ZipFile(_EXTRACTION_ZIP, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for source_dir in (_CHAT_PATH, _MEDIA_PATH, _MEMBERS_PATH):
                for root, _dirs, files in os.walk(source_dir):
                    for file in files:
                        zip_file.write(os.path.join(root, file))
        print(
            f"[{classes.BColor.OKBLUE}compress_and_hash_extraction{classes.BColor.ENDC}] Extraction zip archive created successfully")
    except Exception:
        print(f"{classes.BColor.FAIL}Error creating zip archive{classes.BColor.ENDC}")

    try:
        print(f"[{classes.BColor.OKBLUE}compress_and_hash_extraction{classes.BColor.ENDC}] Creating zip hashes...")
        sha512_hash = hashlib.sha512()
        md5_hash = hashlib.md5()
        with open(_EXTRACTION_ZIP, "rb") as f:
            # Read the archive in blocks of 4K so it never has to fit in
            # memory; both digests are fed from the same pass over the file.
            for byte_block in iter(lambda: f.read(4096), b""):
                sha512_hash.update(byte_block)
                md5_hash.update(byte_block)
        sha = sha512_hash.hexdigest()
        md5 = md5_hash.hexdigest()
        with open(_FILE_HASH, 'w', encoding='utf-16') as file:
            file.write('MD5: ' + md5)
            file.write('\nSHA512: ' + sha)
        print(
            f"[{classes.BColor.OKBLUE}compress_and_hash_extraction{classes.BColor.ENDC}] Zip hashes created successfully\n")
    except Exception:
        print(f"{classes.BColor.FAIL}Error creating hash file{classes.BColor.ENDC}")
def show_banner():
    """
    Prints the ASCII-art banner of the tool to standard output.
    """
    # Raw strings keep every backslash of the drawing literal. In a normal
    # string, sequences such as "\ ", "\_" or "\/" are invalid escapes that
    # recent Python versions report with a warning.
    banner_rows = (
        r" _______ _ ______ _ ",
        r"|__ __| | | | ____| | | ",
        r" | | ___| | ___ __ _ _ __ __ _ _ __ ___ | |__ __ ___ __ ___ _ __| |_ ___ _ __ ",
        r" | |/ _ \ |/ _ \/ _` | '__/ _` | '_ ` _ \ | __| \ \/ / '_ \ / _ \| '__| __/ _ \ '__|",
        r" | | __/ | __/ (_| | | | (_| | | | | | | | |____ > <| |_) | (_) | | | || __/ | ",
        r" |_|\___|_|\___|\__, |_| \__,_|_| |_| |_| |______/_/\_\ .__/ \___/|_| \__\___|_| ",
        r" __/ | | | ",
        r" |___/ |_| -By DMD ",
    )
    # Every row is newline-terminated; print() then adds one more newline,
    # leaving an empty line after the banner.
    print("\n".join(banner_rows) + "\n")
if __name__ == "__main__":
    # Interactive entry point: show the banner, optionally wipe the results of
    # previous runs, then keep offering the extraction menu until the user quits.
    show_banner()
    response = -1

    if os.path.exists("extraction"):
        clean_folder = input("Do you want to clean extraction folder from previous extractions files? (y/N): ")
        if clean_folder == 'y':
            clean_extraction_folder()

    # response is set to 0 only by the "quit" menu entry
    while response != 0:
        update_folders()
        # Create an instance of the pyrogram client for this menu round
        with Client("my_account", hide_password=True) as client:
            try:
                type_of_extraction = int(input("\nEnter: \n[1] to extract the chats for a single user "
                                               " \n[2] to extract the chats for multiple users"
                                               " \n[3] to extract all chats"
                                               " \n[-1] to quit"
                                               " \nPlease enter your choice: "))

                if type_of_extraction == 1:
                    # Chat logs for one chat chosen by the user
                    create_extraction_folders()
                    chatId, chat_type = menu_get_contact(client)
                    (chatIdsList, chatIdUsernamesDict, chatIdTitleDict, chatIdFullNameDict,
                     deletedChatIds, chatIdPhoneNumberDict, chat_type_list) = get_chat_ids_by_dialogs(client, chatId)
                    write_all_chats_logs_file(client, chatIdsList, chatIdUsernamesDict, chatIdTitleDict,
                                              chatIdFullNameDict, deletedChatIds, chatIdPhoneNumberDict, chat_type)
                    compress_and_hash_extraction()
                elif type_of_extraction == 2:
                    # Chat logs for several chats chosen by the user; no
                    # deleted-chat ids are passed on in this mode
                    create_extraction_folders()
                    chatIds, chat_types = menu_get_multiple_contact(client)
                    (chatIdsList, chatIdUsernamesDict, chatIdTitleDict, chatIdFullNameDict,
                     chatIdPhoneNumberDict) = get_multiple_chat_ids_by_dialogs(client, chatIds)
                    write_all_chats_logs_file(client, chatIdsList, chatIdUsernamesDict, chatIdTitleDict,
                                              chatIdFullNameDict, [], chatIdPhoneNumberDict, chat_types)
                    compress_and_hash_extraction()
                elif type_of_extraction == 3:
                    # Chat logs for all chats
                    create_extraction_folders()
                    (chatIdsList, chatIdUsernamesDict, chatIdTitleDict, chatIdFullNameDict,
                     deletedChatIds, chatIdPhoneNumberDict, chat_type_dict) = get_chat_ids_by_dialogs(client)
                    write_all_chats_logs_file(client, chatIdsList, chatIdUsernamesDict, chatIdTitleDict,
                                              chatIdFullNameDict, deletedChatIds, chatIdPhoneNumberDict,
                                              chat_type_dict)
                    compress_and_hash_extraction()
                elif type_of_extraction == -1:
                    response = 0
                else:
                    print("Please select a correct number.")
            except ValueError:
                # The menu answer was not an integer
                print("Please select a correct number.")
            except Exception as e:
                # Errors mentioning "No contacts found" are not echoed here
                if "No contacts found" not in str(e):
                    print(str(e))