From 24c1a6ea7bf33944971dba6e66453316492aa276 Mon Sep 17 00:00:00 2001
From: Ridham
Date: Tue, 26 Mar 2024 01:23:04 +0530
Subject: [PATCH 1/3] test code

---
 artemis/modules/urlreputation.py | 110 +++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)
 create mode 100644 artemis/modules/urlreputation.py

diff --git a/artemis/modules/urlreputation.py b/artemis/modules/urlreputation.py
new file mode 100644
index 000000000..532531bd9
--- /dev/null
+++ b/artemis/modules/urlreputation.py
@@ -0,0 +1,110 @@
+import requests
+from bs4 import BeautifulSoup
+import re
+import time
+
+from karton.core import Task
+from artemis.binds import TaskStatus, TaskType
+from artemis.module_base import ArtemisBase
+from artemis.task_utils import get_target_ip, get_target_url
+
+class URLReputation(ArtemisBase):
+    identity = "url_reputation"
+    filters = []
+
+    def remove_duplicates(self, link_list):
+        unique_links = []
+        for item in link_list:
+            match = re.search("(?P<url>https?://[^\s]+)", item)
+            if match is not None and match.group("url") not in unique_links:
+                unique_links.append(match.group("url"))
+        return unique_links
+
+    def check_url_status(self, url):
+        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+        response = requests.post(api_endpoint, data={'url': url})
+
+        if response.status_code == 200:
+            data = response.json()
+            return data.get('query_status') == 'ok' and 'threat' in data
+        else:
+            self.log.error(f"API request failed for {url}")
+            return False
+
+    def extract_and_check_urls(self, base_url, max_links=162):
+        source_code = requests.get(base_url)
+        soup = BeautifulSoup(source_code.content, 'lxml')
+
+        links = [str(link.get('href')) for link in soup.find_all('a', href=True)]
+        links = self.remove_duplicates(links)[:max_links]
+
+        for url in links:
+            if self.check_url_status(url):
+                status = TaskStatus.VULNERABLE  # Mark task as vulnerable
+                status_reason = "Malicious URL found on page"
+                self.db.save_task_result(task, status, status_reason, data={'url': url})
+
+    def run(self, task: Task) -> None:
+        target = get_target_url(task) or get_target_ip(task)  # Works for URLs or IPs
+        self.log.info(f"URL Reputation module running on {target}")
+
+        self.extract_and_check_urls(target)
+
+if __name__ == "__main__":
+    URLReputation().loop()
+
+
+
+# import requests
+# from bs4 import BeautifulSoup
+# import re
+# import time
+# from urllib.parse import urlparse
+
+# def remove_duplicates(link_list):
+#     unique_links = []
+#     for item in link_list:
+#         match = re.search("(?P<url>https?://[^\s]+)", item)
+#         if match is not None and match.group("url") not in unique_links:
+#             unique_links.append(match.group("url"))
+#     return unique_links
+
+# def check_url_status(url):
+#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+#     response = requests.post(api_endpoint, data={'url': url})
+
+#     if response.status_code == 200:
+#         data = response.json()
+#         return data.get('query_status') == 'ok' and 'threat' in data
+#     else:
+#         print(f"API request failed for {url}")
+#         return False
+
+# visited_url=[]
+# urls=[]
+# def extract_and_check_urls(url,hostname,max_links=162):
+#     if url not in visited_url:
+#         visited_url.append(url)
+#         if hostname in url:
+#             source_code=requests.get(url)
+#             soup = BeautifulSoup(source_code.content, 'lxml')
+#             for link in soup.find_all('a', href=True):
+#                 get_link=str(link.get('href'))
+#                 if(len(urlparse(get_link).netloc)==0):
+#                     get_link="http://"+hostname+"/"+get_link
+#                 if(hostname in get_link):
+#                     extract_and_check_urls(get_link,hostname)
+#                 else:
+#                     urls.append(str(link.get('href')))
+#                     if len(urls) >= max_links:
+#                         break
+
+
+# if __name__ == "__main__":
+#     base_url = "http://127.0.0.1:5500/index.html"
+#     parsed_uri=urlparse(base_url)
+#     extract_and_check_urls(base_url,parsed_uri.netloc)
+#     print("bad url in your site")
+#     for link in urls:
+#         if(check_url_status(link)):
+#             print(link)
\ No newline at end of file
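
Note on the URLhaus check used in check_url_status() above: the module POSTs the candidate URL to the public URLhaus lookup endpoint and treats it as malicious when the query succeeds and the response carries threat data. Below is a minimal standalone sketch of that lookup; the endpoint and the query_status/threat fields come from the patch itself, while the "no_results" value for unknown URLs is an assumption about the API that the patch does not rely on.

import requests

URLHAUS_ENDPOINT = "https://urlhaus-api.abuse.ch/v1/url/"

def lookup_url(url: str) -> dict:
    # Same request shape as check_url_status(): the URL is sent as form data.
    response = requests.post(URLHAUS_ENDPOINT, data={"url": url}, timeout=10)
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    data = lookup_url("http://example.com/")
    if data.get("query_status") == "ok" and "threat" in data:
        print("Listed in URLhaus:", data["threat"])
    else:
        # Assumption: "no_results" is returned for URLs URLhaus does not know about.
        print("Not listed, query_status =", data.get("query_status"))
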
From ed06fb107e007bc0286e4486acab9bcee0e6e751 Mon Sep 17 00:00:00 2001
From: RasenRhino
Date: Wed, 27 Mar 2024 15:47:51 +0530
Subject: [PATCH 2/3] url reputation module

---
 artemis/modules/url_reputation.py | 140 ++++++++++++++++++++++++++++++
 docker-compose.yaml               |  10 +++
 2 files changed, 150 insertions(+)
 create mode 100644 artemis/modules/url_reputation.py

diff --git a/artemis/modules/url_reputation.py b/artemis/modules/url_reputation.py
new file mode 100644
index 000000000..105057eb2
--- /dev/null
+++ b/artemis/modules/url_reputation.py
@@ -0,0 +1,140 @@
+import requests
+from bs4 import BeautifulSoup
+import re
+import time
+from urllib.parse import urlparse
+from karton.core import Task
+from artemis.binds import Service, TaskStatus, TaskType
+from artemis.module_base import ArtemisBase
+from artemis.task_utils import get_target_url
+
+
+class URLReputation(ArtemisBase):
+    identity = "url_reputation"
+    filters = [
+        {"type": TaskType.SERVICE.value, "service": Service.HTTP.value},
+    ]
+
+    def remove_duplicates(self, link_list):
+        unique_links = []
+        for item in link_list:
+            match = re.search("(?P<url>https?://[^\s]+)", item)
+            if match is not None and match.group("url") not in unique_links:
+                unique_links.append(match.group("url"))
+        return unique_links
+
+    def check_url_status(self, url):
+        # A URL is treated as malicious when the URLhaus lookup succeeds and the
+        # response contains threat information.
+        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+        response = requests.post(api_endpoint, data={'url': url})
+
+        if response.status_code == 200:
+            data = response.json()
+            return data.get('query_status') == 'ok' and 'threat' in data
+        else:
+            self.log.error(f"API request failed for {url}")
+            return False
+
+    def extract_and_check_urls(self, url, hostname, max_links=162):
+        # Crawl only pages on the target host; links pointing elsewhere are
+        # collected so that they can be checked against URLhaus.
+        if url in self.visited_urls or len(self.urls) >= max_links:
+            return
+        self.visited_urls.append(url)
+        if hostname not in url:
+            return
+
+        source_code = requests.get(url)
+        soup = BeautifulSoup(source_code.content, 'lxml')
+        for link in soup.find_all('a', href=True):
+            get_link = str(link.get('href'))
+            if len(urlparse(get_link).netloc) == 0:
+                get_link = "http://" + hostname + "/" + get_link
+            if hostname in get_link:
+                self.extract_and_check_urls(get_link, hostname, max_links)
+            else:
+                self.urls.append(get_link)
+                if len(self.urls) >= max_links:
+                    break
+
+    def run(self, task: Task) -> None:
+        target = get_target_url(task)
+        self.log.info(f"URL Reputation module running on {target}")
+
+        self.visited_urls = []
+        self.urls = []
+        self.extract_and_check_urls(target, urlparse(target).netloc)
+
+        malicious_urls = [url for url in self.remove_duplicates(self.urls) if self.check_url_status(url)]
+
+        if malicious_urls:
+            # On the default task result view only the interesting task results will be displayed
+            status = TaskStatus.INTERESTING
+            status_reason = "Found links to URLs listed in URLhaus: " + ", ".join(malicious_urls)
+        else:
+            status = TaskStatus.OK
+            status_reason = "No known-malicious URLs found"
+
+        self.db.save_task_result(
+            task=task,
+            status=status,
+            status_reason=status_reason,
+            # In the data dictionary, you may provide any additional results - the user will be able to view them
+            # in the interface on the single task result page.
+            data={"malicious_urls": malicious_urls},
+        )
+
+
+if __name__ == "__main__":
+    URLReputation().loop()
+
+
+
+# import requests
+# from bs4 import BeautifulSoup
+# import re
+# import time
+# from urllib.parse import urlparse
+
+# def remove_duplicates(link_list):
+#     unique_links = []
+#     for item in link_list:
+#         match = re.search("(?P<url>https?://[^\s]+)", item)
+#         if match is not None and match.group("url") not in unique_links:
+#             unique_links.append(match.group("url"))
+#     return unique_links
+
+# def check_url_status(url):
+#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+#     response = requests.post(api_endpoint, data={'url': url})
+
+#     if response.status_code == 200:
+#         data = response.json()
+#         return data.get('query_status') == 'ok' and 'threat' in data
+#     else:
+#         print(f"API request failed for {url}")
+#         return False
+
+# visited_url=[]
+# urls=[]
+# def extract_and_check_urls(url,hostname,max_links=162):
+#     if url not in visited_url:
+#         visited_url.append(url)
+#         if hostname in url:
+#             source_code=requests.get(url)
+#             soup = BeautifulSoup(source_code.content, 'lxml')
+#             for link in soup.find_all('a', href=True):
+#                 get_link=str(link.get('href'))
+#                 if(len(urlparse(get_link).netloc)==0):
+#                     get_link="http://"+hostname+"/"+get_link
+#                 if(hostname in get_link):
+#                     extract_and_check_urls(get_link,hostname)
+#                 else:
+#                     urls.append(str(link.get('href')))
+#                     if len(urls) >= max_links:
+#                         break
+
+
+# if __name__ == "__main__":
+#     base_url = "http://127.0.0.1:5500/index.html"
+#     parsed_uri=urlparse(base_url)
+#     extract_and_check_urls(base_url,parsed_uri.netloc)
+#     print("bad url in your site")
+#     for link in urls:
+#         if(check_url_status(link)):
+#             print(link)
\ No newline at end of file
+ data={"url":"someurl"}, + ) + + + +if __name__ == "__main__": + URLReputation().loop() + + + +# import requests +# from bs4 import BeautifulSoup +# import re +# import time +# from urllib.parse import urlparse + +# def remove_duplicates(link_list): +# unique_links = [] +# for item in link_list: +# match = re.search("(?Phttps?://[^\s]+)", item) +# if match is not None and match.group("url") not in unique_links: +# unique_links.append(match.group("url")) +# return unique_links + +# def check_url_status(url): +# api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/" +# response = requests.post(api_endpoint, data={'url': url}) + +# if response.status_code == 200: +# data = response.json() +# return data.get('query_status') == 'ok' and 'threat' in data +# else: +# print(f"API request failed for {url}") +# return False + +# visited_url=[] +# urls=[] +# def extract_and_check_urls(url,hostname,max_links=162): +# if url not in visited_url: +# visited_url.append(url) +# if hostname in url: +# source_code=requests.get(url) +# soup = BeautifulSoup(source_code.content, 'lxml') +# for link in soup.find_all('a', href=True): +# get_link=str(link.get('href')) +# if(len(urlparse(get_link).netloc)==0): +# get_link="http://"+hostname+"/"+get_link +# if(hostname in get_link): +# extract_and_check_urls(get_link,hostname) +# else: +# urls.append(str(link.get('href'))) +# if len(urls) >= max_links: +# break + + +# if __name__ == "__main__": +# base_url = "http://127.0.0.1:5500/index.html" +# parsed_uri=urlparse(base_url) +# extract_and_check_urls(base_url,parsed_uri.netloc) +# print("bad url in your site") +# for link in urls: +# if(check_url_status(link)): +# print(link) \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index b70d498df..17aca562c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -177,6 +177,16 @@ services: restart: always volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"] + karton-url_reputation: + build: + context: . + dockerfile: docker/Dockerfile + command: "python3 -m artemis.modules.url_reputation" + depends_on: [karton-system] + env_file: .env + restart: always + volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"] + karton-ftp_bruter: build: context: . 
From f0be5898f9662f924914680e32963529152a2166 Mon Sep 17 00:00:00 2001
From: RasenRhino
Date: Wed, 27 Mar 2024 15:53:37 +0530
Subject: [PATCH 3/3] url reputation module

---
 artemis/modules/url_reputation.py |  54 ---------------
 artemis/modules/urlreputation.py  | 110 ------------------------------
 2 files changed, 164 deletions(-)
 delete mode 100644 artemis/modules/urlreputation.py

diff --git a/artemis/modules/url_reputation.py b/artemis/modules/url_reputation.py
index 105057eb2..7948cfdc4 100644
--- a/artemis/modules/url_reputation.py
+++ b/artemis/modules/url_reputation.py
@@ -84,57 +84,3 @@ def run(self, task: Task) -> None:
     URLReputation().loop()
 
 
-
-# import requests
-# from bs4 import BeautifulSoup
-# import re
-# import time
-# from urllib.parse import urlparse
-
-# def remove_duplicates(link_list):
-#     unique_links = []
-#     for item in link_list:
-#         match = re.search("(?P<url>https?://[^\s]+)", item)
-#         if match is not None and match.group("url") not in unique_links:
-#             unique_links.append(match.group("url"))
-#     return unique_links
-
-# def check_url_status(url):
-#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-#     response = requests.post(api_endpoint, data={'url': url})
-
-#     if response.status_code == 200:
-#         data = response.json()
-#         return data.get('query_status') == 'ok' and 'threat' in data
-#     else:
-#         print(f"API request failed for {url}")
-#         return False
-
-# visited_url=[]
-# urls=[]
-# def extract_and_check_urls(url,hostname,max_links=162):
-#     if url not in visited_url:
-#         visited_url.append(url)
-#         if hostname in url:
-#             source_code=requests.get(url)
-#             soup = BeautifulSoup(source_code.content, 'lxml')
-#             for link in soup.find_all('a', href=True):
-#                 get_link=str(link.get('href'))
-#                 if(len(urlparse(get_link).netloc)==0):
-#                     get_link="http://"+hostname+"/"+get_link
-#                 if(hostname in get_link):
-#                     extract_and_check_urls(get_link,hostname)
-#                 else:
-#                     urls.append(str(link.get('href')))
-#                     if len(urls) >= max_links:
-#                         break
-
-
-# if __name__ == "__main__":
-#     base_url = "http://127.0.0.1:5500/index.html"
-#     parsed_uri=urlparse(base_url)
-#     extract_and_check_urls(base_url,parsed_uri.netloc)
-#     print("bad url in your site")
-#     for link in urls:
-#         if(check_url_status(link)):
-#             print(link)
\ No newline at end of file
diff --git a/artemis/modules/urlreputation.py b/artemis/modules/urlreputation.py
deleted file mode 100644
index 532531bd9..000000000
--- a/artemis/modules/urlreputation.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import requests
-from bs4 import BeautifulSoup
-import re
-import time
-
-from karton.core import Task
-from artemis.binds import TaskStatus, TaskType
-from artemis.module_base import ArtemisBase
-from artemis.task_utils import get_target_ip, get_target_url
-
-class URLReputation(ArtemisBase):
-    identity = "url_reputation"
-    filters = []
-
-    def remove_duplicates(self, link_list):
-        unique_links = []
-        for item in link_list:
-            match = re.search("(?P<url>https?://[^\s]+)", item)
-            if match is not None and match.group("url") not in unique_links:
-                unique_links.append(match.group("url"))
-        return unique_links
-
-    def check_url_status(self, url):
-        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-        response = requests.post(api_endpoint, data={'url': url})
-
-        if response.status_code == 200:
-            data = response.json()
-            return data.get('query_status') == 'ok' and 'threat' in data
-        else:
-            self.log.error(f"API request failed for {url}")
-            return False
-
-    def extract_and_check_urls(self, base_url, max_links=162):
-        source_code = requests.get(base_url)
-        soup = BeautifulSoup(source_code.content, 'lxml')
-
-        links = [str(link.get('href')) for link in soup.find_all('a', href=True)]
-        links = self.remove_duplicates(links)[:max_links]
-
-        for url in links:
-            if self.check_url_status(url):
-                status = TaskStatus.VULNERABLE  # Mark task as vulnerable
-                status_reason = "Malicious URL found on page"
-                self.db.save_task_result(task, status, status_reason, data={'url': url})
-
-    def run(self, task: Task) -> None:
-        target = get_target_url(task) or get_target_ip(task)  # Works for URLs or IPs
-        self.log.info(f"URL Reputation module running on {target}")
-
-        self.extract_and_check_urls(target)
-
-if __name__ == "__main__":
-    URLReputation().loop()
-
-
-
-# import requests
-# from bs4 import BeautifulSoup
-# import re
-# import time
-# from urllib.parse import urlparse
-
-# def remove_duplicates(link_list):
-#     unique_links = []
-#     for item in link_list:
-#         match = re.search("(?P<url>https?://[^\s]+)", item)
-#         if match is not None and match.group("url") not in unique_links:
-#             unique_links.append(match.group("url"))
-#     return unique_links
-
-# def check_url_status(url):
-#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-#     response = requests.post(api_endpoint, data={'url': url})
-
-#     if response.status_code == 200:
-#         data = response.json()
-#         return data.get('query_status') == 'ok' and 'threat' in data
-#     else:
-#         print(f"API request failed for {url}")
-#         return False
-
-# visited_url=[]
-# urls=[]
-# def extract_and_check_urls(url,hostname,max_links=162):
-#     if url not in visited_url:
-#         visited_url.append(url)
-#         if hostname in url:
-#             source_code=requests.get(url)
-#             soup = BeautifulSoup(source_code.content, 'lxml')
-#             for link in soup.find_all('a', href=True):
-#                 get_link=str(link.get('href'))
-#                 if(len(urlparse(get_link).netloc)==0):
-#                     get_link="http://"+hostname+"/"+get_link
-#                 if(hostname in get_link):
-#                     extract_and_check_urls(get_link,hostname)
-#                 else:
-#                     urls.append(str(link.get('href')))
-#                     if len(urls) >= max_links:
-#                         break
-
-
-# if __name__ == "__main__":
-#     base_url = "http://127.0.0.1:5500/index.html"
-#     parsed_uri=urlparse(base_url)
-#     extract_and_check_urls(base_url,parsed_uri.netloc)
-#     print("bad url in your site")
-#     for link in urls:
-#         if(check_url_status(link)):
-#             print(link)
\ No newline at end of file
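
The commented-out script deleted above was a local test harness for the same flow. For a quick smoke test outside the Artemis/Karton pipeline, something along these lines can be used; it reuses the link-extraction regex and the URLhaus check from the module, while the local URL and helper names are only illustrative.

import re

import requests
from bs4 import BeautifulSoup

def collect_external_links(page_url: str) -> list:
    # Scrape hrefs and keep only absolute http(s) URLs, de-duplicated
    # (mirrors remove_duplicates() in the module).
    soup = BeautifulSoup(requests.get(page_url, timeout=10).content, "lxml")
    links = []
    for a in soup.find_all("a", href=True):
        match = re.search(r"(?P<url>https?://[^\s]+)", str(a.get("href")))
        if match and match.group("url") not in links:
            links.append(match.group("url"))
    return links

def listed_in_urlhaus(url: str) -> bool:
    response = requests.post("https://urlhaus-api.abuse.ch/v1/url/", data={"url": url}, timeout=10)
    data = response.json() if response.status_code == 200 else {}
    return data.get("query_status") == "ok" and "threat" in data

if __name__ == "__main__":
    # Assumes a page served locally, e.g. with `python3 -m http.server 5500`.
    for link in collect_external_links("http://127.0.0.1:5500/index.html"):
        if listed_in_urlhaus(link):
            print("Listed in URLhaus:", link)
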