Skip to content

Commit

Permalink
Test parse_tanner_response (#135)
Browse files Browse the repository at this point in the history
* Test handle_error

* Suggested changes including other tests

* minor change

* pep8

* test directory fix

* minor change

* Small change

* Changes

* unique directory creater

* minor changes

* Suggested changes

* Suggested changes

* Suggested changes
  • Loading branch information
viskey98 authored and afeena committed May 23, 2018
1 parent bc91325 commit 65771a6
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 53 deletions.
10 changes: 5 additions & 5 deletions tests/test_add_meta_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import snare
import shutil
import configparser
from utils.page_path_generator import generate_unique_path


class TestAddMetaTag(unittest.TestCase):

def setUp(self):
if not os.path.exists("/opt/snare/pages/test"):
os.makedirs("/opt/snare/pages/test")
self.main_page_path = generate_unique_path()
os.makedirs(self.main_page_path)
self.content = '<html><head>title</head><body>sample</body></html>'
self.page_dir = "test"
self.page_dir = self.main_page_path.rsplit('/')[-1]
self.index_page = "index.html"
self.main_page_path = '/opt/snare/pages/test'
with open(os.path.join(self.main_page_path, 'index.html'), 'w') as f:
f.write(self.content)

Expand All @@ -29,4 +29,4 @@ def test_add_meta_tag(self):
soup.find("meta", attrs={"name": "msvalidate.01"}))

def tearDown(self):
shutil.rmtree("/opt/snare/pages/test")
shutil.rmtree(self.main_page_path)
33 changes: 9 additions & 24 deletions tests/test_get_dorks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,9 @@
import shutil
import yarl
import os
from utils.asyncmock import AsyncMock
from snare import HttpRequestHandler


class AsyncMock(Mock): # custom function defined to mock asyncio coroutines

def __call__(self, *args, **kwargs):
sup = super(AsyncMock, self)

async def coro():
return sup.__call__(*args, **kwargs)
return coro()

def __await__(self):
return self().__await__()
from utils.page_path_generator import generate_unique_path


class TestGetDorks(unittest.TestCase):
Expand All @@ -28,19 +17,19 @@ def setUp(self):
run_args = argparse.ArgumentParser()
run_args.add_argument("--tanner")
run_args.add_argument("--page-dir")
if not os.path.exists("/opt/snare/pages/test"):
os.makedirs("/opt/snare/pages/test")
self.args = run_args.parse_args(['--tanner', 'test'])
self.args = run_args.parse_args(['--page-dir', 'test'])
self.main_page_path = generate_unique_path()
os.makedirs(self.main_page_path)
self.page_dir = self.main_page_path.rsplit('/')[-1]
self.args = run_args.parse_args(['--page-dir', self.page_dir])
self.dorks = dict(response={'dorks': "test_dorks"})
self.loop = asyncio.new_event_loop()
aiohttp.ClientSession.get = AsyncMock(
return_value=aiohttp.ClientResponse(url=yarl.URL("http://www.example.com"), method="GET")
)

