diff --git a/readme.md b/readme.md
index 01a4cc7..c761265 100644
--- a/readme.md
+++ b/readme.md
@@ -23,7 +23,7 @@ Just download the archive from the release page.
It should also work on Linux, but I haven't checked.
-Just download the project.
+Just download the project. No third-party dependencies.
## Usage
Change settings in *settings.ini* to your own.
diff --git a/readme_ru.md b/readme_ru.md
index 8e15bc3..79dc3b1 100644
--- a/readme_ru.md
+++ b/readme_ru.md
@@ -16,14 +16,14 @@
## Установка
### Вариант 1: EXE
-Просто скачайте архив со страницы релизов.
+Просто скачайте архив со страницы [релизов](https://github.com/vikilpet/Web-Knocking/releases).
### Вариант 2: Python
**Требования:** Python 3.8; Windows 7+
Теоретически должно работать и на Linux, но я не проверял.
-Просто скачайте проект.
+Просто скачайте [проект](https://github.com/vikilpet/Web-Knocking/releases). Без зависимостей от нестандартных модулей.
## Использование
Измените настройки в settings.ini на свои.
diff --git a/resources.py b/resources.py
index 8d15bd7..5d4e19a 100644
--- a/resources.py
+++ b/resources.py
@@ -14,7 +14,7 @@ def __init__(self, language:str='en'):
ip_capt=Your IP
page_title=Knock-knock
access_error={}
there is some error
-pass_expired={}
your passcode is expired: {}
+pass_expired={}
your passcode expired: {}
pass_unknown=Unknown passcode
ban=Ban
perm_timeout_text={}
access granted
diff --git a/web_knocking.py b/web_knocking.py
index fab074e..5439730 100644
--- a/web_knocking.py
+++ b/web_knocking.py
@@ -1,15 +1,14 @@
import configparser
-import logging
-from logging.handlers import TimedRotatingFileHandler
+import atexit
from http.server import BaseHTTPRequestHandler \
, ThreadingHTTPServer
import os
-import datetime
+from datetime import datetime as dt, timedelta
import resources
from rosapi import rosapi_send
-APP_VERSION = 'v2020-04-15'
+APP_VERSION = 'v2020-04-20'
INI_FILE = 'web_knocking.ini'
DEF_DEVICE_TYPE = 'mikrotik_routeros'
PASS_SEP = '_'
@@ -25,8 +24,8 @@
{
'John' : {
'passcode' : 's3cReT'
- , 'last_access' : datetime.datetime.now()
- , 'last_day' : datetime.datetime(
+ , 'last_access' : dt.now()
+ , 'last_day' : dt(
2020, 4, 20
)
, 'ips': [
@@ -35,7 +34,7 @@
]
}
}
-logger = None
+log = None
lang = None
DEF_OPT_GENERAL = [
@@ -63,6 +62,107 @@
, ('password', 'admin')
, ('secure', True)
]
+class EasyLogging:
+ ''' Logging to console and optionally to a disk.
+ Attributes:
+ level - log level. Default is 10 ('INFO')
+ Default levels:
+ 'DEBUG' : 0
+ 'INFO' : 10
+ 'ERROR' : 20
+
+ time_format:str='%Y.%m.%d %H:%M:%S'
+
+ file_name_format:str='%Y-%m-%d.log'
+
+ directory - write log files to this folder.
+ If not specified - no logging to a file.
+
+ levels:dict - provide your own levels.
+ Format: {'level name 1': level_num1
+ , 'level name 2': level_num2}
+
+ add_levels - just add this level(s) to the default
+ levels. Format:
+ (('lvl 1', lvl_num1), ('lvl 2', lvl_num2))
+
+ sep - separator between columns.
+ '''
+ def __init__(s
+ , level:int=20
+ , directory:str=None
+ , file_name_format:str='%Y-%m-%d.log'
+ , time_format:str='%Y.%m.%d %H:%M:%S'
+ , add_levels:list=None
+ , levels:dict={
+ 'DEBUG' : 0
+ , 'INFO' : 10
+ , 'ERROR' : 20
+ }
+ , sep:str = ' : '
+ ):
+ s.levels = levels
+ if add_levels:
+ if type(add_levels[0]) in [list, tuple]:
+ for l, n in add_levels:
+ s.levels[l] = n
+ else:
+ s.levels[add_levels[0]] = add_levels[1]
+ s.level = level
+ s.time_format = time_format
+ for key, value in levels.items():
+ setattr(
+ EasyLogging
+ , key.lower()
+ , lambda s, *strings, l=key: s._log(*strings, level=l)
+ )
+ setattr(EasyLogging, key.upper(), value)
+ s.lvl_pad = max(*map(len, levels))
+ s.sep = sep
+ s.filed = None
+ if directory:
+ if not os.path.exists(directory): os.mkdir(directory)
+ s.file_name_format = file_name_format
+ s.directory = directory
+ s.file_name = dt.now().strftime(
+ file_name_format)
+ s.filed = open(
+ os.path.join(s.directory, s.file_name)
+ , 'ta+'
+ )
+ atexit.register(s._cleanup)
+
+ def _log(s, *strings, level:str='DEBUG'):
+ 'Log to console and optionally to disk'
+ if s.levels.get(level, 0) < s.level: return
+ string = s.sep.join(strings)
+ t = dt.now().strftime(s.time_format)
+ msg = f'{t}{s.sep}{level:{s.lvl_pad}}{s.sep}{string}'
+ print(msg)
+ if not s.filed: return
+ s._write_to_file(msg)
+
+ def _cleanup(s):
+ if not s.filed: return
+ s._log('cleanup', level='DEBUG')
+ s.filed.close()
+
+ def _write_to_file(s, msg):
+ fn = dt.now().strftime(s.file_name_format)
+ if fn != s.file_name:
+ s.file_name = fn
+ s.filed.close()
+ s.filed = open(
+ os.path.join(s.directory, s.file_name)
+ , 'ta+'
+ )
+ s.filed.write(msg + '\n')
+ s.filed.flush()
+
+ def __getattr__(s, name):
+ def method(*args, **kwargs):
+ s._log(level=name.upper(), *args, **kwargs)
+ return method
class Settings:
''' 2020.04.06 20:48:34
@@ -90,7 +190,7 @@ def __init__(s, keep_setting_case:bool=False):
, encoding='utf-8') as fd:
config.read_file(fd)
except FileNotFoundError:
- log_error(f'{INI_FILE} file not found'
+ print(f'{INI_FILE} file not found'
+ '\nPress any key to exit')
sections = {
@@ -117,7 +217,7 @@ def __init__(s, keep_setting_case:bool=False):
def __getattr__(s, name):
try:
- log_debug(f'Settings: unknown key: {name}')
+ log.debug(f'Settings: unknown key: {name}')
except:
print(f'Settings: unknown key: {name}')
return None
@@ -132,8 +232,7 @@ def netmiko_send(cmd, debug:bool=False):
, 'password': sett.device['password']
}
if debug:
- logging.basicConfig(level=logging.WARNING)
- logger2 = logging.getLogger('netmiko')
+ pass
try:
with ConnectHandler(**device_params) as ssh:
result = ssh.send_command(cmd)
@@ -162,7 +261,8 @@ def send_ip(ip:str, list_name:str=''
)
if result[0]:
status, data = ros_answer(result[1])
- log_debug(f'rosapi: {status}, {data}')
+ log.debug(ip.ljust(15)
+ , f'rosapi: {status}, {data}')
else:
cmd = sett.device['cmd'].format(
ip=ip, list_name=list_name
@@ -171,10 +271,6 @@ def send_ip(ip:str, list_name:str=''
result = netmiko_send(cmd)
return result
-def log_debug(msg): logger.debug(msg)
-def log_info(msg): logger.info(msg)
-def log_error(msg): logger.error(msg)
-
def is_ros()->bool:
'Is it a MikroTik device?'
return sett.device['device_type'] == \
@@ -198,23 +294,20 @@ def process_ip(ip:str, behavior:str
if not reason: reason = behavior
if behavior != 'good' \
and ip in sett.general['safe_hosts']:
- log_debug(
- f'IP {ip} - do not ban safe host'
- + f' ({behavior}: {reason})'
- )
+ log.debug(ip.ljust(15), 'do not ban safe host'
+ + f' ({behavior}: {reason})')
return True, None
if behavior != 'good' \
and ip in sett.ips:
if sett.ips[ip]['status'] == 'good':
- log_debug(
- f'IP {ip} - do not ban white ip'
- + f' ({behavior}: {reason})'
- )
+ log.debug(ip.ljust(15), 'do not ban white ip'
+ + f' ({behavior}: {reason})')
return True, None
if ip in sett.ips:
if sett.ips[ip]['status'] == 'white' \
and behavior == 'danger':
- log_debug(f'IP {ip} - white ip {behavior}: {reason}')
+ log.debug(ip.ljust(15)
+ , f'white ip: {behavior}: {reason}')
behavior = 'bad'
else:
sett.ips[ip] = {
@@ -242,11 +335,10 @@ def process_ip(ip:str, behavior:str
sett.ips[ip]['counter'] = 0
sett.ips[ip]['status'] = 'white'
elif behavior == 'bad':
- log_debug(f'IP {ip} - bad behavior: {reason}')
+ log.debug(ip.ljust(15), f'bad behavior: {reason}')
elif behavior == 'danger':
sett.ips[ip]['status'] = 'black'
- log_info(
- f'add {ip} to black list: {reason}')
+ log.info(ip.ljust(15), f'add to black list: {reason}')
status, data = send_ip(
ip
, list_name = \
@@ -256,10 +348,8 @@ def process_ip(ip:str, behavior:str
)
)
if not status:
- log_error(
- 'Error on adding to'
- + f' blacklist IP {ip}: {data}'
- )
+ log.error(ip.ljust(15), 'Error on adding to'
+ + f' blacklist: {data}')
return True, reason
except Exception as e:
return False, repr(e)
@@ -301,18 +391,19 @@ def decision(path:str, ip:str)->list:
break
if user_name:
if last_day:
- last_day = datetime.datetime.strptime(
+ last_day = dt.strptime(
last_day, '%Y-%m-%d'
)
last_day = last_day \
- + datetime.timedelta(days=1)
- if datetime.datetime.now() \
+ + timedelta(days=1)
+ if dt.now() \
< last_day:
process_ip(ip, 'good', 'valid date access')
sett.users[user_name]['ips'].append(ip)
sett.users[user_name]['last_access'] = \
- datetime.datetime.now()
- log_info(f'valid date access: {user_name} (IP {ip})')
+ dt.now()
+ log.info(ip.ljust(15)
+ , f'valid date access: {user_name}')
status, data = send_ip(
ip
, sett.general['white_list']
@@ -328,18 +419,20 @@ def decision(path:str, ip:str)->list:
.format(user_name)
else:
sett.users[user_name]['last_access'] = \
- datetime.datetime.now()
- log_info(f'date expired: {user_name} (IP {ip})')
+ dt.now()
+ log.info(ip.ljust(15)
+ , f'date expired: {user_name}')
process_ip(ip, 'bad', 'date expired')
message = lang.pass_expired \
.format(user_name
, last_day.strftime('%d.%m.%Y'))
else:
- log_info(f'permanent access: {user_name} (IP {ip})')
+ log.info(ip.ljust(15)
+ , f'permanent access: {user_name}')
process_ip(ip, 'good', 'permanent access')
sett.users[user_name]['ips'].append(ip)
sett.users[user_name]['last_access'] = \
- datetime.datetime.now()
+ dt.now()
status, data = send_ip(
ip
, sett.general['white_list']
@@ -352,7 +445,8 @@ def decision(path:str, ip:str)->list:
.format(user_name)
else:
- log_debug(f'send_ip error: {data}')
+ log.debug(ip.ljust(15)
+ , f'send_ip error: {data}')
message = lang.access_error \
.format(user_name)
print_users()
@@ -384,7 +478,8 @@ def decision(path:str, ip:str)->list:
else:
message = 'nobody\tnever\nnowhere'
except Exception as e:
- log_debug('status error: ' + repr(e))
+ log.debug(ip.ljust(15)
+ , 'status error: ' + repr(e))
message = 'nobody\tnever\nnowhere'
else:
process_ip(ip, 'danger', 'status unsafe')
@@ -426,8 +521,9 @@ def print_users():
template = ' '.join(
[ '{{:<{}}}'.format(s) for s in col_sizes ]
)
+ print('')
for row in rows: print(template.format(*row))
- print('\n')
+ print('')
def print_ips():
print('\nIP STATUS REASON')
@@ -448,26 +544,27 @@ def handle_one_request(s):
process_ip(s.address_string()
, 'danger', 'wrong request method')
except ConnectionResetError:
- log_debug(f'IP {s.address_string()} - connection reset')
+ log.debug(s.address_string().ljust(15)
+ , 'connection reset')
process_ip(s.address_string(), 'bad', 'port scan')
except IndexError:
try:
rr = str(s.raw_requestline, encoding='iso-8859-1')
except Exception as e:
- log_debug(
- 'IP {} - raw_requestline error: {}'.format(
- s.address_string(), repr(e)
- )
- )
+ log.debug(
+ s.address_string().ljust(15)
+ , 'raw_requestline error: ' + repr(e))
else:
- log_debug(
- 'IP {} - raw_requestline: {} (len={})'.format(
- s.address_string(), rr, len(rr)
- )
+ log.debug(
+ s.address_string().ljust(15)
+ , 'raw_requestline: {} (len={})'.format(
+ rr, len(rr) )
)
except Exception as e:
- log_debug('IP {} - h_o_r exception: {}'.format(
- s.address_string(), repr(e)[:40]) )
+ log.debug(
+ s.address_string().ljust(15)
+ , 'h_o_r exception: ' + (repr(e)[:40])
+ )
process_ip(s.address_string()
, 'danger', 'h_o_r exception')
@@ -484,8 +581,8 @@ def send_error(s, code, message=None
def do_GET(s):
if 'favicon.' in s.path:
- log_debug(f'IP {s.address_string()} -'
- + f' favicon request: {s.path}')
+ log.debug(s.address_string().ljust(15)
+ , f'favicon request: {s.path}')
s.wfile.write(b'')
return
s.send_response(200)
@@ -495,15 +592,17 @@ def do_GET(s):
if status:
message = data
else:
- log_debug('IP {} - decision error: {}'.format(
- s.address_string(), data) )
+ log.debug(
+ s.address_string().ljust(15)
+ , 'decision error: ' + data
+ )
message = lang.ban
if s.path == '/status':
page = message
else:
page = sett.html.format(
message=message
- , timestamp = datetime.datetime.now() \
+ , timestamp = dt.now() \
.strftime('%Y.%m.%d %H:%M:%S')
, ip_address = s.address_string()
, ip_capt = lang.ip_capt
@@ -514,15 +613,14 @@ def do_GET(s):
def log_message(s, msg_format, *args):
if sett.general['developer']:
- log_info(
- 'HTTP : '
- + s.address_string() + ' - '
- + ' '.join(args)
+ log.http(
+ s.address_string().ljust(15)
+ , ' '.join(args)
)
def main():
global sett
- global logger
+ global log
global lang
try:
os.system('title Web Knocking')
@@ -534,28 +632,14 @@ def main():
for opt in DEF_OPT_DEVICE:
sett.device.setdefault(*opt)
if sett.general['developer']:
- log_level = logging.DEBUG
- log_format = '%(asctime)s : %(levelname)s\t: %(message)s'
- else:
- log_level = logging.INFO
- log_format = '%(asctime)s %(message)s'
- logger = logging.getLogger(__name__)
- logger.setLevel(log_level)
- ch = logging.StreamHandler()
- ch.setLevel(log_level)
- formatter = logging.Formatter(log_format, "%Y.%m.%d %H:%M:%S")
- ch.setFormatter(formatter)
- logger.addHandler(ch)
- if sett.general['log_file']:
- if not os.path.exists('logs'): os.mkdir('logs')
- fh = TimedRotatingFileHandler(
- 'logs/web_knocking.log'
- , when='midnight'
+ log = EasyLogging(
+ level=10, directory='logs'
+ , add_levels=('HTTP', 10)
+ , sep=' : '
)
- fh.setFormatter(formatter)
- fh.suffix = '%Y.%m.%d.log'
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
+ else:
+ d = 'logs' if sett.general['log_file'] else None
+ log = EasyLogging(directory=d)
if os.path.exists('files/index.html'):
with open('files/index.html'
, encoding='utf-8') as fd:
@@ -605,15 +689,17 @@ def main():
, print_debug=sett.general['developer']
)
if status:
- log_info(f'Access to device: OK')
+ log.info(sett.device['host'].ljust(15)
+ , f'Access to device: OK')
else:
- log_error(
- f'Access to device error: {data}\n'
- + 'Check "/ip services"\n'
- + 'Check firewalls\n'
+ log.error(
+ sett.device['host'].ljust(15)
+ , f'Access to device error: {data}\n'
+ + 'Check "/ip services"\n'
+ + 'Check firewalls\n'
)
if sett.general['developer']:
- print('\nDeveloper mode\n')
+ print('\nDEVELOPER MODE')
print_users()
try:
port = sett.general['port']
@@ -621,11 +707,12 @@ def main():
('0.0.0.0', port)
, KnockHandler
)
- log_info(f'Start listening on {port} port')
+ log.info(sett.device['host'].ljust(15)
+ , f'Start listening on {port} port')
httpd.serve_forever()
except KeyboardInterrupt:
- log_info('Terminated by keyboard')
+ log.info('Terminated by keyboard')
except Exception as e:
- log_info(f'General error: {e}')
+ log.info(f'General error: {e}')
if __name__ == '__main__': main()