headless cloning using pyppeteer #294

Merged 2 commits on Jun 28, 2021
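Summary: this PR splits the monolithic Cloner class into a BaseCloner base class with two implementations: SimpleCloner, which fetches pages over plain aiohttp as before, and a new HeadlessCloner, which drives headless Chromium through pyppeteer (useful for pages that render content with JavaScript). A new CloneRunner wrapper selects between them, and bin/clone gains a --headless flag to enable the pyppeteer path.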
bin/clone: 19 changes (11 additions, 8 deletions)
@@ -19,7 +19,7 @@ import os
 import sys
 from datetime import datetime
 
-from snare.cloner import Cloner
+from snare.cloner import CloneRunner
 from snare.utils import logger
 from snare.utils.snare_helpers import check_privileges, print_color
 
@@ -32,6 +32,9 @@ def main():
     parser.add_argument("--log_path", help="path to the log file")
     parser.add_argument("--css-validate", help="enable css validation", default=False, action="store_true")
     parser.add_argument("--path", help="path to save the page to be cloned", default="/opt/")
+    parser.add_argument(
+        "--headless", help="enable headless cloning using pyppeteer", default=False, action="store_true"
+    )
     args = parser.parse_args()
     default_path = os.path.join(args.path, "snare")
 
@@ -54,22 +57,22 @@ def main():
     print_color(" Logs will be stored in {}".format(log_file), "INFO", end="\r")
     cloner = None
     start = datetime.now()
+    cloner = CloneRunner(args.target, int(args.max_depth), args.css_validate, default_path, args.headless)
+    if not cloner or not cloner.runner:
+        print_color("Error initializing cloner, please try again", "ERROR")
+        return
     try:
-        cloner = Cloner(args.target, int(args.max_depth), args.css_validate, default_path)
-        loop.run_until_complete(cloner.get_root_host())
+        loop.run_until_complete(cloner.runner.get_root_host())
         loop.run_until_complete(cloner.run())
     except KeyboardInterrupt:
         print_color("\nKeyboardInterrupt received... Quitting", "ERROR")
     finally:
         end = datetime.now() - start
-        if not cloner:
-            print_color("Error initializing cloner, please try again", "ERROR")
-            return
         print("")
         print_color("-" * 36 + ">SUMMARY<" + "-" * 36, "INFO")
-        print_color("\tTotal number of URLs cloned: {}".format(str(cloner.counter)), "INFO")
+        print_color("\tTotal number of URLs cloned: {}".format(str(cloner.runner.counter)), "INFO")
         print_color("\tTime elapsed: {}".format(str(end)), "INFO")
-        print_color("\tCloned directory: {}".format(cloner.target_path), "INFO")
+        print_color("\tCloned directory: {}".format(cloner.runner.target_path), "INFO")
         print_color("-" * 82, "INFO")


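With this change, headless cloning stays off by default and is enabled per run from the command line. A hypothetical invocation (the URL is a placeholder, and a --target option is assumed from the args.target reference above; bin/clone also calls check_privileges, hence sudo):

    sudo bin/clone --target http://example.com --headless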
requirements.txt: 1 change (1 addition, 0 deletions)
@@ -4,3 +4,4 @@ beautifulsoup4==4.6.3
 cssutils==1.0.2
 gitpython==3.1.0
 pycodestyle==2.4.0
+pyppeteer==0.2.5
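One operational note on the new pin: pyppeteer fetches its own Chromium build the first time a browser is launched, so the first headless clone may pause for a sizeable download. A minimal standalone sketch for verifying the pinned dependency, assuming pyppeteer 0.2.5's documented API (the URL is a placeholder):

    import asyncio

    from pyppeteer import launch


    async def check():
        # The first launch() downloads a bundled Chromium if none is cached
        browser = await launch()
        page = await browser.newPage()
        response = await page.goto("http://example.com")
        print(response.status, response.headers.get("content-type"))
        await browser.close()


    asyncio.get_event_loop().run_until_complete(check())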
snare/cloner.py: 106 changes (83 additions, 23 deletions)
@@ -11,11 +11,12 @@
 from bs4 import BeautifulSoup
 from asyncio import Queue
 from collections import defaultdict
+from pyppeteer import launch
 
 animation = "|/-\\"
 
 
-class Cloner(object):
+class BaseCloner:
     def __init__(self, root, max_depth, css_validate, default_path="/opt/snare"):
         self.logger = logging.getLogger(__name__)
         self.logger.setLevel(logging.DEBUG)
@@ -150,7 +151,10 @@ def _make_filename(self, url):
         hash_name = m.hexdigest()
         return file_name, hash_name
 
-    async def get_body(self, session):
+    async def fetch_data(self, driver, current_url):
+        raise NotImplementedError
+
+    async def get_body(self, driver):
         while not self.new_urls.empty():
             print(animation[self.itr], end="\r")
             self.itr = (self.itr + 1) % len(animation)
@@ -160,18 +164,7 @@ async def get_body(self, session):
             self.visited_urls.append(current_url.human_repr())
             file_name, hash_name = self._make_filename(current_url)
             self.logger.debug("Cloned file: %s", file_name)
-            data = None
-            headers = []
-            content_type = None
-            try:
-                response = await session.get(current_url, headers={"Accept": "text/html"}, timeout=10.0)
-                headers = self.get_headers(response)
-                content_type = response.content_type
-                data = await response.read()
-            except (aiohttp.ClientError, asyncio.TimeoutError) as client_error:
-                self.logger.error(client_error)
-            else:
-                await response.release()
+            data, headers, content_type = await self.fetch_data(driver, current_url)
 
             if data is not None:
                 self.meta[file_name]["hash"] = hash_name
@@ -192,8 +185,11 @@
                         if carved_url.human_repr() not in self.visited_urls:
                             await self.new_urls.put((carved_url, level + 1))
 
-                with open(os.path.join(self.target_path, hash_name), "wb") as index_fh:
-                    index_fh.write(data)
+                try:
+                    with open(os.path.join(self.target_path, hash_name), "wb") as index_fh:
+                        index_fh.write(data)
+                except TypeError:
+                    await self.new_urls.put((current_url, level))
 
     async def get_root_host(self):
         try:
@@ -206,15 +202,79 @@ async def get_root_host(self):
             self.logger.error("Can't connect to target host: %s", err)
             exit(-1)
 
+
+class SimpleCloner(BaseCloner):
+    async def fetch_data(self, session, current_url):
+        data = None
+        headers = []
+        content_type = None
+        try:
+            response = await session.get(current_url, headers={"Accept": "text/html"}, timeout=10.0)
+            headers = self.get_headers(response)
+            content_type = response.content_type
+            data = await response.read()
+        except (aiohttp.ClientError, asyncio.TimeoutError) as client_error:
+            self.logger.error(client_error)
+        else:
+            await response.release()
+        return [data, headers, content_type]
+
+
+class HeadlessCloner(BaseCloner):
+    @staticmethod
+    def get_content_type(headers):
+        for header in headers:
+            for key, val in header.items():
+                if key.lower() == "content-type":
+                    return val.split(";")[0]
+        return None
+
+    async def fetch_data(self, browser, current_url):
+        data = None
+        headers = []
+        content_type = None
+        page = None
+        try:
+            page = await browser.newPage()
+            response = await page.goto(str(current_url))
+            headers = self.get_headers(response)
+            content_type = self.get_content_type(headers)
+            data = await response.buffer()
+        except Exception as err:
+            self.logger.error(err)
+        finally:
+            if page:
+                await page.close()
+
+        return [data, headers, content_type]
+
+
+class CloneRunner:
+    def __init__(self, root, max_depth, css_validate, default_path="/opt/snare", headless=False):
+        self.runner = None
+        if headless:
+            self.runner = HeadlessCloner(root, max_depth, css_validate, default_path)
+        else:
+            self.runner = SimpleCloner(root, max_depth, css_validate, default_path)
+        if not self.runner:
+            raise Exception("Error initializing cloner!")
+
     async def run(self):
-        session = aiohttp.ClientSession()
+        if not self.runner:
+            raise Exception("Error initializing runner!")
+        driver = None
+        if type(self.runner) == SimpleCloner:
+            driver = aiohttp.ClientSession()
+        else:
+            driver = await launch()
         try:
-            await self.new_urls.put((self.root, 0))
-            await self.new_urls.put((self.error_page, 0))
-            await self.get_body(session)
+            await self.runner.new_urls.put((self.runner.root, 0))
+            await self.runner.new_urls.put((self.runner.error_page, 0))
+            await self.runner.get_body(driver)
         except KeyboardInterrupt:
             raise
         finally:
-            with open(os.path.join(self.target_path, "meta.json"), "w") as mj:
-                json.dump(self.meta, mj)
-            await session.close()
+            with open(os.path.join(self.runner.target_path, "meta.json"), "w") as mj:
+                json.dump(self.runner.meta, mj)
+            if driver:
+                await driver.close()
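
For reviewers, a minimal sketch of driving the new entry point programmatically, mirroring the updated bin/clone above (the target URL and output path are placeholders):

    import asyncio

    from snare.cloner import CloneRunner

    loop = asyncio.get_event_loop()
    # headless=True selects HeadlessCloner (pyppeteer); the default False selects SimpleCloner (aiohttp)
    cloner = CloneRunner("http://example.com", max_depth=1, css_validate=False, default_path="/tmp/snare", headless=True)
    loop.run_until_complete(cloner.runner.get_root_host())
    loop.run_until_complete(cloner.run())
    print("URLs cloned: {}".format(cloner.runner.counter))
    print("Cloned directory: {}".format(cloner.runner.target_path))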