-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Brute force tool for administration panels #668
base: main
Are you sure you want to change the base?
Changes from 51 commits
af40a65
580502a
eaab344
6bcc377
e23da6d
36a441b
aeff8b8
ccfd1b9
4d389bd
60f605c
de088a4
0656dad
9e505f9
569e248
5cf0327
9c8b22c
9a51133
107822e
62549d5
50caff6
639b67f
9821931
4b805ff
c12e28d
51c1afb
6112675
4d94cc9
ee232b2
c39299b
2640a7d
1f2be78
32dd054
865ac03
dd4d515
05da7ee
1552196
1db7e9b
6e7dbf7
0f59176
eab1399
43cf86f
ce26974
d960043
359c747
78c6117
f2175fa
cd8d6a9
0465532
c694a77
1e1e0c6
aa8e57b
298bb74
d8d033d
73b7bd9
8e42a5f
2f282b9
8149e02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
import logging | ||
import time | ||
import urllib.parse | ||
from typing import Optional, Tuple | ||
|
||
from karton.core import Task | ||
from selenium import webdriver | ||
from selenium.common.exceptions import NoSuchElementException, TimeoutException | ||
from selenium.webdriver.chrome.options import Options | ||
from selenium.webdriver.chrome.service import Service | ||
from selenium.webdriver.chrome.webdriver import WebDriver | ||
from selenium.webdriver.common.by import By | ||
from selenium.webdriver.common.keys import Keys | ||
from selenium.webdriver.remote.webelement import WebElement | ||
from selenium.webdriver.support import expected_conditions | ||
from selenium.webdriver.support.wait import WebDriverWait | ||
|
||
from artemis.binds import TaskStatus, TaskType | ||
from artemis.config import Config | ||
from artemis.module_base import ArtemisBase | ||
from artemis.passwords import get_passwords_for_url | ||
from artemis.utils import remove_standard_ports_from_url | ||
|
||
|
||
class AdminPanelBruterException(Exception): | ||
pass | ||
|
||
|
||
class AdminPanelLoginBruter(ArtemisBase): | ||
""" | ||
Tries to log into admin panels with easy passwords (needs "bruter" and "port_scanner" to be enabled as well to find URLs where admin panels reside) | ||
""" | ||
|
||
identity = "admin_panel_login_bruter" | ||
filters = [{"type": TaskType.URL.value}] | ||
|
||
USERNAMES = ["admin"] | ||
|
||
LOGIN_FAILED_MSGS = [ | ||
"Please enter the correct username and password for a staff account. " | ||
"Note that both fields may be case-sensitive.", | ||
"Unrecognized username or password. Forgot your password?", | ||
"Username and password do not match or you do not have an account yet.", | ||
"Invalid credentials", | ||
"The login is invalid", | ||
"Login failed", | ||
"not seem to be correct", | ||
"Access denied", | ||
"Cannot log in", | ||
"login details do not seem to be correct", | ||
# rate limit | ||
"failed login attempts for this account", | ||
# pl_PL | ||
"Podano błędne dane logowania", | ||
"Wprowadź poprawne dane", | ||
"Nieprawidłowa nazwa użytkownika lub hasło", | ||
"Nazwa użytkownika lub hasło nie jest", | ||
"Złe hasło", | ||
"niepoprawne hasło", | ||
"Błędna nazwa użytkownika", | ||
] | ||
|
||
def run(self, task: Task) -> None: | ||
url = task.get_payload(TaskType.URL) | ||
url_parsed = urllib.parse.urlparse(url) | ||
|
||
is_root_url = url_parsed.path in ["", "/"] | ||
|
||
if ( | ||
not is_root_url | ||
and not url_parsed.path.endswith("/") | ||
and not any([item in url.lower() for item in ["login", "admin", "cms", "backend", "panel", "index.php"]]) | ||
): | ||
self.db.save_task_result( | ||
task=task, | ||
status=TaskStatus.OK, | ||
status_reason=f"URL {url} doesn't look like an admin panel URL", | ||
data=None, | ||
) | ||
return | ||
|
||
if "admin" in url and not url.endswith("/admin") and "admin" in task.get_payload("found_urls"): | ||
self.db.save_task_result( | ||
task=task, | ||
status=TaskStatus.OK, | ||
status_reason=f"Requested to brute {url}, but /admin is also on the list of found URLs - skipping " | ||
"brute-forcing of this url so that we don't duplicate work", | ||
data=None, | ||
) | ||
return | ||
|
||
credentials = self._brute(url) | ||
|
||
if credentials: | ||
username, password = credentials | ||
|
||
self.db.save_task_result( | ||
task=task, | ||
status=TaskStatus.INTERESTING, | ||
status_reason=f"Found working credentials for {url}: username={username}, password={password}", | ||
data=credentials, | ||
) | ||
else: | ||
self.db.save_task_result(task=task, status=TaskStatus.OK, status_reason=None, data=None) | ||
|
||
def _brute(self, url: str) -> Optional[Tuple[str, str]]: | ||
working_credentials = [] | ||
for username in self.USERNAMES: | ||
for password in get_passwords_for_url(url): | ||
driver = AdminPanelLoginBruter._get_webdriver() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would not it be better to get only one webdriver for whole |
||
driver.get(url) | ||
|
||
try: | ||
WebDriverWait(driver, Config.Modules.AdminPanelLoginBruter.WAIT_TIME_SECONDS).until( | ||
expected_conditions.url_matches(remove_standard_ports_from_url(url)) | ||
) | ||
except TimeoutException: | ||
self.log.info( | ||
"Timeout occured when waiting for the URL to match, let's try " | ||
f"to login even if the url doesn't match, url={driver.current_url}" | ||
) | ||
|
||
# Ignore alerts | ||
driver.execute_script("window.alert = function() {};") # type: ignore | ||
driver.implicitly_wait(Config.Modules.AdminPanelLoginBruter.WAIT_TIME_SECONDS) | ||
self._sleep_after_performing_requests(driver) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are waiting and sleeping in the same time? I think that waiting would be sufficient |
||
|
||
inputs = AdminPanelLoginBruter._find_form_inputs(url, driver) | ||
|
||
if inputs: | ||
user_input, password_input = inputs | ||
else: | ||
driver.close() | ||
driver.quit() | ||
break | ||
|
||
AdminPanelLoginBruter._send_credentials( | ||
user_input=user_input, | ||
password_input=password_input, | ||
username=username, | ||
password=password, | ||
) | ||
driver.implicitly_wait(Config.Modules.AdminPanelLoginBruter.WAIT_TIME_SECONDS) | ||
result = AdminPanelLoginBruter._get_logging_in_result(driver, self.LOGIN_FAILED_MSGS) | ||
self._sleep_after_performing_requests(driver) | ||
|
||
if result: | ||
self.log.info(f"Detected following 'login failed' messages: {result}") | ||
continue | ||
else: | ||
working_credentials.append((username, password)) | ||
|
||
driver.close() | ||
driver.quit() | ||
|
||
if len(working_credentials) > 1: | ||
raise AdminPanelBruterException( | ||
f"Found more than one working credential pair: {working_credentials} - please check the heuristics" | ||
) | ||
elif len(working_credentials) == 0: | ||
return None | ||
else: | ||
return working_credentials[0] | ||
|
||
@staticmethod | ||
def _get_webdriver() -> WebDriver: | ||
service = Service(executable_path="/usr/bin/chromedriver") | ||
|
||
chrome_options = Options() | ||
chrome_options.add_argument("--headless") | ||
chrome_options.add_argument("--no-sandbox") | ||
chrome_options.add_argument("--ignore-certificate-errors") | ||
chrome_options.add_argument("--disable-infobars") | ||
chrome_options.add_argument("--ignore-certificate-errors-spki-list") | ||
chrome_options.add_argument("--disable-dev-shm-usage") | ||
chrome_options.add_argument("window-size=1920x1080") | ||
|
||
if Config.Miscellaneous.CUSTOM_USER_AGENT: | ||
chrome_options.add_argument("--user-agent=" + Config.Miscellaneous.CUSTOM_USER_AGENT) | ||
return webdriver.Chrome(service=service, options=chrome_options) # type: ignore | ||
|
||
@staticmethod | ||
def _find_form_inputs(url: str, driver: WebDriver) -> Optional[tuple[WebElement, WebElement]]: | ||
user_input, password_input = None, None | ||
inputs = driver.find_elements(By.TAG_NAME, "input") | ||
if not inputs: | ||
logging.error(f"Login form has not been found on {url}") | ||
return None | ||
else: | ||
for field in inputs: | ||
if field.get_attribute("type").lower() == "text": # type: ignore | ||
tag_values = driver.execute_script( # type: ignore | ||
"var items = []; for (index = 0; index < arguments[0].attributes.length; ++index)" | ||
"items.push(arguments[0].attributes[index].value); return items;", | ||
field, | ||
) | ||
for value in tag_values: | ||
value = value.lower() | ||
if any(matcher in value for matcher in ["usr", "user", "log", "name"]): | ||
user_input = field | ||
break | ||
elif field.get_attribute("type").lower() == "password": # type: ignore | ||
password_input = field | ||
if not password_input or not user_input: | ||
logging.error(f"Login form has not been found on {url}") | ||
return None | ||
return user_input, password_input | ||
|
||
@staticmethod | ||
def _send_credentials(user_input: WebElement, password_input: WebElement, username: str, password: str) -> None: | ||
if user_input: | ||
user_input.send_keys(username) | ||
if password_input: | ||
password_input.send_keys(password) | ||
password_input.send_keys(Keys.ENTER) | ||
|
||
@staticmethod | ||
def _get_logging_in_result(driver: WebDriver, login_failure_msgs: list[str]) -> Optional[list[str]]: | ||
try: | ||
web_content = driver.find_element(By.XPATH, "html/body").text | ||
result = [msg for msg in login_failure_msgs if (msg.lower() in web_content.lower())] | ||
return result | ||
except NoSuchElementException: | ||
return None | ||
|
||
def _sleep_after_performing_requests(self, driver: WebDriver) -> None: | ||
# It is easier to limit requests that way than to hook into Selenium to limit every one. | ||
# This is not dangerous to servers as loading a page together with all resources in a burst | ||
# is how websites are normally used - and we have a lock so we know we are the only module | ||
# scanning this IP. | ||
num_requests = driver.execute_script('return 1 + window.performance.getEntriesByType("resource").length;') # type: ignore | ||
if Config.Limits.REQUESTS_PER_SECOND > 0: | ||
sleep_time_seconds = max( | ||
0, | ||
num_requests * 1.0 / Config.Limits.REQUESTS_PER_SECOND | ||
- Config.Modules.AdminPanelLoginBruter.WAIT_TIME_SECONDS, | ||
) | ||
else: | ||
sleep_time_seconds = 0 | ||
|
||
if sleep_time_seconds > 0: | ||
time.sleep(sleep_time_seconds) | ||
self.log.info(f"{num_requests} requests performed, sleeping {sleep_time_seconds} seconds") | ||
|
||
|
||
if __name__ == "__main__": | ||
AdminPanelLoginBruter().loop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this check and task result reason doesn't give clear indication what it is about, can you give context what it is for?