-
Notifications
You must be signed in to change notification settings - Fork 0
/
asu-notifier.py
297 lines (251 loc) · 11.8 KB
/
asu-notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python3
# Apple Security Updates Notifier v0.4.3b
# File: asu-notifier.py
# Description: Main component of Apple Security Updates Notifier. Once executed it creates config.json file, initializes
# the associated database, and sets a cronjob which will run the secondary component of ASU Notifier hourly.
import argparse
import contextlib
import json
import logging
import os
import re
import sqlite3
import subprocess
import textwrap
import urllib.request
from sqlite3 import Error, Connection
from typing import TypeVar
import pycountry
import pytz
import requests
from crontab import CronTab
# SQL queries
sql_check_empty_database: str = """ SELECT COUNT(name) FROM sqlite_master WHERE type='table' AND name='main' """
sql_create_main_table: str = """CREATE TABLE IF NOT EXISTS main ( main_id integer PRIMARY KEY AUTOINCREMENT, publish_date
text NOT NULL, file_hash text NOT NULL, log_message text NOT NULL );"""
sql_create_updates_table: str = """CREATE TABLE IF NOT EXISTS updates ( update_id integer PRIMARY KEY AUTOINCREMENT,
update_date text NOT NULL, update_product text NOT NULL, update_target text NOT NULL, update_link text,
file_hash text NOT NULL );"""
timezones_list = pytz.all_timezones
def create_connection(file):
if not os.path.isfile(file):
logging.info(f'\'{file}\' - database created.')
conn = TypeVar('conn', Connection, None)
try:
conn: Connection = sqlite3.connect(file)
except Error as error:
logging.error(str(error))
return conn
def create_table(conn, sql_create_table, table_name, file):
with contextlib.suppress(Error):
try:
conn.cursor().execute(sql_create_table)
logging.info(f'\'{file}\' - \'{table_name}\' table created.')
except Error as error:
logging.error(str(error))
def get_config(local_path):
config = open(f'{local_path}/asu-notifier.json', 'r')
data = json.loads(config.read())
prog_name_short = data['prog_name_short']
prog_name_long = data['prog_name_long']
version = data['version']
apple_url = data['apple_url']
return prog_name_short, prog_name_long, version, apple_url
def create_config_json(local_path, apple_url, prog_name, bot_token, chat_ids, tzone):
config_str = f""" "apple_url": "{apple_url}",
"db_file": "{local_path}/{prog_name}.db",
"log_file": "{local_path}/{prog_name}.log",
"timezone": "{tzone}",
"bot_token": "{bot_token}",
"chat_ids": [
"""
for i, value in enumerate(chat_ids):
if i + 1 < len(chat_ids):
config_str += f' "{value}", \n'
else:
config_str += f' "{value}"\n'
config_str += " ]"
config_str = "{\n" + config_str + "\n}"
with open(f"{local_path}/config.json", "w") as file:
file.write(config_str)
return f'{local_path}/{prog_name}.log', f'{local_path}/{prog_name}.db'
def crontab_job(working_dir):
cronjob = CronTab(user=True)
comment = "asu-notifier"
comment_found = any(job.comment == comment for job in cronjob)
if not comment_found:
job = cronjob.new(command=f'python3 {working_dir}/asu-bot.py', comment='asu-notifier')
job.setall('0 */6 * * *')
job.enable()
cronjob.write()
def token_validator(bot_token):
token_json = requests.get(f"https://api.telegram.org/bot{bot_token}/getMe")
token_info = token_json.json()
return token_info["ok"]
def timezone_selection(country_code):
country_name = pycountry.countries.get(alpha_2=country_code).name
country_timezones = pytz.country_timezones[country_code]
country_tz_len = len(country_timezones)
print(f'{country_name} [{country_code}] timezones:')
print('0: Switch back to default timezone (UTC)')
if country_tz_len == 1:
return country_timezones[0]
for i, tz in enumerate(country_timezones):
print(f'{i + 1}: {tz}')
while True:
tz_selection = input("Select a timezone: ")
try:
selection = int(tz_selection)
if selection == 0:
return 'UTC'
elif 0 < selection <= country_tz_len:
return country_timezones[selection - 1]
else:
print(f'Wrong choice, pick a number between 0 and {country_tz_len}')
except ValueError:
print("Please enter a valid number")
def undefined_timezone():
try:
external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8')
location_data = requests.get(f'https://ipapi.co/{external_ip}/json/').json()
country_name = location_data.get("country_name")
country_code = location_data.get("country_code")
if country_code is None or country_name is None:
print(f"I can't identify your country based on your IP [{external_ip}], so you have to set timezone or "
f"country manually.")
return None
else:
print(f"According to your IP address [{external_ip}], it seems that your country is {country_name} "
f"[{country_code}]")
answer = input("Is this correct? (y/n): ")
if answer == 'n':
print("I can't identify your country based on your IP, so you have to set timezone or country "
"manually.")
return None
elif answer == 'y':
return timezone_selection(country_code)
else:
print("Invalid input. Please enter 'y' or 'n'.")
return None
except Exception as e:
print("An error occurred while trying to determine your timezone:", e)
return None
def set_timezone(country_code):
country_names = pytz.country_names
if country_code.upper() == 'X':
return undefined_timezone()
elif country_code.upper() not in country_names:
print("Incorrect country code.\nFor ISO Alpha-2 codes refer to http://iban.com/country-codes")
exit(1)
else:
return timezone_selection(country_code.upper())
def check_timezone(timezone):
if timezone.upper() == "X":
return undefined_timezone()
elif timezone == 'UTC':
print('\nTimezone is set to its default value [UTC].')
while True:
answer = input("Are you OK with it? (y/n): ").strip().lower()
if answer == 'n':
return undefined_timezone()
elif answer == 'y':
return 'UTC'
else:
print("Please enter 'y' or 'n'.")
elif timezone not in timezones_list:
while True:
print("Incorrect timezone.")
timezone = input("Please enter a valid timezone or 'X' to undefined: ").strip()
if timezone.upper() == "X":
return undefined_timezone()
elif timezone in timezones_list:
return timezone
else:
return timezone
def check_chat_ids(chat_ids):
if type(chat_ids) is list:
return chat_ids
print('Wrong format of "chat ids", it must me a list. Defaulting to UTC.')
return 'UTC'
def check_bot_token(bot_token):
regex = "^[0-9]*:[a-zA-Z0-9_-]{35}$"
if re.search(regex, bot_token):
if token_check := token_validator(bot_token):
return token_check
else:
exit(1)
else:
exit(1)
def get_chat_ids():
regex = "(-[0-9]+)"
chat_ids = []
print("\nType in chat ids where the script will notify Apple Updates. To finish input, type \"0\".")
print("Remember to include the minus sign before each chat id like \"-6746386747\".")
counter = 1
while True:
answer = input(f'Type in chat id #{counter}: ')
if answer == '0':
break
elif re.match(regex, answer):
chat_ids.append(answer)
counter += 1
else:
print("Incorrect format, try again.")
return chat_ids
def argument_parser(progname_short, progname_long, ver):
description = f'{progname_long} is python program that will notify you through Telegram, about new Apple updates.'
epilog = """Bot token is made-up of a numerical string of 8-10 digits followed by a ":", and finishes with a 35
alphanumeric string. For ISO Alpha-2 codes refer to http://iban.com/country-codes. Chat ids must be provided one
after another separated by a space, like \"-123456 -4567890\"."""
parser = argparse.ArgumentParser(prog=f"{progname_short}",
description=textwrap.dedent(f"{description}"),
epilog=f"{epilog}",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-b', '--bot-token', help='set Telegram bot token', required=True)
parser.add_argument('-t', '--timezone', default='UTC', help='[optional] Set bot timezone. Use X or x as argument '
'to allow identification your timezone according to '
'your IP address')
parser.add_argument('-c', '--country', help='[optional] Define a country in order to select an appropriate '
'timezone. Use X or x as argument to allow identification your '
'country according to your IP address')
parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {ver}',
help='Show %(prog)s version information and exit')
parser.add_argument('-i', '--chat-ids', default=None, action="extend", nargs="*", type=str, help='[optional] '
'Define allowed '
'chat ids at '
'startup. If not '
'set at startup, '
'it will be '
'prompted later '
'in the script')
args = parser.parse_args()
config = vars(args)
bot_token = config['bot_token']
country = config['country']
chat_ids = config['chat_ids']
timezone = set_timezone(country) if country is not None else config['timezone']
timezone = check_timezone(timezone)
chat_ids = get_chat_ids() if chat_ids is None else check_chat_ids(chat_ids)
if check_bot_token(bot_token):
return bot_token, timezone, chat_ids
def main():
local_file = __file__
local_path = os.path.dirname(local_file)
prog_name_short, prog_name_long, version, apple_url = get_config(local_path)
bot_token, timezone, chat_ids = argument_parser(prog_name_short, prog_name_long, version)
log_file, db_file = create_config_json(local_path, apple_url, prog_name_short, bot_token, chat_ids, timezone)
# logging
log_format = '%(asctime)s -- %(message)s'
logging.basicConfig(filename=log_file, encoding='utf-8', format=log_format, level=logging.INFO)
# create a database connection
conn: Connection = create_connection(db_file)
cursor = conn.cursor()
empty_database = cursor.execute(sql_check_empty_database).fetchone()[0] == 0
if empty_database:
# create database tables and populate them
create_table(conn, sql_create_main_table, 'main', db_file)
create_table(conn, sql_create_updates_table, 'updates', db_file)
subprocess.run(['python', 'asu-bot.py'], capture_output=True, text=True)
crontab_job(local_path)
if __name__ == '__main__':
main()