def test_get_dorks(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"

def test_get_dorks(self):
aiohttp.ClientResponse.json = AsyncMock(return_value=dict(response={'dorks': "test_dorks"}))

async def test():
Expand All @@ -49,8 +38,6 @@ async def test():
aiohttp.ClientSession.get.assert_called_with('http://tanner.mushmush.org:8090/dorks')

def test_return_dorks(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"
aiohttp.ClientResponse.json = AsyncMock(return_value=self.dorks)

async def test():
Expand All @@ -59,8 +46,6 @@ async def test():
self.assertEquals(self.data, self.dorks['response']['dorks'])

def test_return_dorks_exception(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"
aiohttp.ClientResponse.json = AsyncMock(side_effect=Exception())

async def test():
Expand All @@ -69,4 +54,4 @@ async def test():
self.loop.run_until_complete(test())

def tearDown(self):
shutil.rmtree("/opt/snare/pages/test")
shutil.rmtree(self.main_page_path)
113 changes: 113 additions & 0 deletions tests/test_parse_tanner_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import unittest
from unittest.mock import Mock
from unittest.mock import call
import asyncio
import argparse
import aiohttp
import shutil
import os
import json
import yarl
from aiohttp.protocol import HttpVersion
from utils.asyncmock import AsyncMock
from snare import HttpRequestHandler
from utils.page_path_generator import generate_unique_path
from urllib.parse import unquote


class TestParseTannerResponse(unittest.TestCase):
def setUp(self):
run_args = argparse.ArgumentParser()
run_args.add_argument("--tanner")
run_args.add_argument("--page-dir")
self.main_page_path = generate_unique_path()
os.makedirs(self.main_page_path)
self.page_dir = self.main_page_path.rsplit('/')[-1]
self.meta_content = {"/index.html": {"hash": "hash_name", "content_type": "text/html"}}
self.page_content = "<html><body></body></html>"
self.content_type = "text/html"
with open(os.path.join(self.main_page_path, "hash_name"), 'w') as f:
f.write(self.page_content)
with open(os.path.join(self.main_page_path, "meta.json"), 'w') as f:
json.dump(self.meta_content, f)
self.args = run_args.parse_args(['--page-dir', self.page_dir])
self.requested_name = '/'
self.loop = asyncio.new_event_loop()
self.handler = HttpRequestHandler(self.meta_content, self.args)
self.handler.run_args.index_page = '/index.html'
self.handler.handle_html_content = AsyncMock(return_value=self.page_content)

def test_parse_type_one(self):
self.detection = {"type": 1}
self.call_content = b'<html><body></body></html>'
self.expected_content = self.page_content

async def test():
(self.res1, self.res2,
self.res3, self.res4) = await self.handler.parse_tanner_response(self.requested_name, self.detection)
self.loop.run_until_complete(test())
real_result = [self.res1, self.res2, self.res3, self.res4]
expected_result = [self.page_content, self.content_type, {}, 200]
self.assertCountEqual(real_result, expected_result)

def test_parse_type_two(self):
self.detection = {
"type": 2,
"payload": {
"page": "/index.html",
"value": "test"
}
}
self.expected_content = b'<html><body><div>test</div></body></html>'

async def test():
(self.res1, self.res2,
self.res3, self.res4) = await self.handler.parse_tanner_response(self.requested_name, self.detection)
self.loop.run_until_complete(test())
real_result = [self.res1, self.res2, self.res3, self.res4]
expected_result = [self.expected_content, self.content_type, {}, 200]
self.assertCountEqual(real_result, expected_result)

def test_parse_type_three(self):
self.detection = {
"type": 3,
"payload": {
"page": "/index.html",
"value": "test",
"status_code": 200
}
}
self.expected_content = None

async def test():
(self.res1, self.res2,
self.res3, self.res4) = await self.handler.parse_tanner_response(self.requested_name, self.detection)
self.loop.run_until_complete(test())
real_result = [self.res1, self.res2, self.res3, self.res4]
expected_result = [self.expected_content, None, {}, 200]
self.assertCountEqual(real_result, expected_result)

def test_call_handle_html(self):
self.detection = {"type": 1}
self.call_content = b'<html><body></body></html>'
self.expected_content = self.page_content

async def test():
(self.res1, self.res2,
self.res3, self.res4) = await self.handler.parse_tanner_response(self.requested_name, self.detection)
self.loop.run_until_complete(test())
self.handler.handle_html_content.assert_called_with(self.call_content)

def test_parse_exception(self):
self.detection = {}
self.call_content = b'<html><body></body></html>'
self.expected_content = self.page_content

async def test():
(self.res1, self.res2,
self.res3, self.res4) = await self.handler.parse_tanner_response(self.requested_name, self.detection)
with self.assertRaises(KeyError):
self.loop.run_until_complete(test())

def tearDown(self):
shutil.rmtree(self.main_page_path)
33 changes: 9 additions & 24 deletions tests/test_submit_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,9 @@
import os
import json
import yarl
from utils.asyncmock import AsyncMock
from snare import HttpRequestHandler


class AsyncMock(Mock): # custom function defined to mock asyncio coroutines

def __call__(self, *args, **kwargs):
sup = super(AsyncMock, self)

async def coro():
return sup.__call__(*args, **kwargs)
return coro()

def __await__(self):
return self().__await__()
from utils.page_path_generator import generate_unique_path


class TestSubmitData(unittest.TestCase):
Expand All @@ -29,10 +18,10 @@ def setUp(self):
run_args = argparse.ArgumentParser()
run_args.add_argument("--tanner")
run_args.add_argument("--page-dir")
if not os.path.exists("/opt/snare/pages/test"):
os.makedirs("/opt/snare/pages/test")
self.args = run_args.parse_args(['--tanner', 'test'])
self.args = run_args.parse_args(['--page-dir', 'test'])
self.main_page_path = generate_unique_path()
os.makedirs(self.main_page_path)
self.page_dir = self.main_page_path.rsplit('/')[-1]
self.args = run_args.parse_args(['--page-dir', self.page_dir])
self.loop = asyncio.new_event_loop()
self.data = {
'method': 'GET', 'path': '/',
Expand All @@ -47,10 +36,10 @@ def setUp(self):
aiohttp.ClientSession.post = AsyncMock(
return_value=aiohttp.ClientResponse(url=yarl.URL("http://www.example.com"), method="GET")
)

def test_post_data(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"

def test_post_data(self):
aiohttp.ClientResponse.json = AsyncMock(return_value=dict(detection={'type': 1}, sess_uuid="test_uuid"))

async def test():
Expand All @@ -61,8 +50,6 @@ async def test():
)

def test_event_result(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"
aiohttp.ClientResponse.json = AsyncMock(return_value=dict(detection={'type': 1}, sess_uuid="test_uuid"))

async def test():
Expand All @@ -71,8 +58,6 @@ async def test():
self.assertEquals(self.result, dict(detection={'type': 1}, sess_uuid="test_uuid"))

def test_event_result_exception(self):
self.handler = HttpRequestHandler(self.meta, self.args)
self.handler.run_args.tanner = "tanner.mushmush.org"
aiohttp.ClientResponse.json = AsyncMock(side_effect=Exception())

async def test():
Expand All @@ -81,4 +66,4 @@ async def test():
self.loop.run_until_complete(test())

def tearDown(self):
shutil.rmtree("/opt/snare/pages/test")
shutil.rmtree(self.main_page_path)
15 changes: 15 additions & 0 deletions utils/asyncmock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
from unittest.mock import Mock


class AsyncMock(Mock): # custom function defined to mock asyncio coroutines

def __call__(self, *args, **kwargs):
sup = super(AsyncMock, self)

async def coro():
return sup.__call__(*args, **kwargs)
return coro()

def __await__(self):
return self().__await__()
14 changes: 14 additions & 0 deletions utils/page_path_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import string
import random
import os


def directory_generator(size=9, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))


def generate_unique_path():
path = '/opt/snare/pages/' + directory_generator()
while(os.path.exists(path)):
path = '/opt/snare/pages/' + directory_generator()
return path

0 comments on commit 65771a6

Please sign in to comment.