Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dmdhrumilmistry/breach-check
Browse files Browse the repository at this point in the history
  • Loading branch information
dmdhrumilmistry committed May 24, 2024
2 parents 4ea8693 + b88d672 commit 38d90be
Show file tree
Hide file tree
Showing 10 changed files with 753 additions and 451 deletions.
5 changes: 2 additions & 3 deletions breach_check/__main__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from asyncio import run
from argparse import ArgumentParser
from breach_check.breach import BreachChecker
from breach_check.results import Results
from breach_check.utils import generate_unique_filename, extract_emails, write_json_file
from asyncio import run
from sys import exit
from breach_check.utils import generate_unique_filename, extract_emails


def main():
Expand Down
130 changes: 75 additions & 55 deletions breach_check/breach.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
from aiohttp.client_exceptions import ClientProxyConnectionError
"""
This module contains the BreachChecker class which is used to check if an email address has been involved in any data breaches.
"""
from asyncio import ensure_future, gather
from breach_check.http import AsyncRequests
from breach_check.logger import logger, console
from json import loads as json_loads
from rich.progress import Progress, TaskID
from re import compile

from aiohttp.client_exceptions import ClientProxyConnectionError
from rich.progress import Progress, TaskID

from breach_check.http import AsyncRequests
from breach_check.logger import logger, console
from breach_check.breach_factory.leakcheck import LeakCheck

class BreachChecker:
def __init__(self, rate_limit: int | None = None, delay: float | None = None, headers: dict | None = None, proxy: str | None = None, ssl: bool | None = True, allow_redirects: bool | None = True, *args, **kwargs) -> None:
"""
Wrapper for checking email breaches using breach factory.
"""

def __init__(self, *args, rate_limit: int = 60, headers: dict | None = None, proxy: str | None = None, ssl: bool | None = True, allow_redirects: bool | None = True, **kwargs) -> None:
"""
Initialize the BreachCheck object.
Args:
rate_limit (int): The rate limit for making HTTP requests. Defaults to 60.
delay (float | None): The delay between consecutive HTTP requests. Defaults to None.
headers (dict | None): The headers to be included in the HTTP requests. Defaults to None.
proxy (str | None): The proxy server to be used for making HTTP requests. Defaults to None.
ssl (bool | None): Whether to use SSL for making HTTP requests. Defaults to True.
allow_redirects (bool | None): Whether to allow HTTP redirects. Defaults to True.
**kwargs: Additional keyword arguments.
Returns:
None
"""
if not headers:
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
Expand All @@ -18,16 +42,27 @@ def __init__(self, rate_limit: int | None = None, delay: float | None = None, he
self.progress_task_id: TaskID | None = None

self._http_client = AsyncRequests(
rate_limit=rate_limit,
delay=delay,
rate_limit=rate_limit,
headers=headers,
proxy=proxy,
proxies=proxy,
ssl=ssl,
allow_redirects=allow_redirects
)
self._api_url = 'https://monitor.firefox.com/api/v1/scan'

self._breach_factory = LeakCheck(self._http_client)


async def mass_check(self, emails: list[str] | None = None):
"""
Perform a mass check for breaches using a list of emails.
Args:
emails (list[str] | None): A list of email addresses to check for breaches. Defaults to None.
Returns:
list: A list of results from the breach check.
"""
if not emails or len(emails) == 0:
return []

Expand All @@ -45,64 +80,49 @@ async def mass_check(self, emails: list[str] | None = None):
)
)

try:
results = await gather(*tasks)

self.progress.stop()
return results
except Exception as e:
logger.error(
f'[*] Exception occurred while gathering results: {e}',
stack_info=True
)
return []
# try:
results = await gather(*tasks)

self.progress.stop()
return results
# except Exception as e:
# logger.error(
# f'[*] Exception occurred while gathering results: {e}',
# stack_info=True
# )
# return []

async def check(self, email: str | None = None) -> dict:
"""
Check if an email has been involved in any data breaches.
Args:
email (str, optional): The email address to check. Defaults to None.
Returns:
dict: A dictionary containing the email, breaches, and total number of breaches.
Raises:
ConnectionRefusedError: If the server refuses the connection.
ClientProxyConnectionError: If there is an error with the proxy connection.
"""
if not email:
logger.warning('email param cannot be None')
return {}

res_data = {
'email': email,
'breaches': [],
'total': None
}

res_data = {}
try:
email_validator = compile(r"^[^@\s']+@[^@\s']+\.[^@\s']+$")
if email_validator.match(email):
json_payload = {
'email': email
}
response = await self._http_client.request(
url=self._api_url,
method='POST',
json=json_payload,
)

status_code = response.get('status')
res_body = json_loads(response.get('res_body', '{}'))
is_success = res_body.get('success', False)

