From 7c103605bd9b3922fe5ddd05c8a6322b825add52 Mon Sep 17 00:00:00 2001 From: Edison Montes M Date: Sun, 23 Jul 2023 19:04:18 -0400 Subject: [PATCH 1/5] Update README.md --- README.md | 76 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index c76e9a3..0b99fbf 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,11 @@ Python script that checks for Apple software updates, and notifies via Telegram Bot. -## Basic installation and configuration +## Previous steps + +This script relies on a previously created Telegram bot. If you don't have one follow [this steps](https://www.alphr.com/telegram-create-bot/) to create one. You need your bot token at hand in order to run this script. -This script relies on a previously created Telegram bot. If you don't have one follow [this steps](https://www.alphr.com/telegram-create-bot/) to create one. +## Basic installation and configuration Clone this repo with... @@ -12,48 +14,80 @@ Clone this repo with... git clone https://github.com/Geek-MD/apple-security-updates-notifier.git ``` -Now open ***config.json*** with the editor of your preference, and change *"timezone"*, *"bot_token"* and *"chat_id_n"* with their corresponding values. +To run the script you must add *-b* or *--bot-token* option followed by the bot token itself. Bot token is mandatory, if you don't provide it, the script will exit with an error. Timezone or country arguments are optional. The default timezone is UTC. +The basic syntax to run the script is as the following examples. ``` -nano config.json +python3 config.py -b +python3 config.py --bot-token ``` -Now, you have to create a service at ***systemd*** which will execute the script and restart it on failure. +You can check the script help by using *-h* or *--help* option. ``` -sudo nano /etc/systemd/system/apple-security-updates.service +python3 config.py -h +python3 config.py --help ``` -Paste the content of *apple-security-updates.service* file. Be sure to modify *ExecStart* with the path to the python script, and *Environment* with the route for python libraries. +You can check the script version by using *-v* or *--version* option. + +``` +python3 config.py -v +python3 config.py --version +``` + +## Advanced configuration + +There are two options, independent one from the other, to define the time zone of the bot. You can use timezone or country options. Remember that this is optional. +Additionally, you can define the chat ids where Telegram notifications will be sent. + +### Timezone or country configuration -To get PYTHONPATH, first you need to know which version of python you have installed. +If you don't provide timezone or country at startup, the script will display a dialog indicating that the default timezone is *"UTC"* and asking if you're OK with that. +If you answer *"no"*, the script will try to identify your country based on the IP address and will ask if that guess is OK or not. +If you answer *"yes"*, the script will display a list with all the timezones associated to that country, and will ask for a selection. Type in the option of your choice. If you want to switchback to the default timezone *(UTC)*, type *"0"* in answer to that dialog. + +For timezone configuration you can use *-t* or *--timezone* followed by the timezone according to IANA Zone ID list (http://nodatime.org/TimeZones). Alternatively you may use *'x'* or *'X'* as argument, so the script will try to identify your country based on the IP address. + +``` +python3 config.py -b -t +python3 config.py --bot-token --timezone x +``` + +For timezone configuration through country code, you can use *-c* or *--country* followed by the country code according to ISO Alpha-2 codes (http://iban.com/country-codes). Country code is a 2-letter code in upper case, but this script accepts it in lower case. Alternatively you may use *'x'* or *'X'* as argument, so the script will try to identify your country based on the IP address. ``` -python3 --version +python3 config.py -b -c +python3 config.py --bot-token --country x ``` -In my case is *python 3.9.2*. With that info you can search for the path of your python libraries. Get into python interpreter and run the following commands. +### Chat ids configuration + +For chat ids configuration you can use *-i* or *--chat-ids* followed by chat ids separated by a space between them. Don't forget to add the minus sign before the numeric code of the chat id. +If you don't provide chat ids at startup, the script will ask for them one by one, and you will have to type "0" to finish the input. ``` -python3 -import sys -sys.path +python3 config.py -b -i +python3 config.py --bot-token --chat-ids ``` -You will obtain a series of paths. Focus on the one that has a structure similar to *'/home//.local/lib/python3.9/site-packages'*. Note that it makes mention to *python3.9* which is the same version that is running in your system. Paste the path replacing . Do not include quotation marks. +## Functionality -Save with Ctrl-O and exit with Ctrl X. +This piece of software comprehends 2 *.py* files, 1 *.json* file and 1 *.sh* file. Python files are *config.py* which runs just once at start, and *apple_security_updates.py* which is the persistent script. The JSON file is *asu-notifier.json* which contains some basic information for *config.py*. Finally, the bash file is *asu-notifier.sh* which contains shell commands in order to run the persistent Python script as a systemd service. +Once the script is run, it will automatically recreate 2 files using the information you gave it, a *asu-notifier.service* file used to start a *systemd* service, and a *config.json* file with the configuration needed by the persistent script. +Finally, the script will execute the bash file in order to enable and start the systemd service. This will take some time to start because of *ExecStartPre* section which will wait for 60 seconds. This is due to a recommendation found on several forums which intention is to ensure that the script executes only when the system is fully loaded. +When you see the system prompt, if you haven't received an error message, the script will be running in background, and you'll receive a Telegram Notification with the latest Apple Security Updates. -Now you have to reload *systemd* daemons, enable and start the service +## Troubleshooting + +Eventually, the systemd service may need to be restarted due to a system restart or by any other reason. You can do it manually using the following shell commands. ``` sudo systemctl daemon-reload -sudo systemctl enable apple-security-updates.service -sudo systemctl start apple-security-updates.service +sudo systemctl enable asu-notifier.service +sudo systemctl start asu-notifier.service ``` -The service will take some time to start because of *ExecStartPre* section which will wait for 60 seconds. This is due to a recomendation found on several forums which intention is to ensure that the script executes only when the system is fully loaded. - ## Note -This is a first workaround attempt for a persistent bot in the near future. +This is a first workaround attempt for a permanent bot in the near future, instead of a self-hosted bot. From 592e409078338b587ec02a3c46d40962dfb6aa71 Mon Sep 17 00:00:00 2001 From: Edison Montes M Date: Sun, 23 Jul 2023 19:04:52 -0400 Subject: [PATCH 2/5] Delete apple-security-updates.service --- apple-security-updates.service | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 apple-security-updates.service diff --git a/apple-security-updates.service b/apple-security-updates.service deleted file mode 100644 index faf863e..0000000 --- a/apple-security-updates.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=Apple Security Updates Notifier -After=multi-user.target - -[Service] -Type=simple -Environment=DISPLAY=:0 -ExecStartPre=/bin/sleep 60 -ExecStart=/usr/bin/python3 /asu_notifier.py -Restart=on-failure -RestartSec=30s -KillMode=process -TimeoutSec=infinity -Environment="PYTHONPATH=$PYTHONPATH:" - -[Install] -WantedBy=multi-user.target From 60453aa82f1e834fb682fb94ca9bf8c89f4f5cbb Mon Sep 17 00:00:00 2001 From: Edison Montes M Date: Sun, 23 Jul 2023 19:07:21 -0400 Subject: [PATCH 3/5] Create asu-notifier.sh --- asu-notifier.sh | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 asu-notifier.sh diff --git a/asu-notifier.sh b/asu-notifier.sh new file mode 100644 index 0000000..61dec3c --- /dev/null +++ b/asu-notifier.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +# filename: asu-notifier.sh + +LOCAL_DIR=pwd + +sudo cp $LOCAL_DIR/asu-notifier.service /etc/systemd/system/asu-notifier.service + +sudo systemctl daemon-reload +sudo systemctl enable asu-notifier.service +sudo systemctl start asu-notifier.service From f2d74312703eaf9c9c4d957008aa4650f92d1257 Mon Sep 17 00:00:00 2001 From: Edison Montes M Date: Sun, 23 Jul 2023 19:08:20 -0400 Subject: [PATCH 4/5] Add files via upload --- LICENSE | 2 +- README.md | 2 +- asu-bot.py | 225 ++++++++++++++++++++++++++++++++++++++++++++++ asu-notifier.json | 6 ++ asu-notifier.py | 217 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 7 +- setup.py | 12 +++ 7 files changed, 467 insertions(+), 4 deletions(-) create mode 100644 asu-bot.py create mode 100644 asu-notifier.json create mode 100644 asu-notifier.py create mode 100644 setup.py diff --git a/LICENSE b/LICENSE index ff5408c..ad04c58 100644 --- a/LICENSE +++ b/LICENSE @@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index 0b99fbf..901c7b3 100644 --- a/README.md +++ b/README.md @@ -90,4 +90,4 @@ sudo systemctl start asu-notifier.service ## Note -This is a first workaround attempt for a permanent bot in the near future, instead of a self-hosted bot. +This is a first workaround attempt for a permanent bot in the near future, instead of a self-hosted bot. \ No newline at end of file diff --git a/asu-bot.py b/asu-bot.py new file mode 100644 index 0000000..ddac79b --- /dev/null +++ b/asu-bot.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 + +# Apple Security Updates Notifier v0.4.0 +# Python script that checks for Apple software updates, and notifies via Telegram Bot. +# This is a first workaround attempt for a permanent bot in the near future. + +import contextlib +import datetime +import hashlib +import json +import logging +import os +import sqlite3 +import time +import re +from datetime import datetime +from sqlite3 import Error, Connection +from typing import TypeVar + +import pytz +import requests +import schedule +from apprise import Apprise +from bs4 import BeautifulSoup + +os.chdir('/home/emontes/python/apple-security-updates-notifier') + +# set global variables +global apple_file, db_file, log_file, localtime, bot_token, chat_ids + +# SQL queries +sql_check_empty_database: str = """ SELECT COUNT(name) FROM sqlite_master WHERE type='table' AND name='main' """ +sql_create_main_table: str = """ CREATE TABLE IF NOT EXISTS main ( main_id integer PRIMARY KEY AUTOINCREMENT, log_date text NOT +NULL, file_hash text NOT NULL, log_message text NOT NULL ); """ +sql_create_updates_table: str = """ CREATE TABLE IF NOT EXISTS updates ( update_id integer PRIMARY KEY AUTOINCREMENT, update_date +text NOT NULL, update_product text NOT NULL, update_target text NOT NULL, update_link text, file_hash text NOT NULL ); """ +sql_main_table_hash_check: str = """ SELECT COUNT(*) FROM main WHERE file_hash = ? """ +sql_main_table: str = """ INSERT INTO main (log_date, file_hash, log_message) VALUES (?, ?, ?); """ +sql_updates_table: str = """ INSERT INTO updates (update_date, update_product, update_target, update_link, file_hash) +VALUES (?, ?, ?, ?, ?); """ +sql_get_updates: str = """ SELECT update_date, update_product, update_target, update_link FROM updates ORDER BY update_id DESC; """ +sql_get_updates_count: str = """ SELECT count(update_date) FROM updates WHERE update_date = ?; """ +sql_get_last_updates: str = """ SELECT update_date, update_product, update_target, update_link FROM updates WHERE update_date = ?; """ +sql_get_last_update_date: str = """ SELECT update_date FROM updates ORDER BY update_id DESC LIMIT 1; """ +sql_get_update_dates: str = """ SELECT DISTINCT update_date FROM updates; """ +sql_get_date_update: str = """ SELECT update_date, update_product, update_target, update_link FROM updates WHERE update_date = ?; """ + +def get_config(): + global apple_file, db_file, log_file, localtime, bot_token, chat_ids + config = open('config.json', 'r') + data = json.loads(config.read()) + apple_file = data['apple_file'] + db_file = data['db_file'] + log_file = data['log_file'] + timezone = data['timezone'] + localtime = pytz.timezone(timezone) + bot_token = data['bot_token'] + chat_ids = data['chat_ids'] + +def create_connection(file): + if not os.path.isfile(file): + logging.info(f'\'{file}\' database created.') + conn = TypeVar('conn', Connection, None) + try: + conn: Connection = sqlite3.connect(file) + except Error as error: + logging.error(str(error)) + return conn + +def create_table(conn, sql_create_table, table_name, file): + with contextlib.suppress(Error): + try: + conn.cursor().execute(sql_create_table) + logging.info(f'\'{file}\' - \'{table_name}\' table created.') + except Error as error: + logging.error(str(error)) + +def get_updates(conn, full_update): + cursor = conn.cursor() + response = requests.get(apple_file) + content = response.content + file_hash = hashlib.sha256(content).hexdigest() + available_updates = cursor.execute(sql_main_table_hash_check, [file_hash]).fetchone()[0] == 0 + if not available_updates: + logging.info('No updates available.') + else: + update_databases(conn, content, file_hash, full_update) + conn.commit() + conn.close() + +def update_databases(conn, content, file_hash, full_update): + log_date = datetime.now(tz=localtime) + update_main_database(conn, log_date, file_hash, full_update) + update_updates_database(conn, file_hash, content, full_update) + +def update_main_database(conn, log_date, file_hash, full_update): + cursor = conn.cursor() + if full_update: + log_message = f'First \'main\' table population - SHA256: {file_hash}.' + else: + log_message = f'\'main\' table updated - SHA256: {file_hash}.' + cursor.execute(sql_main_table, (log_date, file_hash, log_message)) + logging.info(log_message) + conn.commit() + +def update_updates_database(conn, file_hash, content, full_update): + cursor = conn.cursor() + soup = BeautifulSoup(content, 'html.parser') + updates_table = soup.find('div', id="tableWraper").find_all('tr') + recent_updates = formatted_content(updates_table) + if full_update: + log_message = f'First \'updates\' table population - SHA256: {file_hash}.' + else: + log_message = f'\'updates\' table updated - SHA256: {file_hash}.' + old_updates = cursor.execute(sql_get_updates).fetchall() + for element in old_updates: + recent_updates.remove(element) + for element in recent_updates: + cursor.execute(sql_updates_table, (element[0], element[1], element[2], element[3], file_hash)) + logging.info(log_message) + conn.commit() + apprise_notification(conn, recent_updates, full_update) + +def formatted_content(content): + content_list = [] + for i, row in enumerate(content): + if i == 0: + continue + columns = row.find_all('td') + date_str = columns[2].get_text().strip().replace('\xa0', ' ') + update_date = check_date(date_str) + update_product = columns[0].get_text().strip().replace( + 'Esta actualización no tiene ninguna entrada de CVE publicada.', '').replace('\xa0', ' ').replace('\n', '') + update_target = columns[1].get_text().replace('\xa0', ' ').replace('\n', '') + try: + update_link = columns[0].find('a')['href'] + except Exception: + update_link = None + list_element = [update_date, update_product, update_target, update_link] + content_list.append(list_element) + content_list.reverse() + return content_list + +def check_date(date_str): + pattern = r'(\d{1,2}) de (\w+) de (\d{4})' + try: + match = re.match(pattern, date_str) + day = int(match[1]) + month = match[2] + year = int(match[3]) + month_list = ['enero', 'febrero', 'marzo', 'abril', 'mayo', 'junio', 'julio', 'agosto', 'septiembre', 'octubre', + 'noviembre', 'diciembre'] + month_num = month_list.index(month) + 1 + return datetime(year, month_num, day) + except Exception: + return date_str + +def apprise_notification(conn, updates, full_update): + apprise_object = Apprise() + apprise_message = build_message(conn, updates, full_update) + apprise_syntax = f'tgram://{bot_token}/' + for chat_id in chat_ids: + apprise_syntax += f'{chat_id}/' + apprise_syntax *= '?format=markdown' + apprise_object.add(apprise_syntax, tag='telegram') + apprise_object.notify(apprise_message, tag="telegram") + +def build_message(conn, last_updates, full_update): + max_updates = 5 + cursor = conn.cursor() + if full_update: + last_updates = [] + update_dates = cursor.execute(sql_get_update_dates).fetchall() + for element in reversed(update_dates): + if max_updates >= 0: + query = cursor.execute(sql_get_date_update, element).fetchall() + query_count = cursor.execute(sql_get_updates_count, element).fetchone()[0] + last_updates += query + max_updates -= query_count + apprise_message = '*Últimas actualizaciones de Apple.*\n\n' + else: + apprise_message = '*Nuevas actualizaciones de Apple.*\n\n' + for element in reversed(last_updates): + if element[0] == 'Preinstalado': + date_time = element[0] + else: + date = datetime.strptime(str(element[0]), "%Y-%m-%d %H:%M:%S") + date_time = date.strftime("%d/%m/%Y") + apprise_message += f'_{date_time}_' + if element[3] is not None: + apprise_message += f' - [{element[1]}]({element[3]})' + else: + apprise_message += f' - _{element[1]}_' + apprise_message += f' - {element[2]}\n' + return apprise_message + +def main(): + get_config() + + # logging + log_format = '%(asctime)s -- %(message)s' + logging.basicConfig(filename=log_file, encoding='utf-8', format=log_format, level=logging.INFO) + + # create a database connection + conn: Connection = create_connection(db_file) + cursor = conn.cursor() + empty_database = cursor.execute(sql_check_empty_database).fetchone()[0] == 0 + if empty_database: + # create database tables and populate them + create_table(conn, sql_create_main_table, 'main', db_file) + create_table(conn, sql_create_updates_table, 'updates', db_file) + + # run first database update + get_updates(conn, full_update=True) + + # Schedule the function to run every hour + schedule.every().hour.do(get_updates, conn=conn, full_update=False) + + # Run the scheduler continuously + while True: + schedule.run_pending() + time.sleep(1) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/asu-notifier.json b/asu-notifier.json new file mode 100644 index 0000000..7bc3ffd --- /dev/null +++ b/asu-notifier.json @@ -0,0 +1,6 @@ +{ + "prog_name_short": "asu-notifier", + "prog_name_long": "Apple Security Updates Notifier", + "version": "v0.4.0", + "apple_url": "https://support.apple.com/es-es/HT201222" +} \ No newline at end of file diff --git a/asu-notifier.py b/asu-notifier.py new file mode 100644 index 0000000..64ce19b --- /dev/null +++ b/asu-notifier.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 + +# This script recreates config.json and asu-notifier.service files. + +import site +import os +import argparse +import json +import textwrap + +import pytz +import requests +import re +import urllib.request +import subprocess +import pycountry + +def get_config(): + config = open('asu-notifier.json', 'r') + data = json.loads(config.read()) + prog_name_short = data['prog_name_short'] + prog_name_long = data['prog_name_long'] + version = data['version'] + apple_url = data['apple_url'] + return prog_name_short, prog_name_long, version, apple_url + +def create_service_file(localpath, pythonpath, progname): + service_str = f"""[Unit] +Description={progname} +After=multi-user.target + +[Service] +Type=simple +Environment=DISPLAY=:0 +ExecStartPre=/bin/sleep 60 +ExecStart=/usr/bin/python3 {localpath}/asu-bot.py +Restart=on-failure +RestartSec=30s +KillMode=process +TimeoutSec=infinity +Environment="PYTHONPATH=$PYTHONPATH:{pythonpath}/" + +[Install] +WantedBy=multi-user.target""" + with open("asu-notifier.service", "w") as file: + file.write(service_str) + +def create_config_json(appleurl, progname, bottoken, chatids, tzone): + config_str = f""" "apple_url": "{appleurl}", + "db_file": "{progname}.db", + "log_file": "{progname}.log", + "timezone": "{tzone}", + "bot_token": "{bottoken}", + "chat_ids": [ +""" + for i, value in enumerate(chatids): + if i+1 < len(chatids): + config_str += f' "{value}", \n' + else: + config_str += f' "{value}"\n' + config_str += " ]" + config_str = "{\n" + config_str + "\n}" + with open("config.json", "w") as file: + file.write(config_str) + +def token_validator(bot_token): + token_json = requests.get(f"https://api.telegram.org/bot{bot_token}/getMe") + token_info = token_json.json() + return token_info["ok"] + +def timezone_selection(country_code): + country_name = pycountry.countries.get(alpha_2=country_code).name + country_timezones = pytz.country_timezones[country_code] + country_tz_len = len(country_timezones) + selection_bool = False + print(f'{country_name} [{country_code}] timezones:') + print('0: Switch back to default timezone (UTC)') + if country_tz_len == 1: + return country_timezones[0] + for i, tz in enumerate(country_timezones): + print(f'{i + 1}: {tz}') + while not selection_bool: + tz_selection = input("Select a timezone: ") + index = int(tz_selection) - 1 + if int(tz_selection) == 0: + selection_bool = True + return 'UTC' + elif country_tz_len >= int(tz_selection) > 0: + selection_bool = True + return country_timezones[index] + else: + print(f'Wrong choice, pick a number between 0 and {country_tz_len}') + +def undefined_timezone(): + external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') + location_data = requests.get(f'https://ipapi.co/{external_ip}/json/').json() + country_name = location_data.get("country_name") + country_code = location_data.get("country_code") + if country_code is None or country_name is None: + print(f"I can´t identify your country based on your IP [{external_ip}], so you have to set timezone or country manually.") + exit(1) + else: + print(f'According to your IP address [{external_ip}], it seems that your country is {country_name} [{country_code}]') + selection_bool = False + while not selection_bool: + selection = input("Is this correct? (y/n): ") + if selection == 'n': + print("I can´t identify your country based on your IP, so you have to set timezone or country manually.") + selection_bool = True + exit(1) + elif selection == 'y': + selection_bool = True + return timezone_selection(country_code) + +def set_timezone(country_code): + country_names = pytz.country_names + if country_code.upper() == 'X': + return undefined_timezone() + elif country_code.upper() not in country_names: + print("Incorrect country code.\nFor ISO Alpha-2 codes refer to http://iban.com/country-codes") + exit(1) + else: + return timezone_selection(country_code.upper()) + +def check_timezone(timezone): + timezones_list = pytz.all_timezones + if timezone.upper() == "X": + return undefined_timezone() + elif timezone == 'UTC': + print (f'\nTimezone is set to it\'s default value [UTC].') + selection_bool = False + while not selection_bool: + selection = input("Are you OK with it? (y/n): ") + if selection == 'n': + selection_bool = True + return undefined_timezone() + elif selection == 'y': + selection_bool = True + return 'UTC' + elif timezone not in timezones_list: + print("Incorrect timezone.") + exit(1) + else: + return timezone + +def check_chat_ids(chat_ids): + if type(chat_ids) is list: + return chat_ids + print('Wrong format of "chat ids", it must me a list. Defaulting to UTC.') + return 'UTC' + +def check_bot_token(bot_token): + regex = "^[0-9]*:[a-zA-Z0-9_-]{35}$" + if re.search(regex, bot_token): + if token_check := token_validator(bot_token): + return token_check + else: + exit(1) + else: + exit(1) + +def get_chat_ids(): + regex = "(-[0-9]+)" + chat_ids = [] + print("\nType in chat ids where the script will notify Apple Updates. To finish input, type \"0\".") + print("Remember to include the minus sign before each chat id like \"-6746386747\".") + selection_bool = False + counter = 1 + while not selection_bool: + selection = input(f'Type in chat id #{counter}: ') + if selection == '0': + selection_bool = True + return chat_ids + elif re.search(regex, selection): + chat_ids.append(selection) + counter += 1 + else: + print("Incorrect format, try again.") + +def argument_parser(progname_short, progname_long, ver): + description = f'script used to setup {progname_long}. It creates the systemd service file, config file and starts the service' + epilog = """bot token is made-up of a numerical string of 8-10 digits followed by a ":", and finishes with a 35 alphanumeric string. +for ISO Alpha-2 codes refer to http://iban.com/country-codes. +chat ids must be provided one after another separated by a space, like \"-123456 -4567890\".""" + parser = argparse.ArgumentParser(prog=f"{progname_short}", + description=textwrap.dedent(f"{description}"), + epilog=f"{epilog}", + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('-b', '--bot-token', help='set Telegram bot token', required=True) + parser.add_argument('-t', '--timezone', default='UTC', help='[optional] set bot timezone. Use X or x as argument to allow identification your timezone according to your IP address') + parser.add_argument('-c', '--country', help='[optional] define a country in order to select an appropriate timezone. Use X or x as argument to allow identification your country according to your IP address') + parser.add_argument('-v', '--version', action='version', version=f'%(prog)s {ver}', + help='show %(prog)s version information and exit') + parser.add_argument('-i', '--chat-ids', default=None, action="extend", nargs="*", type=str, help='[optional] define allowed chat ids at startup. If not set at startup, it will be prompted later in the script') + args = parser.parse_args() + config = vars(args) + bot_token = config['bot_token'] + country = config['country'] + chat_ids = config['chat_ids'] + timezone = set_timezone(country) if country is not None else config['timezone'] + timezone = check_timezone(timezone) + chat_ids = get_chat_ids() if chat_ids is None else check_chat_ids(chat_ids) + if check_bot_token(bot_token): + return bot_token, timezone, chat_ids + +def main(): + python_path = site.getusersitepackages() + local_path = os.getcwd() + + prog_name_short, prog_name_long, version, apple_url = get_config() + bot_token, timezone, chat_ids = argument_parser(prog_name_short, prog_name_long, version) + create_service_file(python_path, local_path, prog_name_long) + create_config_json(apple_url, prog_name_short, bot_token, chat_ids, timezone) + subprocess.run(f'{local_path}/asu-notifier.sh') + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt index 172ef50..c2c3827 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ -requests==2.31.0 +pytz~=2023.3 +requests~=2.31.0 +apprise~=1.4.5 beautifulsoup4~=4.12.2 -apprise==1.4.0 \ No newline at end of file +pycountry~=22.3.5 +setuptools==68.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..f03216a --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup + +setup( + name='asu-notifier', + version='0.4.0', + packages=[''], + url='https://github.com/Geek-MD/apple-security-updates-notifier', + license='MIT', + author='Geek-MD', + author_email='edison.montes@gmail.com', + description='Python script that checks for Apple software updates, and notifies via Telegram Bot' +) From e06b6deb19dbec26f04b5bc5f018b93a63e35530 Mon Sep 17 00:00:00 2001 From: Edison Montes M Date: Sun, 23 Jul 2023 19:10:15 -0400 Subject: [PATCH 5/5] Delete apple_security_updates.py --- apple_security_updates.py | 236 -------------------------------------- 1 file changed, 236 deletions(-) delete mode 100644 apple_security_updates.py diff --git a/apple_security_updates.py b/apple_security_updates.py deleted file mode 100644 index 1c163ee..0000000 --- a/apple_security_updates.py +++ /dev/null @@ -1,236 +0,0 @@ -#!/usr/bin/env python3 - -# Apple Updates v0.3.0 -# Python script that checks for Apple software updates, and notifies via Telegram Bot. -# This is a first workaround attempt for a persistent bot in the near future. - -import contextlib -import datetime -import hashlib -import json -import logging -import os -import sqlite3 -import time -import re -from datetime import datetime -from sqlite3 import Error, Connection -from typing import TypeVar - -import pytz -import requests -import schedule -from apprise import Apprise -from bs4 import BeautifulSoup - -os.chdir('/home/emontes/python/apple-security-updates-notifier') - -# set global variables -global apple_file, db_file, log_file, localtime, bot_token, chat_ids - -# SQL queries -sql_check_empty_database: str = """ SELECT COUNT(name) FROM sqlite_master WHERE type='table' AND name='main' """ -sql_create_main_table: str = """ CREATE TABLE main ( main_id integer PRIMARY KEY AUTOINCREMENT, log_date text NOT -NULL, file_hash text NOT NULL, log_message text NOT NULL ); """ -sql_create_updates_table: str = """ CREATE TABLE updates ( update_id integer PRIMARY KEY AUTOINCREMENT, update_date -text NOT NULL, update_product text NOT NULL, update_target text NOT NULL, update_link text, file_hash text NOT NULL ); """ -sql_main_table_hash_check: str = """ SELECT COUNT(*) FROM main WHERE file_hash = ? """ -sql_main_table: str = """ INSERT INTO main (log_date, file_hash, log_message) VALUES (?, ?, ?); """ -sql_updates_table: str = """ INSERT INTO updates (update_date, update_product, update_target, update_link, file_hash) -VALUES (?, ?, ?, ?, ?); """ -sql_get_updates: str = """ SELECT update_date, update_product, update_target, update_link FROM updates ORDER BY update_id DESC; """ -sql_get_updates_count: str = """ SELECT count(update_date) FROM updates WHERE update_date = ?; """ -sql_get_last_updates: str = """ SELECT update_date, update_product, update_target, update_link FROM updates WHERE update_date = ?; """ -sql_get_last_update_date: str = """ SELECT update_date FROM updates ORDER BY update_id DESC LIMIT 1; """ -sql_get_update_dates: str = """ SELECT DISTINCT update_date FROM updates; """ -sql_get_date_update: str = """ SELECT update_date, update_product, update_target, update_link FROM updates WHERE update_date = ?; """ - - -def get_config(): - global apple_file, db_file, log_file, localtime, bot_token, chat_ids - config = open('config.json', 'r') - data = json.loads(config.read()) - apple_file = data['apple_file'] - db_file = data['db_file'] - log_file = data['log_file'] - timezone = data['timezone'] - localtime = pytz.timezone(timezone) - bot_token = data['bot_token'] - chat_ids = data['chat_ids'] - - -def create_connection(file): - if not os.path.isfile(file): - logging.info(f'\'{file}\' database created.') - conn = TypeVar('conn', Connection, None) - try: - conn: Connection = sqlite3.connect(file) - except Error as error: - logging.error(str(error)) - return conn - - -def create_table(conn, sql_create_table, table_name): - with contextlib.suppress(Error): - try: - conn.cursor().execute(sql_create_table) - logging.info(f'\'{table_name}\' table created.') - except Error as error: - logging.error(str(error)) - - -def get_updates(conn, full_update): - cursor = conn.cursor() - response = requests.get(apple_file) - content = response.content - file_hash = hashlib.sha256(content).hexdigest() - available_updates = cursor.execute(sql_main_table_hash_check, [file_hash]).fetchone()[0] == 0 - if not available_updates: - logging.info('No updates available.') - else: - update_databases(conn, content, file_hash, full_update) - conn.commit() - conn.close() - - -def update_databases(conn, content, file_hash, full_update): - log_date = datetime.now(tz=localtime) - update_main_database(conn, log_date, file_hash, full_update) - update_updates_database(conn, file_hash, content, full_update) - - -def update_main_database(conn, log_date, file_hash, full_update): - cursor = conn.cursor() - if full_update: - log_message = f'First \'main\' table population - SHA256: {file_hash}.' - else: - log_message = f'\'main\' table updated - SHA256: {file_hash}.' - cursor.execute(sql_main_table, (log_date, file_hash, log_message)) - logging.info(log_message) - conn.commit() - - -def update_updates_database(conn, file_hash, content, full_update): - cursor = conn.cursor() - soup = BeautifulSoup(content, 'html.parser') - updates_table = soup.find('div', id="tableWraper").find_all('tr') - recent_updates = formatted_content(updates_table) - if full_update: - log_message = f'First \'updates\' table population - SHA256: {file_hash}.' - else: - log_message = f'\'updates\' table updated - SHA256: {file_hash}.' - old_updates = cursor.execute(sql_get_updates).fetchall() - for element in old_updates: - recent_updates.remove(element) - for element in recent_updates: - cursor.execute(sql_updates_table, (element[0], element[1], element[2], element[3], file_hash)) - logging.info(log_message) - conn.commit() - apprise_notification(conn, recent_updates, full_update) - - -def formatted_content(content): - content_list = [] - for i, row in enumerate(content): - if i == 0: - continue - columns = row.find_all('td') - date_str = columns[2].get_text().strip().replace('\xa0', ' ') - update_date = check_date(date_str) - update_product = columns[0].get_text().strip().replace( - 'Esta actualización no tiene ninguna entrada de CVE publicada.', '').replace('\xa0', ' ').replace('\n', '') - update_target = columns[1].get_text().replace('\xa0', ' ').replace('\n', '') - try: - update_link = columns[0].find('a')['href'] - except Exception: - update_link = None - list_element = [update_date, update_product, update_target, update_link] - content_list.append(list_element) - content_list.reverse() - return content_list - - -def check_date(date_str): - pattern = r'(\d{1,2}) de (\w+) de (\d{4})' - try: - match = re.match(pattern, date_str) - day = int(match[1]) - month = match[2] - year = int(match[3]) - month_list = ['enero', 'febrero', 'marzo', 'abril', 'mayo', 'junio', 'julio', 'agosto', 'septiembre', 'octubre', - 'noviembre', 'diciembre'] - month_num = month_list.index(month) + 1 - return datetime(year, month_num, day) - except Exception: - return date_str - - -def apprise_notification(conn, updates, full_update): - apprise_object = Apprise() - apprise_message = build_message(conn, updates, full_update) - for chat_id in chat_ids: - apprise_syntax = f'tgram://{bot_token}/{chat_id}/?format=markdown' - apprise_object.add(apprise_syntax, tag='telegram') - apprise_object.notify(apprise_message, tag="telegram") - - -def build_message(conn, last_updates, full_update): - max_updates = 5 - cursor = conn.cursor() - if full_update: - last_updates = [] - update_dates = cursor.execute(sql_get_update_dates).fetchall() - for element in reversed(update_dates): - if max_updates >= 0: - query = cursor.execute(sql_get_date_update, element).fetchall() - query_count = cursor.execute(sql_get_updates_count, element).fetchone()[0] - last_updates += query - max_updates -= query_count - apprise_message = '*Últimas actualizaciones de Apple.*\n\n' - else: - apprise_message = '*Nuevas actualizaciones de Apple.*\n\n' - for element in reversed(last_updates): - if element[0] == 'Preinstalado': - date_time = element[0] - else: - date = datetime.strptime(str(element[0]), "%Y-%m-%d %H:%M:%S") - date_time = date.strftime("%d/%m/%Y") - apprise_message += f'_{date_time}_' - if element[3] is not None: - apprise_message += f' - [{element[1]}]({element[3]})' - else: - apprise_message += f' - _{element[1]}_' - apprise_message += f' - {element[2]}\n' - return apprise_message - - -def main(): - get_config() - - # logging - log_format = '%(asctime)s -- %(message)s' - logging.basicConfig(filename=log_file, encoding='utf-8', format=log_format, level=logging.INFO) - - # create a database connection - conn: Connection = create_connection(db_file) - cursor = conn.cursor() - empty_database = cursor.execute(sql_check_empty_database).fetchone()[0] == 0 - if empty_database: - # create database tables and populate them - create_table(conn, sql_create_main_table, 'main') - create_table(conn, sql_create_updates_table, 'updates') - - # run first database update - get_updates(conn, full_update=True) - - # Schedule the function to run every hour - schedule.every().hour.do(get_updates, conn=conn, full_update=False) - - # Run the scheduler continuously - while True: - schedule.run_pending() - time.sleep(1) - - -if __name__ == '__main__': - main() \ No newline at end of file