forked from ULTRA-OP/ULTRA-X
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ultragoogle.py
137 lines (127 loc) · 5.22 KB
/
ultragoogle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
""" Powered by @Google
Available Commands:
.google search <query>
.google image <query>
.google reverse search"""
import asyncio
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from google_images_download import google_images_download
from ULTRA.utils import admin_cmd
def progress(current, total):
logger.info("Downloaded {} of {}\nCompleted {}".format(current, total, (current / total) * 100))
@borg.on(admin_cmd(pattern="google search (.*)"))
async def _(event):
if event.fwd_from:
return
start = datetime.now()
await event.edit("Processing ...")
# SHOW_DESCRIPTION = False
input_str = event.pattern_match.group(1) # + " -inurl:(htm|html|php|pls|txt) intitle:index.of \"last modified\" (mkv|mp4|avi|epub|pdf|mp3)"
input_url = "https://bots.shrimadhavuk.me/search/?q={}".format(input_str)
headers = {"USER-AGENT": "UniBorg"}
response = requests.get(input_url, headers=headers).json()
output_str = " "
for result in response["results"]:
text = result.get("title")
url = result.get("url")
description = result.get("description")
image = result.get("image")
output_str += " 👉🏻 [{}]({}) \n\n".format(text, url)
end = datetime.now()
ms = (end - start).seconds
await event.edit("searched Google for {} in {} seconds. \n{}".format(input_str, ms, output_str), link_preview=False)
await asyncio.sleep(5)
await event.edit("Google: {}\n{}".format(input_str, output_str), link_preview=False)
@borg.on(admin_cmd(pattern="google image (.*)"))
async def _(event):
if event.fwd_from:
return
start = datetime.now()
await event.edit("Processing ...")
input_str = event.pattern_match.group(1)
response = google_images_download.googleimagesdownload()
if not os.path.isdir(Config.TMP_DOWNLOAD_DIRECTORY):
os.makedirs(Config.TMP_DOWNLOAD_DIRECTORY)
arguments = {
"keywords": input_str,
"limit": Config.TG_GLOBAL_ALBUM_LIMIT,
"format": "jpg",
"delay": 1,
"safe_search": True,
"output_directory": Config.TMP_DOWNLOAD_DIRECTORY
}
paths = response.download(arguments)
logger.info(paths)
lst = paths[0].get(input_str)
if len(lst) == 0:
await event.delete()
return
await borg.send_file(
event.chat_id,
lst,
caption=input_str,
reply_to=event.message.id,
progress_callback=progress
)
logger.info(lst)
for each_file in lst:
os.remove(each_file)
end = datetime.now()
ms = (end - start).seconds
await event.edit("searched Google for {} in {} seconds.".format(input_str, ms), link_preview=False)
await asyncio.sleep(5)
await event.delete()
@borg.on(admin_cmd(pattern="google reverse search"))
async def _(event):
if event.fwd_from:
return
start = datetime.now()
BASE_URL = "http://www.google.com"
OUTPUT_STR = "Reply to an image to do Google Reverse Search"
if event.reply_to_msg_id:
await event.edit("Pre Processing Media")
previous_message = await event.get_reply_message()
previous_message_text = previous_message.message
if previous_message.media:
downloaded_file_name = await borg.download_media(
previous_message,
Config.TMP_DOWNLOAD_DIRECTORY
)
SEARCH_URL = "{}/searchbyimage/upload".format(BASE_URL)
multipart = {
"encoded_image": (downloaded_file_name, open(downloaded_file_name, "rb")),
"image_content": ""
}
# https://stackoverflow.com/a/28792943/4723940
google_rs_response = requests.post(SEARCH_URL, files=multipart, allow_redirects=False)
the_location = google_rs_response.headers.get("Location")
os.remove(downloaded_file_name)
else:
previous_message_text = previous_message.message
SEARCH_URL = "{}/searchbyimage?image_url={}"
request_url = SEARCH_URL.format(BASE_URL, previous_message_text)
google_rs_response = requests.get(request_url, allow_redirects=False)
the_location = google_rs_response.headers.get("Location")
await event.edit("Found Google Result. Pouring some soup on it!")
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:58.0) Gecko/20100101 Firefox/58.0"
}
response = requests.get(the_location, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")
# document.getElementsByClassName("r5a77d"): PRS
prs_div = soup.find_all("div", {"class": "r5a77d"})[0]
prs_anchor_element = prs_div.find("a")
prs_url = BASE_URL + prs_anchor_element.get("href")
prs_text = prs_anchor_element.text
# document.getElementById("jHnbRc")
img_size_div = soup.find(id="jHnbRc")
img_size = img_size_div.find_all("div")
end = datetime.now()
ms = (end - start).seconds
OUTPUT_STR = """{img_size}
**Possible Related Search**: <a href="{prs_url}">{prs_text}</a>
More Info: Open this <a href="{the_location}">Link</a> in {ms} seconds""".format(**locals())
await event.edit(OUTPUT_STR, parse_mode="HTML", link_preview=False)