if status_code == 200 and is_success:
breaches = res_body.get('breaches', [])
total = res_body.get('total', -1)
res_data['breaches'] = breaches
res_data['total'] = total

elif status_code == 429:
logger.warning('Rate Limited')

else:
logger.error(f'Failed with status code: {status_code}')
logger.error(response, res_body)

res_data = await self._breach_factory.check_email_breaches(email)
else:
logger.warning(f'{email} is not a valid email')
logger.warning('%s is not a valid email', email)

# advance progress bar
if self.progress_task_id != None:
if self.progress_task_id is not None:
self.progress.update(self.progress_task_id,
advance=1, refresh=True)
advance=1, refresh=True)
else:
logger.error('No Progress Bar Task Found!')

Expand All @@ -113,4 +133,4 @@ async def check(self, email: str | None = None) -> dict:
except ConnectionRefusedError:
logger.error('Connection Failed! Server refused Connection!!')
except ClientProxyConnectionError as e:
logger.error(f'Proxy Connection Error: {e}')
logger.error('Proxy Connection Error: %s', str(e))
Empty file.
32 changes: 32 additions & 0 deletions breach_check/breach_factory/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Base class for breach backends.
"""
from ..http import AsyncRequests


class BaseBreachBackend:
"""
Base class for breach backends.
Args:
http_client (AsyncRequests): An instance of the HTTP client used for making requests.
Attributes:
_http_client (AsyncRequests): The HTTP client used for making requests.
"""

def __init__(self, http_client: AsyncRequests) -> None:
self._http_client = http_client

async def check_email_breaches(self, email: str):
"""
Check for breaches using the backend.
Args:
email (str): The email address to check for breaches.
Returns:
dict: A dictionary containing the results of the breach check.
"""
raise NotImplementedError

57 changes: 57 additions & 0 deletions breach_check/breach_factory/leakcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
This module contains the implementation of MozillaMonitor class which checks email breaches using mozilla monitor API.
"""
from json import loads as json_loads
from breach_check.breach_factory.base import BaseBreachBackend
from breach_check.logger import logger
from breach_check.http import AsyncRequests


class LeakCheck(BaseBreachBackend):
"""
Checks email breaches using leakcheck public API.
"""

def __init__(self, http_client: AsyncRequests, *args, **kwargs) -> None:
super().__init__(http_client, *args, **kwargs)
self._api_url = 'https://leakcheck.io/api/public'

async def check_email_breaches(self, email: str) -> dict:
res_data = {
'email': email,
'breaches': [],
'fields':[],
'total': None
}

payload = {
'check': email
}
response = await self._http_client.request(
url=self._api_url,
method='GET',
params=payload
)

status_code = response.get('status')
res_body = json_loads(response.get('res_body', '{}'))
is_success = res_body.get('success', False)

if status_code == 200 and is_success:
logger.warning('Breaches found for %s', email)
res_data['breaches'] = res_body.get('sources', [])
res_data['fields'] = res_body.get('fields', [])
res_data['total'] = res_body.get('found', -1)

elif status_code == 200:
logger.info('No breaches found for %s', email)
res_data['total'] = 0

elif status_code == 429:
logger.warning('Rate Limited')

else:
logger.error('Failed with status code: %s', str(status_code))
logger.error('Response: %s\nResponse Body: %s', str(response), str(res_body))

return res_data
54 changes: 54 additions & 0 deletions breach_check/breach_factory/mozilla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
This module contains the implementation of MozillaMonitor class which checks email breaches using mozilla monitor API.
"""
from json import loads as json_loads
from breach_check.breach_factory.base import BaseBreachBackend
from breach_check.logger import logger
from breach_check.http import AsyncRequests


class MozillaMonitor(BaseBreachBackend):
"""
Checks email breaches using mozilla monitor API.
"""

def __init__(self, http_client: AsyncRequests, *args, **kwargs) -> None:
logger.warning(
'MozillaMonitor is deprecated since it API is no longer available. Please use other breach backends.')
super().__init__(http_client, *args, **kwargs)
self._api_url = 'https://monitor.firefox.com/api/v1/scan'

async def check_email_breaches(self, email: str) -> dict:
res_data = {
'email': email,
'breaches': [],
'total': None
}

json_payload = {
'email': email
}
response = await self._http_client.request(
url=self._api_url,
method='POST',
json=json_payload,
)

status_code = response.get('status')
res_body = json_loads(response.get('res_body', '{}'))
is_success = res_body.get('success', False)

if status_code == 200 and is_success:
breaches = res_body.get('breaches', [])
total = res_body.get('total', -1)
res_data['breaches'] = breaches
res_data['total'] = total

elif status_code == 429:
logger.warning('Rate Limited')

else:
logger.error('Failed with status code: %s', str(status_code))
logger.error(response, res_body)

return res_data
Loading

0 comments on commit 38d90be

Please sign in to comment.