MyPy CI failures #1488

Merged · 23 commits · Jul 25, 2023
Changes from all commits

2 changes: 1 addition & 1 deletion .flake8
@@ -1,2 +1,2 @@
[flake8]
ignore = E501, F405, F403, F401, E402
ignore = E203, E501, F405, F403, F401, E402, W503
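For context: E203 (whitespace before ':') and W503 (line break before a binary operator) are the two pycodestyle checks that clash with Black-style formatting, and since "ignore =" replaces flake8's default ignore list rather than extending it, the default-ignored W503 has to be re-listed explicitly. A minimal sketch of code that trips both checks (hypothetical snippet, not from the repo):

# Hypothetical snippet: formatter-preferred layout trips both checks.
items = list(range(10))
start, size = 2, 4
chunk = items[start : start + size]  # E203: whitespace before ':'
value = 5
is_valid = (
    value is not None
    and value > 0  # W503: continuation line starts with a binary operator
)
print(chunk, is_valid)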
4 changes: 2 additions & 2 deletions .github/workflows/theHarvester.yml
@@ -34,9 +34,9 @@ jobs:
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --show-source --statistics
flake8 . --count --show-source --statistics --config .flake8
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-line-length=127 --statistics
flake8 . --count --exit-zero --max-line-length=127 --statistics --config .flake8

- name: Test with pytest
run: |
22 changes: 12 additions & 10 deletions theHarvester/__main__.py
@@ -176,7 +176,7 @@ async def store(search_engine: Any, source: str, process_param: Any = None, stor
print(f'\033[94m[*] Searching {source[0].upper() + source[1:]}. ')

if store_host:
host_names = {host for host in filter(await search_engine.get_hostnames()) if f'.{word}' in host}
host_names = list({host for host in filter(await search_engine.get_hostnames()) if f'.{word}' in host})
host_names = list(host_names)
if source != 'hackertarget' and source != 'pentesttools' and source != 'rapiddns':
# If a source is inside this conditional, it means the hosts returned must be resolved to obtain ip
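For context, a minimal sketch (hypothetical data) of what the combined line above does: building the set and converting it to a list in a single assignment dedupes the hostnames while keeping host_names one consistent type for mypy.

# Hypothetical data illustrating the one-step dedupe.
word = "example.com"
raw_hosts = ["a.example.com", "a.example.com", "b.example.com", "other.org"]
host_names = list({host for host in raw_hosts if f".{word}" in host})
print(sorted(host_names))  # ['a.example.com', 'b.example.com']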
@@ -574,10 +574,13 @@ async def store(search_engine: Any, source: str, process_param: Any = None, stor
if isinstance(e, MissingKey):
print(e)
else:
try:
# Check if dns_brute is defined
rest_args.dns_brute
except Exception:
if rest_args is not None:
try:
rest_args.dns_brute
except Exception:
print('\n[!] Invalid source.\n')
sys.exit(1)
else:
print('\n[!] Invalid source.\n')
sys.exit(1)
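The new guard checks rest_args before probing it for dns_brute, so a None namespace no longer raises before the invalid-source message. A standalone sketch of the equivalent pattern (hypothetical helper; the real code catches any Exception around the attribute access):

import sys
from argparse import Namespace
from typing import Optional

def require_dns_brute(rest_args: Optional[Namespace]) -> None:
    # Invalid source unless a namespace exists and carries dns_brute.
    if rest_args is None or not hasattr(rest_args, "dns_brute"):
        print("\n[!] Invalid source.\n")
        sys.exit(1)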

@@ -848,7 +851,6 @@ async def handler(lst):
print('\n[*] Virtual hosts:')
print('------------------')
for data in host_ip:
from theHarvester.discovery import bingsearch
Contributor Author:
from theHarvester.discovery import * already exists at the top of the file.

Collaborator:
Sometimes it's needed, as it does not actually work without it; if you tested it and it still works, then I'm happy for it to be removed.

Contributor Author:
I haven't tested it myself after making this change yet, only ran pytest afterwards. I'll give it a shot tomorrow. It looks like there's still more I need to go through.

Contributor Author:
Let me know if you have a command I can run that produces some vhosts. I tried a few things, both keeping and removing the import, but neither run had any vhosts to show. I can experiment with harvesting more a bit later.

basic_search = bingsearch.SearchBing(data, limit, start)
await basic_search.process_vhost()
results = await basic_search.get_allhostnames()
@@ -890,19 +892,19 @@ async def handler(lst):
async with Pool(10) as pool:
results = await pool.map(screen_shotter.visit, list(unique_resolved_domains))
# Filter out domains that we couldn't connect to
unique_resolved_domains = list(sorted({tup[0] for tup in results if len(tup[1]) > 0}))
unique_resolved_domains_list = list(sorted({tup[0] for tup in results if len(tup[1]) > 0}))
async with Pool(3) as pool:
print(f'Length of unique resolved domains: {len(unique_resolved_domains)} chunking now!\n')
print(f'Length of unique resolved domains: {len(unique_resolved_domains_list)} chunking now!\n')
# If you have the resources, you could make the function faster by increasing the chunk number
chunk_number = 14
for chunk in screen_shotter.chunk_list(unique_resolved_domains, chunk_number):
for chunk in screen_shotter.chunk_list(unique_resolved_domains_list, chunk_number):
try:
screenshot_tups.extend(await pool.map(screen_shotter.take_screenshot, chunk))
except Exception as ee:
print(f'An exception has occurred while mapping: {ee}')
end = time.perf_counter()
# There is probably an easier way to do this
total = end - start_time
total = int(end - start_time)
mon, sec = divmod(total, 60)
hr, mon = divmod(mon, 60)
total_time = "%02d:%02d" % (mon, sec)
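The rename to unique_resolved_domains_list sidesteps mypy's complaint about re-binding one name to a different type, and the int() cast gives divmod integer operands for the "%02d" formatting. A minimal sketch of the re-binding issue (hypothetical data):

# Hypothetical data; re-binding the set-typed name to a list is what mypy
# flags, so the sorted list gets a fresh name instead.
results = [("a.example.com", "shot.png"), ("b.example.com", "")]
unique_resolved_domains = {tup[0] for tup in results if len(tup[1]) > 0}  # set
unique_resolved_domains_list = sorted(unique_resolved_domains)  # list
print(unique_resolved_domains_list)  # ['a.example.com']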
76 changes: 46 additions & 30 deletions theHarvester/discovery/bingsearch.py
@@ -4,56 +4,72 @@


class SearchBing:

def __init__(self, word, limit, start) -> None:
self.word = word.replace(' ', '%20')
self.results = ""
self.word = word.replace(" ", "%20")
self.results: tuple[Any, ...] = ()
self.total_results = ""
self.server = 'www.bing.com'
self.apiserver = 'api.search.live.net'
self.hostname = 'www.bing.com'
self.server = "www.bing.com"
self.apiserver = "api.search.live.net"
self.hostname = "www.bing.com"
self.limit = int(limit)
self.bingApi = Core.bing_key()
self.counter = start
self.proxy = False

async def do_search(self) -> None:
headers = {
'Host': self.hostname,
'Cookie': 'SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50',
'Accept-Language': 'en-us,en',
'User-agent': Core.get_user_agent()
"Host": self.hostname,
"Cookie": "SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50",
"Accept-Language": "en-us,en",
"User-agent": Core.get_user_agent(),
}
base_url = f'https://{self.server}/search?q=%40"{self.word}"&count=50&first=xx'
urls = [base_url.replace("xx", str(num)) for num in range(0, self.limit, 50) if num <= self.limit]
responses = await AsyncFetcher.fetch_all(urls, headers=headers, proxy=self.proxy)
urls = [
base_url.replace("xx", str(num))
for num in range(0, self.limit, 50)
if num <= self.limit
]
responses = await AsyncFetcher.fetch_all(
urls, headers=headers, proxy=self.proxy
)
for response in responses:
self.total_results += response

async def do_search_api(self) -> None:
url = 'https://api.bing.microsoft.com/v7.0/search?'
url = "https://api.bing.microsoft.com/v7.0/search?"
params = {
'q': self.word,
'count': str(self.limit),
'offset': '0',
'mkt': 'en-us',
'safesearch': 'Off'
"q": self.word,
"count": str(self.limit),
"offset": "0",
"mkt": "en-us",
"safesearch": "Off",
}
headers = {
"User-Agent": Core.get_user_agent(),
"Ocp-Apim-Subscription-Key": self.bingApi,
}
headers = {'User-Agent': Core.get_user_agent(), 'Ocp-Apim-Subscription-Key': self.bingApi}
self.results = await AsyncFetcher.fetch_all([url], headers=headers, params=params, proxy=self.proxy)
self.results = await AsyncFetcher.fetch_all(
[url], headers=headers, params=params, proxy=self.proxy
)
for res in self.results:
self.total_results += res

async def do_search_vhost(self) -> None:
headers = {
'Host': self.hostname,
'Cookie': 'mkt=en-US;ui=en-US;SRCHHPGUSR=NEWWND=0&ADLT=DEMOTE&NRSLT=50',
'Accept-Language': 'en-us,en',
'User-agent': Core.get_user_agent()
"Host": self.hostname,
"Cookie": "mkt=en-US;ui=en-US;SRCHHPGUSR=NEWWND=0&ADLT=DEMOTE&NRSLT=50",
"Accept-Language": "en-us,en",
"User-agent": Core.get_user_agent(),
}
base_url = f'http://{self.server}/search?q=ip:{self.word}&go=&count=50&FORM=QBHL&qs=n&first=xx'
urls = [base_url.replace("xx", str(num)) for num in range(0, self.limit, 50) if num <= self.limit]
responses = await AsyncFetcher.fetch_all(urls, headers=headers, proxy=self.proxy)
base_url = f"http://{self.server}/search?q=ip:{self.word}&go=&count=50&FORM=QBHL&qs=n&first=xx"
urls = [
base_url.replace("xx", str(num))
for num in range(0, self.limit, 50)
if num <= self.limit
]
responses = await AsyncFetcher.fetch_all(
urls, headers=headers, proxy=self.proxy
)
for response in responses:
self.total_results += response

@@ -71,13 +87,13 @@ async def get_allhostnames(self):

async def process(self, api, proxy: bool = False) -> None:
self.proxy = proxy
if api == 'yes':
if api == "yes":
if self.bingApi is None:
raise MissingKey('BingAPI')
raise MissingKey("BingAPI")
await self.do_search_api()
else:
await self.do_search()
print(f'\tSearching {self.counter} results.')
print(f"\tSearching {self.counter} results.")

async def process_vhost(self) -> None:
await self.do_search_vhost()
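A condensed sketch of the pagination scheme shared by do_search and do_search_vhost: Bing pages on the first parameter, 50 results at a time, so the URL list covers offsets 0, 50, ... below the limit. (The if num <= self.limit filter looks redundant, since range() already stops below the limit.)

# Hypothetical limit; mirrors the URL construction above.
limit = 120
base_url = 'https://www.bing.com/search?q=%40"example.com"&count=50&first=xx'
urls = [base_url.replace("xx", str(num)) for num in range(0, limit, 50)]
for url in urls:  # offsets 0, 50, 100 -> three pages of up to 50 results
    print(url)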
16 changes: 9 additions & 7 deletions theHarvester/discovery/censysearch.py
@@ -1,13 +1,15 @@
from typing import Set
from theHarvester.discovery.constants import MissingKey
from theHarvester.lib.core import Core
from theHarvester.lib.version import version as thehavester_version
from censys.search import CensysCertificates

from censys.common import __version__
from censys.common.exceptions import (
CensysRateLimitExceededException,
CensysUnauthorizedException,
)
from censys.search import CensysCerts

from theHarvester.discovery.constants import MissingKey
from theHarvester.lib.core import Core
from theHarvester.lib.version import version as thehavester_version


class SearchCensys:
@@ -23,13 +25,13 @@ def __init__(self, domain, limit: int = 500) -> None:

async def do_search(self) -> None:
try:
cert_search = CensysCertificates(
cert_search = CensysCerts(
api_id=self.key[0],
api_secret=self.key[1],
user_agent=f"censys/{__version__} (theHarvester/{thehavester_version}); +https://github.com/laramies/theHarvester)",
)
except CensysUnauthorizedException:
raise MissingKey('Censys ID and/or Secret')
raise MissingKey("Censys ID and/or Secret")

query = f"parsed.names: {self.word}"
try:
@@ -38,7 +40,7 @@ async def do_search(self) -> None:
fields=["parsed.names", "metadata", "parsed.subject.email_address"],
max_records=self.limit,
)
for cert in response:
for cert in response():
self.totalhosts.update(cert.get("parsed.names", []))
self.emails.update(cert.get("parsed.subject.email_address", []))
except CensysRateLimitExceededException:
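A condensed sketch of the migrated flow, using only calls visible in the diff (placeholder credentials; per the diff, the returned query object is called to fetch a page of hits):

from censys.common.exceptions import CensysUnauthorizedException
from censys.search import CensysCerts
from theHarvester.discovery.constants import MissingKey

try:
    cert_search = CensysCerts(api_id="YOUR_ID", api_secret="YOUR_SECRET")
except CensysUnauthorizedException:
    raise MissingKey("Censys ID and/or Secret")

response = cert_search.search(
    "parsed.names: example.com",
    fields=["parsed.names", "metadata", "parsed.subject.email_address"],
    max_records=500,
)
for cert in response():  # calling the query object yields a page of hits
    print(cert.get("parsed.names", []))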
77 changes: 50 additions & 27 deletions theHarvester/discovery/constants.py
@@ -1,6 +1,7 @@
from theHarvester.lib.core import *
from typing import Union, Optional
import random
from typing import Optional, Union

from theHarvester.lib.core import *


async def splitter(links):
@@ -41,15 +42,19 @@ def filter(lst):
new_lst = []
for item in lst:
item = str(item)
if (item[0].isalpha() or item[0].isdigit()) and ('xxx' not in item) and ('..' not in item):
item = item.replace('252f', '').replace('2F', '').replace('2f', '')
if (
(item[0].isalpha() or item[0].isdigit())
and ("xxx" not in item)
and (".." not in item)
):
item = item.replace("252f", "").replace("2F", "").replace("2f", "")
new_lst.append(item.lower())
return new_lst
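A quick usage sketch of filter() as reformatted above (hypothetical inputs): entries must start with an alphanumeric character, anything containing 'xxx' or '..' is dropped, and stray URL-encoding fragments are stripped. Note that this module-level filter() shadows the builtin, which explains the single-argument filter(...) calls in __main__.py above.

hits = ["Mail.example.com", "..broken", "xxx.example.com", "252fwww.example.com"]
print(filter(hits))  # ['mail.example.com', 'www.example.com']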


def get_delay() -> float:
"""Method that is used to generate a random delay"""
return random.randint(1, 3) - .5
return random.randint(1, 3) - 0.5


async def search(text: str) -> bool:
@@ -58,8 +63,12 @@ async def search(text: str) -> bool:
:return bool:
"""
for line in text.strip().splitlines():
if 'This page appears when Google automatically detects requests coming from your computer network' in line \
or 'http://www.google.com/sorry/index' in line or 'https://www.google.com/sorry/index' in line:
if (
"This page appears when Google automatically detects requests coming from your computer network"
in line
or "http://www.google.com/sorry/index" in line
or "https://www.google.com/sorry/index" in line
):
# print('\tGoogle is blocking your IP due to too many automated requests, wait or change your IP')
return True
return False
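A quick usage sketch of the detector above (hypothetical response text):

import asyncio

blocked = "Redirecting...\nhttp://www.google.com/sorry/index?continue=search"
normal = "<html><body>results</body></html>"
print(asyncio.run(search(blocked)))  # True: Google is rate-limiting us
print(asyncio.run(search(normal)))   # False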
@@ -71,46 +80,60 @@ async def google_workaround(visit_url: str) -> Union[bool, str]:
:param visit_url: Url to scrape
:return: Correct html that can be parsed by BS4
"""
url = 'https://websniffer.cc/'
url = "https://websniffer.cc/"
data = {
'Cookie': '',
'url': visit_url,
'submit': 'Submit',
'type': 'GET&http=1.1',
'uak': str(random.randint(4, 8)) # select random UA to send to Google
"Cookie": "",
"url": visit_url,
"submit": "Submit",
"type": "GET&http=1.1",
"uak": str(random.randint(4, 8)), # select random UA to send to Google
}
returned_html = await AsyncFetcher.post_fetch(url, headers={'User-Agent': Core.get_user_agent()}, data=data)
returned_html = "This page appears when Google automatically detects requests coming from your computer network" \
if returned_html == "" else returned_html[0]

returned_html = "" if 'Please Wait... | Cloudflare' in returned_html else returned_html

if len(returned_html) == 0 or await search(returned_html) or '&lt;html' not in returned_html:
returned_html = await AsyncFetcher.post_fetch(
url, headers={"User-Agent": Core.get_user_agent()}, data=data
)
returned_html = (
"This page appears when Google automatically detects requests coming from your computer network"
if returned_html == ""
else returned_html[0]
)

returned_html = (
"" if "Please Wait... | Cloudflare" in returned_html else returned_html
)

if (
len(returned_html) == 0
or await search(returned_html)
or "&lt;html" not in returned_html
):
# indicates that google is serving workaround a captcha
# That means we will try out second option which will utilize proxies
return True
# the html we get is malformed for BS4 as there are no greater than or less than signs
if '&lt;html&gt;' in returned_html:
start_index = returned_html.index('&lt;html&gt;')
if "&lt;html&gt;" in returned_html:
start_index = returned_html.index("&lt;html&gt;")
else:
start_index = returned_html.index('&lt;html')
start_index = returned_html.index("&lt;html")

end_index = returned_html.index('&lt;/html&gt;') + 1
end_index = returned_html.index("&lt;/html&gt;") + 1
correct_html = returned_html[start_index:end_index]
# Slice list to get the response's html
correct_html = ''.join([ch.strip().replace('&lt;', '<').replace('&gt;', '>') for ch in correct_html])
correct_html = "".join(
[ch.strip().replace("&lt;", "<").replace("&gt;", ">") for ch in correct_html]
)
return correct_html
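The manual replace() calls rebuild parseable HTML from the entity-escaped response. For reference, the stdlib's html.unescape performs the same entity translation (an alternative sketch, not what the module uses):

import html

escaped = "&lt;html&gt;&lt;body&gt;ok&lt;/body&gt;&lt;/html&gt;"
print(html.unescape(escaped))  # <html><body>ok</body></html>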


class MissingKey(Exception):
"""
:raise: When there is a module that has not been provided its API key
"""

def __init__(self, source: Optional[str]) -> None:
if source:
self.message = f'\n\033[93m[!] Missing API key for {source}. \033[0m'
self.message = f"\n\033[93m[!] Missing API key for {source}. \033[0m"
else:
self.message = '\n\033[93m[!] Missing CSE id. \033[0m'
self.message = "\n\033[93m[!] Missing CSE id. \033[0m"

def __str__(self) -> str:
return self.message
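A minimal usage sketch of MissingKey as defined above:

try:
    raise MissingKey("BingAPI")
except MissingKey as e:
    print(e)  # prints the ANSI-colored message: [!] Missing API key for BingAPI.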