-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
252 lines (201 loc) · 8.81 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import datetime
import http.client as httplib
import json
import os
import re
import time
import urllib.parse
from http.cookies import SimpleCookie
from pathlib import Path
import openpyxl
import requests
from PIL import Image, UnidentifiedImageError
from bs4 import BeautifulSoup
from openpyxl.worksheet.worksheet import Worksheet
from rich.console import Console
from urllib3.exceptions import ReadTimeoutError
def have_internet() -> bool:
    """
    Check whether the machine currently has internet access.

    Issues an HTTPS HEAD request against Google's public DNS server
    (8.8.8.8) and treats any failure to send it as "offline".

    :return: True if the request could be issued, False otherwise
    """
    connection = httplib.HTTPSConnection("8.8.8.8", timeout=5)
    try:
        connection.request("HEAD", "/")
    except Exception:
        return False
    else:
        return True
    finally:
        connection.close()
def get_excel_sheet(filename: str) -> Worksheet:
    """
    Open a workbook in read-only mode and return its active worksheet.

    :param filename: path to the excel file
    :return: the workbook's active worksheet
    """
    workbook = openpyxl.load_workbook(filename, read_only=True)
    sheet = workbook.active
    return sheet
def strfdelta(tdelta, fmt):
    """
    Render a timedelta through a user-supplied format template.

    :param tdelta: timedelta to format
    :param fmt: template with any of {days}/{hours}/{minutes}/{seconds},
                e.g. "{hours}:{minutes}:{seconds}"
    :return: the formatted string
    """
    hours, remainder = divmod(tdelta.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return fmt.format(days=tdelta.days, hours=hours, minutes=minutes, seconds=seconds)
def print_status(start_time: datetime.datetime, current_position: int, total_positions: int, passed_positions: int = 0) -> str:
    """
    Build a progress line: position, estimated time left and average speed.

    :param start_time: datetime when processing started
    :param current_position: current (0-based) row index in the excel sheet
    :param total_positions: total rows in the excel sheet
    :param passed_positions: rows skipped earlier (excluded from the per-file average)
    :return: human-readable progress string
    """
    # convert the 0-based index to a 1-based position for display
    current_position += 1
    # total_seconds() instead of .seconds: .seconds discards whole days,
    # so estimates would wrap around after 24h of processing
    elapsed = (datetime.datetime.now() - start_time).total_seconds()
    # guard against a zero divisor if every row seen so far was skipped
    processed = max(current_position - passed_positions, 1)
    speed = elapsed / processed
    time_left = (total_positions - current_position) * speed
    formatted_time_left = strfdelta(datetime.timedelta(seconds=time_left), "{hours}:{minutes}:{seconds}")
    return f"{current_position} out of {total_positions}." \
           f" estimated time left {formatted_time_left}. Time per single file in avg {round(speed, 2)} sec"
def save_image(url_hires: str, url_lowres: str, file_name: str, file_format: str = 'WEBP') -> bool:
    """
    Best-effort download of an image in low and high resolution.

    :param url_hires: link to the HIGH resolution image (may be protocol-relative)
    :param url_lowres: link to the LOW resolution image (may be protocol-relative)
    :param file_name: file name (without extension) used for saving
    :param file_format: PIL format name, e.g.: PNG, JPG, WEBP...
    :return: True if both files were saved successfully, else False
    """
    def _normalize(url: str) -> str:
        # yandex often returns protocol-relative URLs ("//host/...")
        return 'https:' + url if url.startswith('//') else url

    def _fetch(url: str) -> Image.Image:
        # stream=True exposes the raw socket so PIL can read the body directly;
        # may raise UnidentifiedImageError or any requests/network error
        return Image.open(requests.get(url, stream=True, timeout=15).raw)

    url_hires = _normalize(url_hires)
    url_lowres = _normalize(url_lowres)
    try:
        img = _fetch(url_lowres)
        img.save(f'img/lo-res/{file_name}.{file_format}', format=file_format)
        # when both URLs point at the same image, reuse the first download
        # (the original separate except UnidentifiedImageError was redundant:
        # the generic handler below returned False for it anyway)
        if url_hires != url_lowres:
            img = _fetch(url_hires)
        img.save(f'img/hi-res/{file_name}.{file_format}', format=file_format)
    except Exception:
        # deliberate best-effort: any failure (unreadable image, network
        # error, disk error) is reported as False so the caller can log it
        return False
    return True
def create_img_folders():
    """
    Ensure the image output folders (img/hi-res and img/lo-res) exist.
    """
    for sub_dir in ('img/hi-res', 'img/lo-res'):
        os.makedirs(sub_dir, exist_ok=True)
def get_cookies() -> dict:
    """
    Read cookies from the COOKIES environment variable, falling back to an
    interactive prompt when the variable is not set.

    :return: mapping of cookie name to cookie value
    """
    raw_cookie = os.getenv('COOKIES')
    if raw_cookie is None:
        raw_cookie = input('Enter cookies: ')
    parsed = SimpleCookie()
    parsed.load(raw_cookie)
    return {name: morsel.value for name, morsel in parsed.items()}
def main():
    """
    Interactive entry point: read song names from an excel sheet, search
    yandex.ru images for each name (query "<name> обложка") and save the
    first result in low and high resolution via save_image().
    """
    # pretend to be a desktop Chrome browser so yandex serves regular HTML
    header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
              "(KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"}
    # create folders
    create_img_folders()
    # console
    console = Console()
    # get filename/file path
    file = input('Enter path to excel file: ')
    while not Path.is_file(Path(file)):
        # console.log('[bold red]ERROR: File not found, try again...')
        file = input('File doesnt exist, please enter correct path to excel file: ')
    # NOTE(review): this default is immediately overwritten by the input() below
    image_format = 'webp'
    image_format = input('What format you want to save images? (webp, jpg, png, gif...): ')
    # accept only extensions PIL actually knows how to encode
    while '.' + image_format.lower() not in Image.registered_extensions().keys():
        image_format = input('Unsupported format, please enter another one (webp, jpg, png, gif...): ')
    # delay to not get banned by yandex (less than 2 sec not recommended)
    request_delay = None
    while not request_delay:
        try:
            request_delay = int(input('Choose delay between requests to yandex (recommend 2 sec to avoid ban): '))
        except ValueError:
            # non-numeric input: keep asking
            pass
    # column in Excel file to read names of songs from
    column_index = 0
    # read cookies
    cookies = get_cookies()
    # open Excel file
    console.log('[bold green]Reading excel file', highlight=False)
    sheet = get_excel_sheet(file)
    # takes current time on start to measure average time to complete operations
    start_time = datetime.datetime.now()
    # counter for files to exclude from calculation time spent for each file to download
    amount_existed_files = 0
    with console.status('[bold blue]Downloading...') as status:
        for index, row in enumerate(sheet.iter_rows()):
            status.update('[bold blue]' + print_status(start_time, index, sheet.max_row, amount_existed_files))
            cell = row[column_index].value
            # check if file already exist to not re download it
            if os.path.exists(f'img/hi-res/{cell}.{image_format}'):
                amount_existed_files += 1
                continue
            # search text for yandex: drop anything in (...) or [...] and append "обложка" ("cover")
            search_query = re.sub(r"[\(\[].*?[\)\]]", "", cell).strip() + ' обложка'
            # check if we have connection, pause if no connection
            no_conn_counter = 0
            while not have_internet():
                no_conn_counter += 1
                console.log(f'\r[bold red]INTERNET CONNECTION OFF:[/bold red] for {no_conn_counter * 10} sec',
                            highlight=False,
                            end='')
                time.sleep(10)
            # main. trying to get response from yandex
            try:
                request = requests.get(
                    f'https://yandex.ru/images/search?iorient=square&text='
                    f'{urllib.parse.quote(search_query)}',
                    headers=header, cookies=cookies, timeout=10)
            except ReadTimeoutError:
                console.log(f'[bold red]ERROR:[/bold red] URL TIMEOUT for {cell}', highlight=False, end='')
                continue
            except Exception as e:
                console.log(f'[bold red]ERROR:[/bold red] {e}', highlight=False, end='')
                continue
            finally:
                # delay before next request (runs even on the continue paths above)
                time.sleep(request_delay)
            # a requests.Response is falsy for non-2xx status codes
            if not request:
                console.log(f'[bold red]ERROR:[/bold red] request is NONE for {cell}', highlight=False, end='')
                continue
            # parsing content
            soup = BeautifulSoup(request.content, "html.parser")
            item = soup.find(class_='serp-item_type_search')
            try:
                # image metadata lives in the JSON "data-bem" attribute of the first result
                image_json = json.loads(item['data-bem'])
            except Exception:
                console.log(f'[bold red]ERROR:[/bold red] cant resolve data-bem for {cell}', highlight=False, end='')
                continue
            hi_res_url = image_json['serp-item']['preview'][0]['url']
            item = soup.find(class_='serp-item__link')
            low_res_url = item.find('img')['src']
            if low_res_url == '':
                console.log(f'[bold red]ERROR:[/bold red] URL was empty for {cell}', highlight=False, end='')
                continue
            if save_image(hi_res_url, low_res_url, row[column_index].value, image_format):
                console.log(f'[bold green]SUCCESS:[/bold green] {cell}', highlight=False, end='')
            else:
                console.log(f'[bold red]FAILED:[/bold red] cannot save file for {cell}', highlight=False, end='')
# run only when executed as a script, not when imported as a module
if __name__ == '__main__':
    main()