diff --git a/web_knocking.py b/web_knocking.py index 51b2a7f..fe42c21 100644 --- a/web_knocking.py +++ b/web_knocking.py @@ -5,28 +5,41 @@ from http.server import BaseHTTPRequestHandler \ , ThreadingHTTPServer import os +from operator import itemgetter from datetime import datetime as dt, timedelta -import msvcrt +from socket import inet_aton import resources from rosapi import rosapi_send from easy_logging import EasyLogging +if os.name == 'nt': + import ctypes + import msvcrt -APP_VERSION = 'v2020-07-23' + SetConsoleTitleW = ctypes.windll.kernel32.SetConsoleTitleW + +TITLE = 'Web Knocking' +APP_VERSION = 'v2020-12-06' INI_FILE = 'web_knocking.ini' DEF_DEVICE_TYPE = 'mikrotik_routeros' PASS_SEP = '_' +event_counter = 0 + sett = None { '1.2.3.4' : { - 'counter' : 0 - , 'status': 'good' + 'ip' : '1.2.3.4' + , 'counter' : 0 + , 'status' : 'good' + , 'reason' : 'white' + , 'user' : 'John' } } { 'John' : { - 'passcode' : 's3cReT' + 'name': 'John' + , 'passcode' : 's3cReT' , 'last_access' : dt.now() , 'last_day' : dt( 2020, 4, 20 @@ -66,6 +79,12 @@ , ('secure', True) ] +def set_title(add_to_title:str=''): + if os.name != 'nt': return + title_str = TITLE + if add_to_title: title_str += ' | ' + str(add_to_title) + SetConsoleTitleW(title_str) + class Settings: ''' 2020.04.06 Load settings from .ini file. @@ -179,7 +198,7 @@ def is_ros()->bool: DEF_DEVICE_TYPE def process_ip(ip:str, behavior:str -, reason:str='')->tuple: +, reason:str='', user:str=None)->tuple: ''' Add or not IP to black list on router. Return (True, None) on success or (False, 'error text') on Exception. @@ -189,11 +208,13 @@ def process_ip(ip:str, behavior:str is exceeded. 'good' - reset counter 'danger' or some string with particular - reason - ban immediately without + + reason - if none then ban immediately without checking 'black_threshold' ''' if not reason: reason = behavior + if ip in sett.ips and user: sett.ips[ip]['user'] = user if behavior != 'good' \ and ip in sett.general['safe_hosts']: log.debug(ip.ljust(15), 'do not ban safe host' @@ -213,9 +234,11 @@ def process_ip(ip:str, behavior:str behavior = 'bad' else: sett.ips[ip] = { - 'counter' : 0 + 'ip': ip + , 'counter' : 0 , 'status' : 'grey' , 'reason' : 'new' + , 'user': user } try: if behavior == 'bad': @@ -279,7 +302,7 @@ def decision(path:str, ip:str)->list: try: message = '' if path == '/': - process_ip(ip, 'danger', 'root') + process_ip(ip, behavior='danger', reason='root') message = lang.ban elif path.startswith('/access'): passcode = path.split(PASS_SEP)[1] @@ -299,7 +322,9 @@ def decision(path:str, ip:str)->list: + timedelta(days=1) if dt.now() \ < last_day: - process_ip(ip, 'good', 'valid date access') + process_ip(ip, behavior='good' + , reason='valid date access' + , user=user_name) sett.users[user_name]['ips'].append(ip) sett.users[user_name]['last_access'] = \ dt.now() @@ -323,14 +348,16 @@ def decision(path:str, ip:str)->list: dt.now() log.info(ip.ljust(15) , f'date expired: {user_name}') - process_ip(ip, 'bad', 'date expired') + process_ip(ip, behavior='bad', reason='date expired' + , user=user_name) message = lang.pass_expired \ .format(user_name , last_day.strftime('%d.%m.%Y')) else: log.info(ip.ljust(15) , f'permanent access: {user_name}') - process_ip(ip, 'good', 'permanent access') + process_ip(ip, behavior='good', reason='permanent access' + , user=user_name) sett.users[user_name]['ips'].append(ip) sett.users[user_name]['last_access'] = \ dt.now() @@ -352,7 +379,7 @@ def decision(path:str, ip:str)->list: .format(user_name) else: message = lang.pass_unknown - process_ip(ip, 'bad', 'unknown user') + process_ip(ip, behavior='bad', reason='unknown user') elif path == '/status': if ip in sett.general['safe_hosts']: try: @@ -382,57 +409,39 @@ def decision(path:str, ip:str)->list: , 'status error: ' + repr(e)) message = 'nobody\tnever\nnowhere' else: - process_ip(ip, 'danger', 'status unsafe') + process_ip(ip, behavior='danger', reason='status unsafe') message = lang.ban else: - process_ip(ip, 'danger', 'wrong path') + process_ip(ip, behavior='danger', reason='wrong path') message = lang.ban return True, message except Exception as e: - process_ip(ip, 'bad', 'decision error') + process_ip(ip, behavior='bad', reason='decision error') return False, repr(e) def print_users(): 'Print the dict of users as a table' - headers = ['USER', 'LAST DAY', 'LAST IP' - , 'LAST ACCESS'] - rows = [headers] - for u in sett.users: - if sett.users[u]['ips']: - last_ip = sett.users[u]['ips'][-1] - else: - last_ip = '-' - try: - last_access = sett.users[u] \ - ['last_access'].strftime('%Y.%m.%d %H:%M:%S') - except: - last_access = '-' - rows.append([ - u - , sett.users[u].get('last_day', None) - , last_ip - , last_access + table = [ ['User', 'Last day', 'Last IP', 'Last Access'] ] + for user in sett.users.values(): + table.append([ + user['name'] + , user.get('last_day', None) + , user['ips'][-1] if user['ips'] else None + , user['last_access'].strftime('%Y.%m.%d %H:%M:%S') \ + if user['last_access'] else None ]) - for row in rows: - row[:] = [i if i else '-' for i in row] - col_sizes = [ - max( map(len, col)) for col in zip(*rows) - ] - template = ' '.join( - [ '{{:<{}}}'.format(s) for s in col_sizes ] - ) - print('') - for row in rows: print(template.format(*row)) - print('') + table_print(table, use_headers=True, sorting=[0]) def print_ips(): - print('\nIP STATUS REASON') - for ip in sett.ips: - status = sett.ips[ip]['status'] - reason = sett.ips[ip]['reason'] - print(f'{ip:{16}}', f'{status:{7}}' - , reason) - print('') + table = [ ['User', 'IP', 'Status', 'Reason'] ] + for ip in sett.ips.values(): + table.append([ + ip['user'] + , ip['ip'] + , ip['status'] + , ip['reason'] + ]) + table_print(table, use_headers=True, sorting=[0, 1]) class KnockHandler(BaseHTTPRequestHandler): def handle_one_request(s): @@ -442,11 +451,12 @@ def handle_one_request(s): , encoding='iso-8859-1').split()[0].upper() if req_type != 'GET': process_ip(s.address_string() - , 'danger', 'wrong request method') + , behavior='danger', reason='wrong request method') except ConnectionResetError: log.debug(s.address_string().ljust(15) , 'connection reset') - process_ip(s.address_string(), 'bad', 'port scan') + process_ip(s.address_string(), behavior='bad' + , reason='port scan') except IndexError: try: rr = str(s.raw_requestline, encoding='iso-8859-1') @@ -466,7 +476,7 @@ def handle_one_request(s): , 'h_o_r exception: ' + (repr(e)[:40]) ) process_ip(s.address_string() - , 'danger', 'h_o_r exception') + , behavior='danger', reason='h_o_r exception') def send_error(s, code, message=None , explain=None): @@ -512,6 +522,12 @@ def do_GET(s): s.wfile.write(bytes(page, 'utf-8')) def log_message(s, msg_format, *args): + global event_counter + if args[0].startswith('GET /status') \ + and not sett.general['developer']: + return + event_counter += 1 + set_title(event_counter) log.http( s.address_string().ljust(15) , ' '.join(args) @@ -544,7 +560,8 @@ def load_settings()->tuple: passcode = users_di[user] last_day = None sett.users[user] = { - 'passcode' : passcode + 'name' : user + , 'passcode' : passcode , 'last_access' : None , 'last_day' : last_day , 'ips' : [] @@ -568,24 +585,106 @@ def load_settings()->tuple: except Exception as e: return False, repr(e) +def table_print(table, use_headers=False, row_sep:str=None +, headers_sep:str='-', col_pad:str=' ', row_sep_step:int=0 +, sorting=None, sorting_func=None, sorting_rev:bool=False +, repeat_headers:int=None +, empty_str:str='-', consider_empty:list=[None, '']): + ''' Print list of lists as a table. + row_sep - string to repeat as a row separator. + headers_sep - same for header(s). + ''' + + DEF_SEP = '-' + + def print_sep(sep=row_sep): + nonlocal max_row_len + if not sep: return + print( sep * (max_row_len // len(sep) ) ) + + def print_headers(both=False): + nonlocal headers, template + if not headers: return + if both: print_sep(sep=headers_sep) + print(template.format(*headers)) + print_sep(sep=headers_sep) + + headers = [] + if not table: return + if use_headers and not headers_sep: headers_sep = DEF_SEP + if row_sep_step and not row_sep: row_sep = DEF_SEP + if row_sep and not headers_sep: headers_sep = row_sep + if isinstance(table[0], list): + rows = [l[:] for l in table] + elif isinstance(table[0], tuple): + rows = [list(t) for t in table] + elif isinstance(table[0], dict): + rows = [list( di.values() ) for di in table] + if use_headers == True: headers = list( table[0].keys() ) + if isinstance(use_headers, list): + headers = use_headers + elif use_headers == True: + headers = rows.pop(0) + for row in rows: + row[:] = [empty_str if i in consider_empty else str(i) for i in row] + if sorting: sort_key = itemgetter(*sorting) + if sorting_func: + if isinstance(sorting_func, (tuple, list)): + sfunc, item = sorting_func + else: + sfunc = sorting_func + item = 0 + sort_key = lambda l, f=sfunc, i=item: f(l[i]) + if sorting or sorting_func: + if use_headers: + rows = [ headers, *sorted(rows, key=sort_key + , reverse=sorting_rev) ] + else: + rows.sort(key=sort_key, reverse=sorting_rev) + else: + if headers: rows.insert(0, headers) + col_sizes = [ max( map(len, col) ) for col in zip(*rows) ] + max_row_len = sum(col_sizes) + len(col_pad) * (len(col_sizes) - 1) + template = col_pad.join( + [ '{{:<{}}}'.format(s) for s in col_sizes ] + ) + if headers: rows.pop(0) + print() + if headers: print_headers(False) + for row_num, row in enumerate(rows): + if row_sep_step: + pr = (row_num > 0) and (row_num % row_sep_step == 0) + if pr: + print_sep() + if repeat_headers: + if headers and row_num > 0 \ + and (row_num % repeat_headers == 0): + print_headers(not pr) + else: + if repeat_headers: + if row_num > 0 and (row_num % repeat_headers == 0): + print_headers(True) + print(template.format(*row)) + print() + def main(): global sett global log global lang - try: - os.system('title Web Knocking') - import msvcrt + if os.name == 'nt': + set_title() + def key_wait(): global sett while True: if msvcrt.kbhit(): key = msvcrt.getch() log.debug('key:', key) - if key in [b'u', b'\xa3']: + if key in b'uU\xa3\x83': print_users() - elif key in [b'i', b'\xe8']: + elif key in b'iI\xe8\x98': print_ips() - elif key in [b's', b'\xeb']: + elif key in b'sS\xeb\x9b': log.info('reload settings') status, data = load_settings() if status: @@ -600,9 +699,8 @@ def key_wait(): else: log.error('failed to reload settings:', data) time.sleep(0.001) + threading.Thread(target=key_wait, daemon=True).start() - except: - pass status, sett = load_settings() if not status: print('Error loading settings:', sett)