Skip to content

Commit

Permalink
+ table_print
Browse files Browse the repository at this point in the history
  • Loading branch information
vikilpet committed Dec 6, 2020
1 parent 0348201 commit ed08ee1
Showing 1 changed file with 162 additions and 64 deletions.
226 changes: 162 additions & 64 deletions web_knocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,41 @@
from http.server import BaseHTTPRequestHandler \
, ThreadingHTTPServer
import os
from operator import itemgetter
from datetime import datetime as dt, timedelta
import msvcrt
from socket import inet_aton
import resources
from rosapi import rosapi_send
from easy_logging import EasyLogging
if os.name == 'nt':
import ctypes
import msvcrt

APP_VERSION = 'v2020-07-23'
SetConsoleTitleW = ctypes.windll.kernel32.SetConsoleTitleW

TITLE = 'Web Knocking'
APP_VERSION = 'v2020-12-06'
INI_FILE = 'web_knocking.ini'
DEF_DEVICE_TYPE = 'mikrotik_routeros'
PASS_SEP = '_'

event_counter = 0


sett = None
{
'1.2.3.4' : {
'counter' : 0
, 'status': 'good'
'ip' : '1.2.3.4'
, 'counter' : 0
, 'status' : 'good'
, 'reason' : 'white'
, 'user' : 'John'
}
}
{
'John' : {
'passcode' : 's3cReT'
'name': 'John'
, 'passcode' : 's3cReT'
, 'last_access' : dt.now()
, 'last_day' : dt(
2020, 4, 20
Expand Down Expand Up @@ -66,6 +79,12 @@
, ('secure', True)
]

def set_title(add_to_title:str=''):
if os.name != 'nt': return
title_str = TITLE
if add_to_title: title_str += ' | ' + str(add_to_title)
SetConsoleTitleW(title_str)

class Settings:
''' 2020.04.06
Load settings from .ini file.
Expand Down Expand Up @@ -179,7 +198,7 @@ def is_ros()->bool:
DEF_DEVICE_TYPE

def process_ip(ip:str, behavior:str
, reason:str='')->tuple:
, reason:str='', user:str=None)->tuple:
''' Add or not IP to black list on router.
Return (True, None) on success or
(False, 'error text') on Exception.
Expand All @@ -189,11 +208,13 @@ def process_ip(ip:str, behavior:str
is exceeded.
'good' - reset counter
'danger' or some string with particular
reason - ban immediately without
reason - if none then ban immediately without
checking 'black_threshold'
'''

if not reason: reason = behavior
if ip in sett.ips and user: sett.ips[ip]['user'] = user
if behavior != 'good' \
and ip in sett.general['safe_hosts']:
log.debug(ip.ljust(15), 'do not ban safe host'
Expand All @@ -213,9 +234,11 @@ def process_ip(ip:str, behavior:str
behavior = 'bad'
else:
sett.ips[ip] = {
'counter' : 0
'ip': ip
, 'counter' : 0
, 'status' : 'grey'
, 'reason' : 'new'
, 'user': user
}
try:
if behavior == 'bad':
Expand Down Expand Up @@ -279,7 +302,7 @@ def decision(path:str, ip:str)->list:
try:
message = ''
if path == '/':
process_ip(ip, 'danger', 'root')
process_ip(ip, behavior='danger', reason='root')
message = lang.ban
elif path.startswith('/access'):
passcode = path.split(PASS_SEP)[1]
Expand All @@ -299,7 +322,9 @@ def decision(path:str, ip:str)->list:
+ timedelta(days=1)
if dt.now() \
< last_day:
process_ip(ip, 'good', 'valid date access')
process_ip(ip, behavior='good'
, reason='valid date access'
, user=user_name)
sett.users[user_name]['ips'].append(ip)
sett.users[user_name]['last_access'] = \
dt.now()
Expand All @@ -323,14 +348,16 @@ def decision(path:str, ip:str)->list:
dt.now()
log.info(ip.ljust(15)
, f'date expired: {user_name}')
process_ip(ip, 'bad', 'date expired')
process_ip(ip, behavior='bad', reason='date expired'
, user=user_name)
message = lang.pass_expired \
.format(user_name
, last_day.strftime('%d.%m.%Y'))
else:
log.info(ip.ljust(15)
, f'permanent access: {user_name}')
process_ip(ip, 'good', 'permanent access')
process_ip(ip, behavior='good', reason='permanent access'
, user=user_name)
sett.users[user_name]['ips'].append(ip)
sett.users[user_name]['last_access'] = \
dt.now()
Expand All @@ -352,7 +379,7 @@ def decision(path:str, ip:str)->list:
.format(user_name)
else:
message = lang.pass_unknown
process_ip(ip, 'bad', 'unknown user')
process_ip(ip, behavior='bad', reason='unknown user')
elif path == '/status':
if ip in sett.general['safe_hosts']:
try:
Expand Down Expand Up @@ -382,57 +409,39 @@ def decision(path:str, ip:str)->list:
, 'status error: ' + repr(e))
message = 'nobody\tnever\nnowhere'
else:
process_ip(ip, 'danger', 'status unsafe')
process_ip(ip, behavior='danger', reason='status unsafe')
message = lang.ban
else:
process_ip(ip, 'danger', 'wrong path')
process_ip(ip, behavior='danger', reason='wrong path')
message = lang.ban
return True, message
except Exception as e:
process_ip(ip, 'bad', 'decision error')
process_ip(ip, behavior='bad', reason='decision error')
return False, repr(e)

def print_users():
'Print the dict of users as a table'
headers = ['USER', 'LAST DAY', 'LAST IP'
, 'LAST ACCESS']
rows = [headers]
for u in sett.users:
if sett.users[u]['ips']:
last_ip = sett.users[u]['ips'][-1]
else:
last_ip = '-'
try:
last_access = sett.users[u] \
['last_access'].strftime('%Y.%m.%d %H:%M:%S')
except:
last_access = '-'
rows.append([
u
, sett.users[u].get('last_day', None)
, last_ip
, last_access
table = [ ['User', 'Last day', 'Last IP', 'Last Access'] ]
for user in sett.users.values():
table.append([
user['name']
, user.get('last_day', None)
, user['ips'][-1] if user['ips'] else None
, user['last_access'].strftime('%Y.%m.%d %H:%M:%S') \
if user['last_access'] else None
])
for row in rows:
row[:] = [i if i else '-' for i in row]
col_sizes = [
max( map(len, col)) for col in zip(*rows)
]
template = ' '.join(
[ '{{:<{}}}'.format(s) for s in col_sizes ]
)
print('')
for row in rows: print(template.format(*row))
print('')
table_print(table, use_headers=True, sorting=[0])

def print_ips():
print('\nIP STATUS REASON')
for ip in sett.ips:
status = sett.ips[ip]['status']
reason = sett.ips[ip]['reason']
print(f'{ip:{16}}', f'{status:{7}}'
, reason)
print('')
table = [ ['User', 'IP', 'Status', 'Reason'] ]
for ip in sett.ips.values():
table.append([
ip['user']
, ip['ip']
, ip['status']
, ip['reason']
])
table_print(table, use_headers=True, sorting=[0, 1])

class KnockHandler(BaseHTTPRequestHandler):
def handle_one_request(s):
Expand All @@ -442,11 +451,12 @@ def handle_one_request(s):
, encoding='iso-8859-1').split()[0].upper()
if req_type != 'GET':
process_ip(s.address_string()
, 'danger', 'wrong request method')
, behavior='danger', reason='wrong request method')
except ConnectionResetError:
log.debug(s.address_string().ljust(15)
, 'connection reset')
process_ip(s.address_string(), 'bad', 'port scan')
process_ip(s.address_string(), behavior='bad'
, reason='port scan')
except IndexError:
try:
rr = str(s.raw_requestline, encoding='iso-8859-1')
Expand All @@ -466,7 +476,7 @@ def handle_one_request(s):
, 'h_o_r exception: ' + (repr(e)[:40])
)
process_ip(s.address_string()
, 'danger', 'h_o_r exception')
, behavior='danger', reason='h_o_r exception')

def send_error(s, code, message=None
, explain=None):
Expand Down Expand Up @@ -512,6 +522,12 @@ def do_GET(s):
s.wfile.write(bytes(page, 'utf-8'))

def log_message(s, msg_format, *args):
global event_counter
if args[0].startswith('GET /status') \
and not sett.general['developer']:
return
event_counter += 1
set_title(event_counter)
log.http(
s.address_string().ljust(15)
, ' '.join(args)
Expand Down Expand Up @@ -544,7 +560,8 @@ def load_settings()->tuple:
passcode = users_di[user]
last_day = None
sett.users[user] = {
'passcode' : passcode
'name' : user
, 'passcode' : passcode
, 'last_access' : None
, 'last_day' : last_day
, 'ips' : []
Expand All @@ -568,24 +585,106 @@ def load_settings()->tuple:
except Exception as e:
return False, repr(e)

def table_print(table, use_headers=False, row_sep:str=None
, headers_sep:str='-', col_pad:str=' ', row_sep_step:int=0
, sorting=None, sorting_func=None, sorting_rev:bool=False
, repeat_headers:int=None
, empty_str:str='-', consider_empty:list=[None, '']):
''' Print list of lists as a table.
row_sep - string to repeat as a row separator.
headers_sep - same for header(s).
'''

DEF_SEP = '-'

def print_sep(sep=row_sep):
nonlocal max_row_len
if not sep: return
print( sep * (max_row_len // len(sep) ) )

def print_headers(both=False):
nonlocal headers, template
if not headers: return
if both: print_sep(sep=headers_sep)
print(template.format(*headers))
print_sep(sep=headers_sep)

headers = []
if not table: return
if use_headers and not headers_sep: headers_sep = DEF_SEP
if row_sep_step and not row_sep: row_sep = DEF_SEP
if row_sep and not headers_sep: headers_sep = row_sep
if isinstance(table[0], list):
rows = [l[:] for l in table]
elif isinstance(table[0], tuple):
rows = [list(t) for t in table]
elif isinstance(table[0], dict):
rows = [list( di.values() ) for di in table]
if use_headers == True: headers = list( table[0].keys() )
if isinstance(use_headers, list):
headers = use_headers
elif use_headers == True:
headers = rows.pop(0)
for row in rows:
row[:] = [empty_str if i in consider_empty else str(i) for i in row]
if sorting: sort_key = itemgetter(*sorting)
if sorting_func:
if isinstance(sorting_func, (tuple, list)):
sfunc, item = sorting_func
else:
sfunc = sorting_func
item = 0
sort_key = lambda l, f=sfunc, i=item: f(l[i])
if sorting or sorting_func:
if use_headers:
rows = [ headers, *sorted(rows, key=sort_key
, reverse=sorting_rev) ]
else:
rows.sort(key=sort_key, reverse=sorting_rev)
else:
if headers: rows.insert(0, headers)
col_sizes = [ max( map(len, col) ) for col in zip(*rows) ]
max_row_len = sum(col_sizes) + len(col_pad) * (len(col_sizes) - 1)
template = col_pad.join(
[ '{{:<{}}}'.format(s) for s in col_sizes ]
)
if headers: rows.pop(0)
print()
if headers: print_headers(False)
for row_num, row in enumerate(rows):
if row_sep_step:
pr = (row_num > 0) and (row_num % row_sep_step == 0)
if pr:
print_sep()
if repeat_headers:
if headers and row_num > 0 \
and (row_num % repeat_headers == 0):
print_headers(not pr)
else:
if repeat_headers:
if row_num > 0 and (row_num % repeat_headers == 0):
print_headers(True)
print(template.format(*row))
print()

def main():
global sett
global log
global lang
try:
os.system('title Web Knocking')
import msvcrt
if os.name == 'nt':
set_title()

def key_wait():
global sett
while True:
if msvcrt.kbhit():
key = msvcrt.getch()
log.debug('key:', key)
if key in [b'u', b'\xa3']:
if key in b'uU\xa3\x83':
print_users()
elif key in [b'i', b'\xe8']:
elif key in b'iI\xe8\x98':
print_ips()
elif key in [b's', b'\xeb']:
elif key in b'sS\xeb\x9b':
log.info('reload settings')
status, data = load_settings()
if status:
Expand All @@ -600,9 +699,8 @@ def key_wait():
else:
log.error('failed to reload settings:', data)
time.sleep(0.001)

threading.Thread(target=key_wait, daemon=True).start()
except:
pass
status, sett = load_settings()
if not status:
print('Error loading settings:', sett)
Expand Down

0 comments on commit ed08ee1

Please sign in to comment.