make analysis of session a separate background task (#395)
* make analysis of session a separate background task

* Proper imports

* get keys value from redis

* Cancel the analyze task

* add analyze timeout so the task doesn't run continuously

Also drop the usage of queue during analyze function

* update the exception message

* Update tests

* Remove threading completely

* update thread loops
Mehtab Zafar authored Jul 31, 2020
1 parent f5b4af0 commit 336a87e
Showing 5 changed files with 143 additions and 94 deletions.
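In outline, the commit replaces the old threaded, queue-driven session analysis with a long-lived coroutine owned by the aiohttp application: the loop calls the analyzer, sleeps for SESSIONS.analyze_timeout, and is cancelled cleanly on shutdown. A minimal, self-contained sketch of that pattern follows; do_analysis is an illustrative stand-in for SessionAnalyzer.analyze, not Tanner code.

import asyncio
from aiohttp import web

ANALYZE_TIMEOUT = 300  # seconds; mirrors SESSIONS.analyze_timeout below

async def do_analysis():
    # Stand-in for SessionAnalyzer.analyze(redis_client, pg_client).
    await asyncio.sleep(0)

async def analyze_sessions():
    try:
        while True:
            await do_analysis()
            await asyncio.sleep(ANALYZE_TIMEOUT)
    except asyncio.CancelledError:
        pass  # raised when the task is cancelled during shutdown

async def start_background_analyze(app):
    app["session_analyze"] = asyncio.ensure_future(analyze_sessions())

async def cleanup_background_tasks(app):
    app["session_analyze"].cancel()
    await app["session_analyze"]

app = web.Application()
app.on_startup.append(start_background_analyze)
app.on_cleanup.append(cleanup_background_tasks)
# web.run_app(app)  # uncomment to serve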
1 change: 1 addition & 0 deletions tanner/data/config.yaml
@@ -99,3 +99,4 @@ REMOTE_DOCKERFILE:

SESSIONS:
delete_timeout: 300
analyze_timeout: 300
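Both timeouts are consumed as sleep intervals, in seconds, by the background loops in tanner/server.py below; a minimal sketch of reading them back with the TannerConfig accessor used throughout the project:

from tanner.config import TannerConfig

# Sleep intervals for the delete and analyze background loops.
delete_timeout = TannerConfig.get("SESSIONS", "delete_timeout")
analyze_timeout = TannerConfig.get("SESSIONS", "analyze_timeout")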
32 changes: 25 additions & 7 deletions tanner/server.py
@@ -8,7 +8,7 @@
from aiohttp import web

from tanner import dorks_manager, redis_client, postgres_client, dbutils
from tanner.sessions import session_manager
from tanner.sessions import session_manager, session_analyzer
from tanner.config import TannerConfig
from tanner.emulators import base
from tanner.reporting.log_local import Reporting as local_report
@@ -25,7 +25,9 @@ def __init__(self):
db_name = TannerConfig.get("SQLI", "db_name")

self.session_manager = session_manager.SessionManager()
self.session_analyzer = session_analyzer.SessionAnalyzer()
self.delete_timeout = TannerConfig.get("SESSIONS", "delete_timeout")
self.analyze_timeout = TannerConfig.get("SESSIONS", "analyze_timeout")

self.dorks = dorks_manager.DorksManager()
self.base_handler = base.BaseHandler(base_dir, db_name)
@@ -66,7 +68,7 @@ async def handle_event(self, request):
self.logger.info("Requested path %s", path)
await self.dorks.extract_path(path, self.redis_client)
detection = await self.base_handler.handle(data, session)
session.set_attack_type(data['path'], detection["name"])
session.set_attack_type(data["path"], detection["name"])

response_msg = self._make_response(
msg=dict(detection=detection, sess_uuid=session.get_uuid())
@@ -103,7 +105,9 @@ async def handle_version(self, request):
return web.json_response(response_msg)

async def on_shutdown(self, app):
await self.session_manager.delete_sessions_on_shutdown(self.redis_client, self.pg_client)
await self.session_manager.delete_sessions_on_shutdown(
self.redis_client, self.pg_client
)
self.redis_client.close()
await self.redis_client.wait_closed()
self.pg_client.close()
@@ -112,11 +116,21 @@ async def on_shutdown(self, app):
async def delete_sessions(self):
try:
while True:
await self.session_manager.delete_old_sessions(self.redis_client, self.pg_client)
await self.session_manager.delete_old_sessions(
self.redis_client, self.pg_client
)
await asyncio.sleep(self.delete_timeout)
except asyncio.CancelledError:
pass

async def analyze_sessions(self):
try:
while True:
await self.session_analyzer.analyze(self.redis_client, self.pg_client)
await asyncio.sleep(self.analyze_timeout)
except asyncio.CancelledError:
pass

def setup_routes(self, app):
app.router.add_route("*", "/", self.default_handler)
app.router.add_post("/event", self.handle_event)
@@ -129,12 +143,17 @@ def create_app(self, loop):
self.setup_routes(app)
return app

async def start_background_analyze(self, app):
app["session_analyze"] = asyncio.ensure_future(self.analyze_sessions())

async def start_background_delete(self, app):
app["session_delete"] = asyncio.ensure_future(self.delete_sessions())

async def cleanup_background_tasks(self, app):
app["session_delete"].cancel()
await app["session_delete"]
app["session_analyze"].cancel()
await app["session_analyze"]

def start(self):
loop = asyncio.get_event_loop()
@@ -144,11 +163,10 @@ def start(self):
self.pg_client = loop.run_until_complete(
postgres_client.PostgresClient().get_pg_client()
)
loop.run_until_complete(
dbutils.DBUtils.create_data_tables(self.pg_client)
)
loop.run_until_complete(dbutils.DBUtils.create_data_tables(self.pg_client))

app = self.create_app(loop)
app.on_startup.append(self.start_background_analyze)
app.on_startup.append(self.start_background_delete)
app.on_cleanup.append(self.cleanup_background_tasks)

201 changes: 116 additions & 85 deletions tanner/sessions/session_analyzer.py
@@ -2,6 +2,7 @@
import json
import logging
import socket
import psycopg2
from datetime import datetime
from geoip2.database import Reader
import geoip2
@@ -13,73 +14,93 @@

class SessionAnalyzer:
def __init__(self, loop=None):
self._loop = loop if loop is not None else asyncio.get_event_loop()
self.queue = asyncio.Queue(loop=self._loop)
self.logger = logging.getLogger('tanner.session_analyzer.SessionAnalyzer')
self.attacks = ['sqli', 'rfi', 'lfi', 'xss', 'php_code_injection', 'cmd_exec', 'crlf']

async def analyze(self, session_key, redis_client, pg_client):
session = None
await asyncio.sleep(1, loop=self._loop)
self.logger = logging.getLogger("tanner.session_analyzer.SessionAnalyzer")
self.attacks = [
"sqli",
"rfi",
"lfi",
"xss",
"php_code_injection",
"cmd_exec",
"crlf",
]

async def analyze(self, redis_client, pg_client):
"""Perform analysis on the sessions, store the analyzed
session in postgres and then delete that session from redis.
"""
_loop = asyncio.get_running_loop()
sessions = None
await asyncio.sleep(1, loop=_loop)

try:
session = await redis_client.get(session_key, encoding='utf-8')
session = json.loads(session)
keys = await redis_client.keys("[0-9a-f]*")
except (aioredis.ProtocolError, TypeError, ValueError) as error:
self.logger.exception('Can\'t get session for analyze: %s', error)
self.logger.exception("Can't get session for analyze: %s", error)
else:
result = await self.create_stats(session, redis_client)
await self.queue.put(result)
await self.save_session(redis_client, pg_client)

async def save_session(self, redis_client, pg_client):
while not self.queue.empty():
session = await self.queue.get()
del_key = session["sess_uuid"]

await DBUtils.add_analyzed_data(session, pg_client)

try:
await redis_client.delete(*[del_key])
except aioredis.ProtocolError as redis_error:
self.logger.exception(
"Error with redis. Session will be returned to the queue: %s",
redis_error,
)
self.queue.put(session)
for key in keys:
try:
session = await redis_client.get(key, encoding="utf-8")
session = json.loads(session)

result = await self.create_stats(session, redis_client)

del_key = result["sess_uuid"]
try:
await DBUtils.add_analyzed_data(result, pg_client)
await redis_client.delete(*[del_key])
except psycopg2.ProgrammingError as pg_error:
self.logger.exception(
"Error with Postgres: %s. Session with session-id %s will not be added to postgres",
pg_error,
key,
)
except aioredis.ProtocolError as redis_error:
self.logger.exception(
"Error with redis: %s. Session with session-id %s will not be removed from redis.",
redis_error,
key
)
# This is the key which stores all the dorks.
# It matches the pattern of other keys.
except aioredis.errors.ReplyError:
continue

async def create_stats(self, session, redis_client):
sess_duration = session['end_time'] - session['start_time']
sess_duration = session["end_time"] - session["start_time"]
referer = None
if sess_duration != 0:
rps = session['count'] / sess_duration
rps = session["count"] / sess_duration
else:
rps = 0
location_info = await self._loop.run_in_executor(
None, self.find_location, session['peer']['ip']
_loop = asyncio.get_running_loop()
location_info = await _loop.run_in_executor(
None, self.find_location, session["peer"]["ip"]
)
tbr, errors, hidden_links, attack_types = await self.analyze_paths(
session["paths"], redis_client
)
tbr, errors, hidden_links, attack_types = await self.analyze_paths(session['paths'],
redis_client)
attack_count = self.set_attack_count(attack_types)

stats = dict(
sess_uuid=session['sess_uuid'],
peer_ip=session['peer']['ip'],
peer_port=session['peer']['port'],
sess_uuid=session["sess_uuid"],
peer_ip=session["peer"]["ip"],
peer_port=session["peer"]["port"],
location=location_info,
user_agent=session['user_agent'],
snare_uuid=session['snare_uuid'],
start_time=session['start_time'],
end_time=session['end_time'],
user_agent=session["user_agent"],
snare_uuid=session["snare_uuid"],
start_time=session["start_time"],
end_time=session["end_time"],
requests_in_second=rps,
approx_time_between_requests=tbr,
accepted_paths=session['count'],
accepted_paths=session["count"],
errors=errors,
hidden_links=hidden_links,
attack_types=attack_types,
attack_count=attack_count,
paths=session['paths'],
cookies=session['cookies'],
referer=session['referer']
paths=session["paths"],
cookies=session["cookies"],
referer=session["referer"],
)

owner = await self.choose_possible_owner(stats)
@@ -94,54 +115,67 @@ async def analyze_paths(paths, redis_client):
dorks = await redis_client.smembers(DorksManager.dorks_key)

for _, path in enumerate(paths, start=1):
tbr.append(path['timestamp'] - current_path['timestamp'])
tbr.append(path["timestamp"] - current_path["timestamp"])
current_path = path
tbr_average = sum(tbr) / float(len(tbr))

errors = 0
hidden_links = 0
for path in paths:
if path['response_status'] != 200:
if path["response_status"] != 200:
errors += 1
if path['path'] in dorks:
if path["path"] in dorks:
hidden_links += 1
if 'attack_type' in path:
attack_types.append(path['attack_type'])
if "attack_type" in path:
attack_types.append(path["attack_type"])
return tbr_average, errors, hidden_links, attack_types

def set_attack_count(self, attack_types):
attacks = self.attacks.copy()
attacks.append('index')
attacks.append("index")
attack_count = {k: 0 for k in attacks}
for attack in attacks:
attack_count[attack] = attack_types.count(attack)
count = {k: v for k, v in attack_count.items() if v != 0}
return count

async def choose_possible_owner(self, stats):
owner_names = ['user', 'tool', 'crawler', 'attacker', 'admin']
owner_names = ["user", "tool", "crawler", "attacker", "admin"]
possible_owners = {k: 0.0 for k in owner_names}
if stats['peer_ip'] == '127.0.0.1' or stats['peer_ip'] == '::1':
possible_owners['admin'] = 1.0
with open(TannerConfig.get('DATA', 'crawler_stats')) as f:
bots_owner = await self._loop.run_in_executor(None, f.read)
crawler_hosts = ['googlebot.com', 'baiduspider', 'search.msn.com', 'spider.yandex.com', 'crawl.sogou.com']
possible_owners['crawler'], possible_owners['tool'] = await self.detect_crawler(
if stats["peer_ip"] == "127.0.0.1" or stats["peer_ip"] == "::1":
possible_owners["admin"] = 1.0
_loop = asyncio.get_running_loop()
with open(TannerConfig.get("DATA", "crawler_stats")) as f:
bots_owner = await _loop.run_in_executor(None, f.read)
crawler_hosts = [
"googlebot.com",
"baiduspider",
"search.msn.com",
"spider.yandex.com",
"crawl.sogou.com",
]
possible_owners["crawler"], possible_owners["tool"] = await self.detect_crawler(
stats, bots_owner, crawler_hosts
)
possible_owners['attacker'] = await self.detect_attacker(
possible_owners["attacker"] = await self.detect_attacker(
stats, bots_owner, crawler_hosts
)
maxcf = max([possible_owners['crawler'], possible_owners['attacker'], possible_owners['tool']])
maxcf = max(
[
possible_owners["crawler"],
possible_owners["attacker"],
possible_owners["tool"],
]
)

possible_owners['user'] = round(1 - maxcf, 2)
possible_owners["user"] = round(1 - maxcf, 2)

owners = {k: v for k, v in possible_owners.items() if v != 0}
return {'possible_owners': owners}
return {"possible_owners": owners}

@staticmethod
def find_location(ip):
reader = Reader(TannerConfig.get('DATA', 'geo_db'))
reader = Reader(TannerConfig.get("DATA", "geo_db"))
try:
location = reader.city(ip)
if location.postal.code is None:
@@ -157,27 +191,23 @@ def find_location(ip):
)
except geoip2.errors.AddressNotFoundError:
# When IP doesn't exist in the db, set info as "NA - Not Available"
info = dict(
country=None,
country_code=None,
city=None,
zip_code=0,
)
info = dict(country=None, country_code=None, city=None, zip_code=0,)
return info

async def detect_crawler(self, stats, bots_owner, crawler_hosts):
for path in stats['paths']:
if path['path'] == '/robots.txt':
for path in stats["paths"]:
if path["path"] == "/robots.txt":
return (1.0, 0.0)
if stats['requests_in_second'] > 10:
if stats['referer'] is not None:
if stats["requests_in_second"] > 10:
if stats["referer"] is not None:
return (0.0, 0.5)
if stats['user_agent'] is not None and stats['user_agent'] in bots_owner:
if stats["user_agent"] is not None and stats["user_agent"] in bots_owner:
return (0.85, 0.15)
return (0.5, 0.85)
if stats['user_agent'] is not None and stats['user_agent'] in bots_owner:
hostname, _, _ = await self._loop.run_in_executor(
None, socket.gethostbyaddr, stats['peer_ip']
_loop = asyncio.get_running_loop()
if stats["user_agent"] is not None and stats["user_agent"] in bots_owner:
hostname, _, _ = await _loop.run_in_executor(
None, socket.gethostbyaddr, stats["peer_ip"]
)
if hostname is not None:
for crawler_host in crawler_hosts:
@@ -187,19 +217,20 @@ async def detect_crawler(self, stats, bots_owner, crawler_hosts):
return (0.0, 0.0)

async def detect_attacker(self, stats, bots_owner, crawler_hosts):
if set(stats['attack_types']).intersection(self.attacks):
_loop = asyncio.get_running_loop()
if set(stats["attack_types"]).intersection(self.attacks):
return 1.0
if stats['requests_in_second'] > 10:
if stats["requests_in_second"] > 10:
return 0.0
if stats['user_agent'] is not None and stats['user_agent'] in bots_owner:
hostname, _, _ = await self._loop.run_in_executor(
None, socket.gethostbyaddr, stats['peer_ip']
if stats["user_agent"] is not None and stats["user_agent"] in bots_owner:
hostname, _, _ = await _loop.run_in_executor(
None, socket.gethostbyaddr, stats["peer_ip"]
)
if hostname is not None:
for crawler_host in crawler_hosts:
if crawler_host in hostname:
return 0.25
return 0.75
if stats['hidden_links'] > 0:
if stats["hidden_links"] > 0:
return 0.5
return 0.0
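With the queue and the per-session-key signature gone, a single call to analyze() now sweeps every session held in redis, writes the computed stats to postgres, and deletes the session key. A hedged sketch of driving one sweep by hand; run_once is an illustrative helper, and already-connected redis_client and pg_client (as created in tanner/server.py) are assumed:

import asyncio

from tanner.sessions import session_analyzer

async def run_once(redis_client, pg_client):
    # One full sweep: every session key in redis is analyzed, persisted to
    # postgres, then removed from redis; the dorks key is skipped.
    analyzer = session_analyzer.SessionAnalyzer()
    await analyzer.analyze(redis_client, pg_client)

# asyncio.get_event_loop().run_until_complete(run_once(redis_client, pg_client))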