From 24c1a6ea7bf33944971dba6e66453316492aa276 Mon Sep 17 00:00:00 2001
From: Ridham
Date: Tue, 26 Mar 2024 01:23:04 +0530
Subject: [PATCH 1/6] test code

---
 artemis/modules/urlreputation.py | 110 +++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)
 create mode 100644 artemis/modules/urlreputation.py

diff --git a/artemis/modules/urlreputation.py b/artemis/modules/urlreputation.py
new file mode 100644
index 000000000..532531bd9
--- /dev/null
+++ b/artemis/modules/urlreputation.py
@@ -0,0 +1,110 @@
+import requests
+from bs4 import BeautifulSoup
+import re
+import time
+
+from karton.core import Task
+from artemis.binds import TaskStatus, TaskType
+from artemis.module_base import ArtemisBase
+from artemis.task_utils import get_target_ip, get_target_url
+
+class URLReputation(ArtemisBase):
+    identity = "url_reputation"
+    filters = []
+
+    def remove_duplicates(self, link_list):
+        unique_links = []
+        for item in link_list:
+            match = re.search(r"(?P<url>https?://[^\s]+)", item)
+            if match is not None and match.group("url") not in unique_links:
+                unique_links.append(match.group("url"))
+        return unique_links
+
+    def check_url_status(self, url):
+        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+        response = requests.post(api_endpoint, data={'url': url})
+
+        if response.status_code == 200:
+            data = response.json()
+            return data.get('query_status') == 'ok' and 'threat' in data
+        else:
+            self.log.error(f"API request failed for {url}")
+            return False
+
+    def extract_and_check_urls(self, task, base_url, max_links=162):
+        source_code = requests.get(base_url)
+        soup = BeautifulSoup(source_code.content, 'lxml')
+
+        links = [str(link.get('href')) for link in soup.find_all('a', href=True)]
+        links = self.remove_duplicates(links)[:max_links]
+
+        for url in links:
+            if self.check_url_status(url):
+                status = TaskStatus.VULNERABLE  # Mark task as vulnerable
+                status_reason = "Malicious URL found on page"
+                self.db.save_task_result(task, status, status_reason, data={'url': url})
+
+    def run(self, task: Task) -> None:
+        target = get_target_url(task) or get_target_ip(task)  # Works for URLs or IPs
+        self.log.info(f"URL Reputation module running on {target}")
+
+        self.extract_and_check_urls(task, target)
+
+if __name__ == "__main__":
+    URLReputation().loop()
+
+
+
+# import requests
+# from bs4 import BeautifulSoup
+# import re
+# import time
+# from urllib.parse import urlparse
+
+# def remove_duplicates(link_list):
+#     unique_links = []
+#     for item in link_list:
+#         match = re.search(r"(?P<url>https?://[^\s]+)", item)
+#         if match is not None and match.group("url") not in unique_links:
+#             unique_links.append(match.group("url"))
+#     return unique_links
+
+# def check_url_status(url):
+#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+#     response = requests.post(api_endpoint, data={'url': url})
+
+#     if response.status_code == 200:
+#         data = response.json()
+#         return data.get('query_status') == 'ok' and 'threat' in data
+#     else:
+#         print(f"API request failed for {url}")
+#         return False
+
+# visited_url=[]
+# urls=[]
+# def extract_and_check_urls(url,hostname,max_links=162):
+#     if url not in visited_url:
+#         visited_url.append(url)
+#         if hostname in url:
+#             source_code=requests.get(url)
+#             soup = BeautifulSoup(source_code.content, 'lxml')
+#             for link in soup.find_all('a', href=True):
+#                 get_link=str(link.get('href'))
+#                 if(len(urlparse(get_link).netloc)==0):
+#                     get_link="http://"+hostname+"/"+get_link
+#                 if(hostname in get_link):
+#                     extract_and_check_urls(get_link,hostname)
+#                 else:
+#                     urls.append(str(link.get('href')))
+#                 if len(urls) >= max_links:
+#                     break
+
+
+# if __name__ == "__main__":
+#     base_url = "http://127.0.0.1:5500/index.html"
+#     parsed_uri=urlparse(base_url)
+#     extract_and_check_urls(base_url,parsed_uri.netloc)
+#     print("bad url in your site")
+#     for link in urls:
+#         if(check_url_status(link)):
+#             print(link)
\ No newline at end of file

From ed06fb107e007bc0286e4486acab9bcee0e6e751 Mon Sep 17 00:00:00 2001
From: RasenRhino
Date: Wed, 27 Mar 2024 15:47:51 +0530
Subject: [PATCH 2/6] url reputation module

---
 artemis/modules/url_reputation.py | 140 ++++++++++++++++++++++++++++++
 docker-compose.yaml               |  10 +++
 2 files changed, 150 insertions(+)
 create mode 100644 artemis/modules/url_reputation.py

diff --git a/artemis/modules/url_reputation.py b/artemis/modules/url_reputation.py
new file mode 100644
index 000000000..105057eb2
--- /dev/null
+++ b/artemis/modules/url_reputation.py
@@ -0,0 +1,140 @@
+import requests
+from bs4 import BeautifulSoup
+import re
+import time
+from urllib.parse import urlparse
+from karton.core import Task
+from artemis.binds import Service, TaskStatus, TaskType
+from artemis.module_base import ArtemisBase
+from artemis.task_utils import get_target_url
+
+class URLReputation(ArtemisBase):
+    identity = "url_reputation"
+    # Crawl state, reset at the start of each run() so results don't leak between tasks.
+    visited_url = []
+    urls = []
+    filters = [
+        {"type": TaskType.SERVICE.value, "service": Service.HTTP.value},
+    ]
+
+    def remove_duplicates(self, link_list):
+        unique_links = []
+        for item in link_list:
+            match = re.search(r"(?P<url>https?://[^\s]+)", item)
+            if match is not None and match.group("url") not in unique_links:
+                unique_links.append(match.group("url"))
+        return unique_links
+
+    def check_url_status(self, url):
+        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+        response = requests.post(api_endpoint, data={'url': url})
+
+        if response.status_code == 200:
+            data = response.json()
+            return data.get('query_status') == 'ok' and 'threat' in data
+        else:
+            self.log.error(f"API request failed for {url}")
+            return False
+
+    def extract_and_check_urls(self, url, hostname, max_links=162):
+        if url not in self.visited_url:
+            self.visited_url.append(url)
+            if hostname in url:
+                source_code = requests.get(url)
+                soup = BeautifulSoup(source_code.content, 'lxml')
+                for link in soup.find_all('a', href=True):
+                    get_link = str(link.get('href'))
+                    if len(urlparse(get_link).netloc) == 0:
+                        get_link = "http://" + hostname + "/" + get_link
+                    if hostname in get_link:
+                        self.extract_and_check_urls(get_link, hostname)
+                    else:
+                        self.urls.append(get_link)
+                    if len(self.urls) >= max_links:
+                        break
+
+    def run(self, task: Task) -> None:
+        self.visited_url = []
+        self.urls = []
+        target = get_target_url(task)
+        self.log.info(f"URL Reputation module running on {target}")
+        self.extract_and_check_urls(target, urlparse(target).netloc)
+        malicious_urls = [url for url in self.remove_duplicates(self.urls) if self.check_url_status(url)]
+        if malicious_urls:
+            # On the default task result view only the interesting task results will be displayed
+            status = TaskStatus.INTERESTING
+            status_reason = "Malicious URLs linked from the page: " + ", ".join(malicious_urls)
+        else:
+            status = TaskStatus.OK
+            status_reason = "No malicious URLs found"
+        self.db.save_task_result(
+            task=task,
+            status=status,
+            status_reason=status_reason,
+            # In the data dictionary, you may provide any additional results - the user will be able to view them
+            # in the interface on the single task result page.
+            data={"malicious_urls": malicious_urls},
+        )
+
+
+if __name__ == "__main__":
+    URLReputation().loop()
+
+
+
+# import requests
+# from bs4 import BeautifulSoup
+# import re
+# import time
+# from urllib.parse import urlparse
+
+# def remove_duplicates(link_list):
+#     unique_links = []
+#     for item in link_list:
+#         match = re.search(r"(?P<url>https?://[^\s]+)", item)
+#         if match is not None and match.group("url") not in unique_links:
+#             unique_links.append(match.group("url"))
+#     return unique_links
+
+# def check_url_status(url):
+#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
+#     response = requests.post(api_endpoint, data={'url': url})
+
+#     if response.status_code == 200:
+#         data = response.json()
+#         return data.get('query_status') == 'ok' and 'threat' in data
+#     else:
+#         print(f"API request failed for {url}")
+#         return False
+
+# visited_url=[]
+# urls=[]
+# def extract_and_check_urls(url,hostname,max_links=162):
+#     if url not in visited_url:
+#         visited_url.append(url)
+#         if hostname in url:
+#             source_code=requests.get(url)
+#             soup = BeautifulSoup(source_code.content, 'lxml')
+#             for link in soup.find_all('a', href=True):
+#                 get_link=str(link.get('href'))
+#                 if(len(urlparse(get_link).netloc)==0):
+#                     get_link="http://"+hostname+"/"+get_link
+#                 if(hostname in get_link):
+#                     extract_and_check_urls(get_link,hostname)
+#                 else:
+#                     urls.append(str(link.get('href')))
+#                 if len(urls) >= max_links:
+#                     break
+
+
+# if __name__ == "__main__":
+#     base_url = "http://127.0.0.1:5500/index.html"
+#     parsed_uri=urlparse(base_url)
+#     extract_and_check_urls(base_url,parsed_uri.netloc)
+#     print("bad url in your site")
+#     for link in urls:
+#         if(check_url_status(link)):
+#             print(link)
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
index b70d498df..17aca562c 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -177,6 +177,16 @@ services:
     restart: always
     volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"]
 
+  karton-url_reputation:
+    build:
+      context: .
+      dockerfile: docker/Dockerfile
+    command: "python3 -m artemis.modules.url_reputation"
+    depends_on: [karton-system]
+    env_file: .env
+    restart: always
+    volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"]
+
   karton-ftp_bruter:
     build:
       context: .
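A note on the URLhaus lookup that both modules above rely on: each extracted link is POSTed to the public URLhaus query endpoint, and a response that reports a known threat is treated as malicious. Below is a minimal standalone sketch of that check, under the assumption that the https://urlhaus-api.abuse.ch/v1/url/ endpoint and its query_status/threat response fields behave the way the module code expects.

import requests


def is_known_malicious(url: str) -> bool:
    # Ask URLhaus about a single URL. Following the module code above, a hit
    # is a 200 response whose JSON has query_status == "ok" and carries threat
    # details; anything else (e.g. query_status == "no_results") counts as clean.
    response = requests.post(
        "https://urlhaus-api.abuse.ch/v1/url/", data={"url": url}, timeout=10
    )
    if response.status_code != 200:
        return False
    data = response.json()
    return data.get("query_status") == "ok" and "threat" in data


if __name__ == "__main__":
    print(is_known_malicious("http://example.com/"))  # a benign URL should print False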
From f0be5898f9662f924914680e32963529152a2166 Mon Sep 17 00:00:00 2001
From: RasenRhino
Date: Wed, 27 Mar 2024 15:53:37 +0530
Subject: [PATCH 3/6] url reputation module

---
 artemis/modules/url_reputation.py |  54 ---------------
 artemis/modules/urlreputation.py  | 110 ------------------------------
 2 files changed, 164 deletions(-)
 delete mode 100644 artemis/modules/urlreputation.py

diff --git a/artemis/modules/url_reputation.py b/artemis/modules/url_reputation.py
index 105057eb2..7948cfdc4 100644
--- a/artemis/modules/url_reputation.py
+++ b/artemis/modules/url_reputation.py
@@ -84,57 +84,3 @@ def run(self, task: Task) -> None:
     URLReputation().loop()
 
 
-
-# import requests
-# from bs4 import BeautifulSoup
-# import re
-# import time
-# from urllib.parse import urlparse
-
-# def remove_duplicates(link_list):
-#     unique_links = []
-#     for item in link_list:
-#         match = re.search(r"(?P<url>https?://[^\s]+)", item)
-#         if match is not None and match.group("url") not in unique_links:
-#             unique_links.append(match.group("url"))
-#     return unique_links
-
-# def check_url_status(url):
-#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-#     response = requests.post(api_endpoint, data={'url': url})
-
-#     if response.status_code == 200:
-#         data = response.json()
-#         return data.get('query_status') == 'ok' and 'threat' in data
-#     else:
-#         print(f"API request failed for {url}")
-#         return False
-
-# visited_url=[]
-# urls=[]
-# def extract_and_check_urls(url,hostname,max_links=162):
-#     if url not in visited_url:
-#         visited_url.append(url)
-#         if hostname in url:
-#             source_code=requests.get(url)
-#             soup = BeautifulSoup(source_code.content, 'lxml')
-#             for link in soup.find_all('a', href=True):
-#                 get_link=str(link.get('href'))
-#                 if(len(urlparse(get_link).netloc)==0):
-#                     get_link="http://"+hostname+"/"+get_link
-#                 if(hostname in get_link):
-#                     extract_and_check_urls(get_link,hostname)
-#                 else:
-#                     urls.append(str(link.get('href')))
-#                 if len(urls) >= max_links:
-#                     break
-
-
-# if __name__ == "__main__":
-#     base_url = "http://127.0.0.1:5500/index.html"
-#     parsed_uri=urlparse(base_url)
-#     extract_and_check_urls(base_url,parsed_uri.netloc)
-#     print("bad url in your site")
-#     for link in urls:
-#         if(check_url_status(link)):
-#             print(link)
\ No newline at end of file
diff --git a/artemis/modules/urlreputation.py b/artemis/modules/urlreputation.py
deleted file mode 100644
index 532531bd9..000000000
--- a/artemis/modules/urlreputation.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import requests
-from bs4 import BeautifulSoup
-import re
-import time
-
-from karton.core import Task
-from artemis.binds import TaskStatus, TaskType
-from artemis.module_base import ArtemisBase
-from artemis.task_utils import get_target_ip, get_target_url
-
-class URLReputation(ArtemisBase):
-    identity = "url_reputation"
-    filters = []
-
-    def remove_duplicates(self, link_list):
-        unique_links = []
-        for item in link_list:
-            match = re.search(r"(?P<url>https?://[^\s]+)", item)
-            if match is not None and match.group("url") not in unique_links:
-                unique_links.append(match.group("url"))
-        return unique_links
-
-    def check_url_status(self, url):
-        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-        response = requests.post(api_endpoint, data={'url': url})
-
-        if response.status_code == 200:
-            data = response.json()
-            return data.get('query_status') == 'ok' and 'threat' in data
-        else:
-            self.log.error(f"API request failed for {url}")
-            return False
-
-    def extract_and_check_urls(self, task, base_url, max_links=162):
-        source_code = requests.get(base_url)
-        soup = BeautifulSoup(source_code.content, 'lxml')
-
-        links = [str(link.get('href')) for link in soup.find_all('a', href=True)]
-        links = self.remove_duplicates(links)[:max_links]
-
-        for url in links:
-            if self.check_url_status(url):
-                status = TaskStatus.VULNERABLE  # Mark task as vulnerable
-                status_reason = "Malicious URL found on page"
-                self.db.save_task_result(task, status, status_reason, data={'url': url})
-
-    def run(self, task: Task) -> None:
-        target = get_target_url(task) or get_target_ip(task)  # Works for URLs or IPs
-        self.log.info(f"URL Reputation module running on {target}")
-
-        self.extract_and_check_urls(task, target)
-
-if __name__ == "__main__":
-    URLReputation().loop()
-
-
-
-# import requests
-# from bs4 import BeautifulSoup
-# import re
-# import time
-# from urllib.parse import urlparse
-
-# def remove_duplicates(link_list):
-#     unique_links = []
-#     for item in link_list:
-#         match = re.search(r"(?P<url>https?://[^\s]+)", item)
-#         if match is not None and match.group("url") not in unique_links:
-#             unique_links.append(match.group("url"))
-#     return unique_links
-
-# def check_url_status(url):
-#     api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-#     response = requests.post(api_endpoint, data={'url': url})
-
-#     if response.status_code == 200:
-#         data = response.json()
-#         return data.get('query_status') == 'ok' and 'threat' in data
-#     else:
-#         print(f"API request failed for {url}")
-#         return False
-
-# visited_url=[]
-# urls=[]
-# def extract_and_check_urls(url,hostname,max_links=162):
-#     if url not in visited_url:
-#         visited_url.append(url)
-#         if hostname in url:
-#             source_code=requests.get(url)
-#             soup = BeautifulSoup(source_code.content, 'lxml')
-#             for link in soup.find_all('a', href=True):
-#                 get_link=str(link.get('href'))
-#                 if(len(urlparse(get_link).netloc)==0):
-#                     get_link="http://"+hostname+"/"+get_link
-#                 if(hostname in get_link):
-#                     extract_and_check_urls(get_link,hostname)
-#                 else:
-#                     urls.append(str(link.get('href')))
-#                 if len(urls) >= max_links:
-#                     break
-
-
-# if __name__ == "__main__":
-#     base_url = "http://127.0.0.1:5500/index.html"
-#     parsed_uri=urlparse(base_url)
-#     extract_and_check_urls(base_url,parsed_uri.netloc)
-#     print("bad url in your site")
-#     for link in urls:
-#         if(check_url_status(link)):
-#             print(link)
\ No newline at end of file

From f3f2e72b56edc6321a0145f14567f8a7a1545aab Mon Sep 17 00:00:00 2001
From: Ridham
Date: Thu, 28 Mar 2024 03:34:07 +0530
Subject: [PATCH 4/6] fixing base.info issue

---
 artemis/modules/url_reputation.py | 86 ------------------------------
 artemis/templating.py             |  6 ++-
 docker-compose.yaml               | 10 ----
 3 files changed, 5 insertions(+), 97 deletions(-)
 delete mode 100644 artemis/modules/url_reputation.py

diff --git a/artemis/modules/url_reputation.py b/artemis/modules/url_reputation.py
deleted file mode 100644
index 7948cfdc4..000000000
--- a/artemis/modules/url_reputation.py
+++ /dev/null
@@ -1,86 +0,0 @@
-import requests
-from bs4 import BeautifulSoup
-import re
-import time
-from urllib.parse import urlparse
-from karton.core import Task
-from artemis.binds import Service, TaskStatus, TaskType
-from artemis.module_base import ArtemisBase
-from artemis.task_utils import get_target_url
-
-class URLReputation(ArtemisBase):
-    identity = "url_reputation"
-    # Crawl state, reset at the start of each run() so results don't leak between tasks.
-    visited_url = []
-    urls = []
-    filters = [
-        {"type": TaskType.SERVICE.value, "service": Service.HTTP.value},
-    ]
-
-    def remove_duplicates(self, link_list):
-        unique_links = []
-        for item in link_list:
-            match = re.search(r"(?P<url>https?://[^\s]+)", item)
-            if match is not None and match.group("url") not in unique_links:
-                unique_links.append(match.group("url"))
-        return unique_links
-
-    def check_url_status(self, url):
-        api_endpoint = "https://urlhaus-api.abuse.ch/v1/url/"
-        response = requests.post(api_endpoint, data={'url': url})
-
-        if response.status_code == 200:
-            data = response.json()
-            return data.get('query_status') == 'ok' and 'threat' in data
-        else:
-            self.log.error(f"API request failed for {url}")
-            return False
-
-    def extract_and_check_urls(self, url, hostname, max_links=162):
-        if url not in self.visited_url:
-            self.visited_url.append(url)
-            if hostname in url:
-                source_code = requests.get(url)
-                soup = BeautifulSoup(source_code.content, 'lxml')
-                for link in soup.find_all('a', href=True):
-                    get_link = str(link.get('href'))
-                    if len(urlparse(get_link).netloc) == 0:
-                        get_link = "http://" + hostname + "/" + get_link
-                    if hostname in get_link:
-                        self.extract_and_check_urls(get_link, hostname)
-                    else:
-                        self.urls.append(get_link)
-                    if len(self.urls) >= max_links:
-                        break
-
-    def run(self, task: Task) -> None:
-        self.visited_url = []
-        self.urls = []
-        target = get_target_url(task)
-        self.log.info(f"URL Reputation module running on {target}")
-        self.extract_and_check_urls(target, urlparse(target).netloc)
-        malicious_urls = [url for url in self.remove_duplicates(self.urls) if self.check_url_status(url)]
-        if malicious_urls:
-            # On the default task result view only the interesting task results will be displayed
-            status = TaskStatus.INTERESTING
-            status_reason = "Malicious URLs linked from the page: " + ", ".join(malicious_urls)
-        else:
-            status = TaskStatus.OK
-            status_reason = "No malicious URLs found"
-        self.db.save_task_result(
-            task=task,
-            status=status,
-            status_reason=status_reason,
-            # In the data dictionary, you may provide any additional results - the user will be able to view them
-            # in the interface on the single task result page.
-            data={"malicious_urls": malicious_urls},
-        )
-
-
-if __name__ == "__main__":
-    URLReputation().loop()
-
-
diff --git a/artemis/templating.py b/artemis/templating.py
index 61b317d19..6f1e7c31c 100644
--- a/artemis/templating.py
+++ b/artemis/templating.py
@@ -17,10 +17,14 @@
 
 
 def dedent(text: str) -> str:
+    if(text==None):
+        return ''
     return textwrap.dedent(text)
-
+ 
 
 def render_markdown(markdown_text: str) -> str:
+    if(markdown_text==None):
+        return ''
     return markdown.markdown(markdown_text)
 
 
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 82777d405..6bedd3cac 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -175,16 +175,6 @@ services:
     restart: always
     volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"]
 
-  karton-url_reputation:
-    build:
-      context: .
-      dockerfile: docker/Dockerfile
-    command: "python3 -m artemis.modules.url_reputation"
-    depends_on: [karton-system]
-    env_file: .env
-    restart: always
-    volumes: ["./docker/karton.ini:/etc/karton/karton.ini", "${DOCKER_COMPOSE_ADDITIONAL_SHARED_DIRECTORY:-./shared}:/shared/"]
-
   karton-ftp_bruter:
     build:
       context: .
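For context on the templating change above: the commit subject suggests a module without a base.info description ends up passing None to textwrap.dedent and markdown.markdown, both of which assume a string, so rendering crashed before the guards were added. A short demonstration of the failure the new if-checks prevent (standard library only):

import textwrap

# Unguarded call with a missing description; this is what the new checks avoid.
try:
    textwrap.dedent(None)
except AttributeError as exc:
    print(f"unguarded dedent crashes: {exc}")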
From 9733474222c943614730c56bbd9c9c02991f18fa Mon Sep 17 00:00:00 2001
From: Ridham
Date: Fri, 29 Mar 2024 18:30:03 +0530
Subject: [PATCH 5/6] added requested changes

---
 artemis/templating.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/artemis/templating.py b/artemis/templating.py
index 6f1e7c31c..660275068 100644
--- a/artemis/templating.py
+++ b/artemis/templating.py
@@ -17,13 +17,13 @@
 
 
 def dedent(text: str) -> str:
-    if(text==None):
+    if not text:
         return ''
     return textwrap.dedent(text)
  
 
 def render_markdown(markdown_text: str) -> str:
-    if(markdown_text==None):
+    if not markdown_text:
        return ''
     return markdown.markdown(markdown_text)

From c951ba1652857c1394f9ec09c9709dd125f5c398 Mon Sep 17 00:00:00 2001
From: Ridham
Date: Fri, 29 Mar 2024 18:57:01 +0530
Subject: [PATCH 6/6] ran the linter

---
 artemis/templating.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/artemis/templating.py b/artemis/templating.py
index 660275068..d428393f5 100644
--- a/artemis/templating.py
+++ b/artemis/templating.py
@@ -18,13 +18,13 @@
 
 def dedent(text: str) -> str:
     if not text:
-        return ''
+        return ""
     return textwrap.dedent(text)
- 
+
 
 def render_markdown(markdown_text: str) -> str:
     if not markdown_text:
-        return ''
+        return ""
     return markdown.markdown(markdown_text)
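To close the loop on where the final helpers are used: functions like dedent and render_markdown are typically exposed to templates as Jinja2 filters. The wiring below is hypothetical (the actual registration in artemis/templating.py is not part of this series); it only illustrates how the guards let a missing description render as an empty string instead of raising:

import textwrap

import markdown
from jinja2 import Environment


def dedent(text: str) -> str:
    if not text:
        return ""
    return textwrap.dedent(text)


def render_markdown(markdown_text: str) -> str:
    if not markdown_text:
        return ""
    return markdown.markdown(markdown_text)


# Hypothetical filter registration -- not taken from the Artemis codebase.
env = Environment()
env.filters["dedent"] = dedent
env.filters["render_markdown"] = render_markdown

template = env.from_string("{{ description | dedent | render_markdown }}")
print(template.render(description=None))        # '' instead of an exception
print(template.render(description="**bold**"))  # <p><strong>bold</strong></p>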