@simplebot.hookimpl
def deltabot_init(bot: DeltaBot) -> None:
    """Open the plugin database and register the default settings.

    Rebinds the module-level ``db`` to the manager returned by ``_get_db``
    and then hands each default to ``_getdefault``, which only stores a
    value when the bot has none for that key yet.
    """
    global db
    db = _get_db(bot)

    # 'delay' is later used as the sleep interval between feed checks;
    # a negative 'max_feed_count' never satisfies the limit test in feed_sub.
    defaults = (
        ('delay', 60*5),
        ('max_feed_count', -1),
    )
    for key, fallback in defaults:
        _getdefault(bot, key, fallback)
@simplebot.command
def feed_sub(bot: DeltaBot, payload: str, message: Message, replies: Replies) -> None:
    """Subscribe current chat to the given feed.
    """
    # NOTE(review): the docstring above is kept verbatim; presumably the bot
    # framework surfaces it as the command's help text -- confirm.
    url = db.normalize_url(payload)
    feed = db.get_feed(url)

    if not feed:
        # Unknown feed: enforce the global limit before any network work.
        # A negative max_feed_count fails the 0 <= ... test, so it never
        # triggers the limit.
        max_fc = int(_getdefault(bot, 'max_feed_count'))
        if 0 <= max_fc <= len(db.get_feeds()):
            replies.add(text='Sorry, maximum number of feeds reached')
            return

    # The feed is parsed in a separate process so a stalled download can be
    # abandoned once TIMEOUT seconds have passed.
    target_url = feed['url'] if feed else url
    process = ResultProcess(target=feedparser.parse, args=(target_url,))
    process.start()
    try:
        d = process.get_result(TIMEOUT)
    except TimeoutError:
        # get_result() raises TimeoutError when no result arrives in time;
        # tell the user instead of letting the command die silently.
        replies.add(text='Timed out fetching feed: {}'.format(url))
        bot.logger.warning('Timed out fetching feed %s', url)
        return

    if not feed:
        # First subscription to this URL: validate the parse result and
        # register the feed.  An encoding override alone is not an error.
        bozo_exception = d.get('bozo_exception', '')
        broken = d.get('bozo') == 1 and not isinstance(
            bozo_exception, CharacterEncodingOverride)
        if broken or not d.entries:
            replies.add(text='Invalid feed url: {}'.format(url))
            bot.logger.warning('Invalid feed %s: %s', url, bozo_exception)
            return
        feed = dict(
            url=url,
            etag=d.get('etag'),
            modified=d.get('modified') or d.get('updated'),
            latest=get_latest_date(d.entries),
        )
        db.add_feed(url, feed['etag'], feed['modified'], feed['latest'])

    if message.chat.is_group():
        chat = message.chat
    else:
        # Outside a group, a new group named after the feed is created with
        # the sender as its member, and that group gets subscribed.
        chat = bot.create_group(
            d.feed.get('title') or url, [message.get_sender_contact()])

    if chat.id in db.get_fchats(feed['url']):
        replies.add(text='Chat alredy subscribed to that feed.', chat=chat)
        return

    db.add_fchat(chat.id, feed['url'])
    title = d.feed.get('title') or '-'
    desc = d.feed.get('description') or '-'
    text = 'Title: {}\n\nURL: {}\n\nDescription: {}'.format(
        title, feed['url'], desc)

    if d.entries and feed['latest']:
        # 'latest' is stored as space-separated integers; rebuild the tuple
        # and attach up to five entries not newer than it as a preview.
        latest = tuple(map(int, feed['latest'].split()))
        html = format_entries(get_old_entries(d.entries, latest)[:5])
        replies.add(text=text, html=html, chat=chat)
    else:
        replies.add(text=text, chat=chat)
def _entry_date(entry):
    """Return the entry's ``published_parsed`` value, else ``updated_parsed``."""
    return entry.get('published_parsed') or entry.get('updated_parsed')


def format_entries(entries: list) -> str:
    """Render feed entries as a single HTML fragment.

    Each entry becomes a linked ``<h3>`` title, an optional publication
    date line and a body taken from ``description`` or, when that is empty,
    from the concatenated ``text/html`` items of ``content``.  Entries are
    joined with ``<br><hr>``; an empty list yields an empty string.
    """
    # Local import so the block brings its own dependency into scope.
    from html import escape

    entries_text = []
    for e in entries:
        # The link comes from the remote feed and lands inside a quoted
        # attribute, so quotes and ampersands must be escaped.
        link = escape(e.get('link') or '', quote=True)
        t = '<a href="{}"><h3>{}</h3></a>'.format(
            link, e.get('title') or 'NO TITLE')
        pub_date = e.get('published')
        if pub_date:
            t += '<p>📆 <small><em>{}</em></small></p>'.format(pub_date)
        desc = e.get('description') or ''
        if not desc and e.get('content'):
            desc = ''.join(
                c['value'] for c in e.get('content')
                if c.get('type') == 'text/html')
        # Skip bodies that merely repeat the title.
        if desc and desc != e.get('title'):
            t += desc
        entries_text.append(t)
    return '<br><hr>'.join(entries_text)


def get_new_entries(entries: list, date: tuple) -> list:
    """Return the entries whose date is strictly greater than ``date``.

    Entries without any parsed date are dropped.
    """
    dated = ((e, _entry_date(e)) for e in entries)
    return [e for e, d in dated if d is not None and d > date]


def get_old_entries(entries: list, date: tuple) -> list:
    """Return the entries whose date is less than or equal to ``date``.

    Entries without any parsed date are dropped.
    """
    dated = ((e, _entry_date(e)) for e in entries)
    return [e for e, d in dated if d is not None and d <= date]


def get_latest_date(entries: list) -> Optional[str]:
    """Return the greatest entry date as space-separated fields.

    Returns ``None`` when no entry carries a parsed date.
    """
    dates = [d for d in map(_entry_date, entries) if d]
    return ' '.join(map(str, max(dates))) if dates else None
class DBManager:
    """SQLite storage for feeds and the chats subscribed to them.

    Two tables are maintained: ``feeds`` (url, etag, modified, latest) and
    ``fchats`` (gid, feed), the latter linking a chat id to a feed url.
    Methods taking a feed url pass it through :meth:`normalize_url` before
    storing or looking it up.
    """

    def __init__(self, db_path: str) -> None:
        """Open (or create) the database at ``db_path`` and ensure the schema."""
        # check_same_thread=False disables sqlite3's same-thread check so the
        # one connection can be used from more than one thread.
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self.db.row_factory = sqlite3.Row
        with self.db:
            self.db.execute(
                '''CREATE TABLE IF NOT EXISTS feeds
                (url TEXT PRIMARY KEY,
                etag TEXT,
                modified TEXT,
                latest TEXT)''')
            self.db.execute(
                '''CREATE TABLE IF NOT EXISTS fchats
                (gid INTEGER,
                feed TEXT REFERENCES feeds(url),
                PRIMARY KEY(gid, feed))''')

    def execute(self, statement: str, args=()) -> sqlite3.Cursor:
        """Run ``statement`` with ``args`` without committing."""
        return self.db.execute(statement, args)

    def commit(self, statement: str, args=()) -> sqlite3.Cursor:
        """Run ``statement`` with ``args`` inside a committed transaction."""
        with self.db:
            return self.db.execute(statement, args)

    def close(self) -> None:
        """Close the underlying connection."""
        self.db.close()

    # ==== feeds =====

    def add_feed(self, url: str, etag: Optional[str],
                 modified: Optional[str], latest: Optional[str]) -> None:
        """Insert a new feed row keyed by the normalized ``url``."""
        url = self.normalize_url(url)
        self.commit('INSERT INTO feeds VALUES (?,?,?,?)',
                    (url, etag, modified, latest))

    def remove_feed(self, url: str) -> None:
        """Delete the feed and all of its chat subscriptions atomically."""
        url = self.normalize_url(url)
        with self.db:
            self.db.execute('DELETE FROM fchats WHERE feed=?', (url,))
            self.db.execute('DELETE FROM feeds WHERE url=?', (url,))

    def update_feed(self, url: str, etag: Optional[str],
                    modified: Optional[str], latest: Optional[str]) -> None:
        """Overwrite the etag, modified and latest columns of a feed."""
        url = self.normalize_url(url)
        q = 'UPDATE feeds SET etag=?, modified=?, latest=? WHERE url=?'
        self.commit(q, (etag, modified, latest, url))

    def get_feed(self, url: str) -> Optional[sqlite3.Row]:
        """Return the feed row for ``url``, or ``None`` if it is unknown."""
        url = self.normalize_url(url)
        return self.db.execute(
            'SELECT * FROM feeds WHERE url=?', (url,)).fetchone()

    def get_feeds(self, gid: Optional[int] = None) -> List[sqlite3.Row]:
        """Return all feeds, or only those chat ``gid`` is subscribed to."""
        if gid is None:
            return self.db.execute('SELECT * FROM feeds').fetchall()
        # A single subquery keeps the statement the same size no matter how
        # many feeds the chat follows; building one "url=?" term per feed
        # runs into SQLite's bound-variable / expression-depth limits.
        q = ('SELECT * FROM feeds WHERE url IN '
             '(SELECT feed FROM fchats WHERE gid=?)')
        return self.db.execute(q, (gid,)).fetchall()

    def add_fchat(self, gid: int, url: str) -> None:
        """Subscribe chat ``gid`` to the feed at ``url``."""
        url = self.normalize_url(url)
        self.commit('INSERT INTO fchats VALUES (?,?)', (gid, url))

    def remove_fchat(self, gid: int, url: Optional[str] = None) -> None:
        """Remove one subscription of ``gid``, or all of them if no url."""
        if url:
            url = self.normalize_url(url)
            self.commit(
                'DELETE FROM fchats WHERE gid=? AND feed=?', (gid, url))
        else:
            self.commit('DELETE FROM fchats WHERE gid=?', (gid,))

    def get_fchats(self, url: str) -> List[int]:
        """Return the ids of the chats subscribed to ``url``."""
        url = self.normalize_url(url)
        rows = self.db.execute('SELECT gid FROM fchats WHERE feed=?', (url,))
        return [r[0] for r in rows]

    def normalize_url(self, url: str) -> str:
        """Prefix ``http://`` when no scheme is given; drop one trailing ``/``."""
        # Test for the full scheme prefix: a bare host that merely begins
        # with "http" (e.g. "httpbin.org/feed") still needs one.
        if not url.startswith(('http://', 'https://')):
            url = 'http://'+url
        if url.endswith('/'):
            url = url[:-1]
        return url
def get_result(self, timeout: float = None, kill: bool = True):
    """Return target result.

    Waits up to ``timeout`` seconds for an item on the result queue.  If
    none arrives, the process is killed when ``kill`` is true and
    ``TimeoutError`` is raised.  If the failure flag is set, the queued
    item is raised instead of being returned.
    """
    try:
        outcome = self._result_queue.get(timeout=timeout)
    except queue.Empty as ex:
        if kill:
            self.kill()
        raise TimeoutError("Operation timed out.") from ex

    if not self._failed.is_set():
        return outcome
    # The failure flag means the queued item is the exception to re-raise.
    raise outcome