forked from mk-fg/fgtk
Commit
Showing 2 changed files with 166 additions and 0 deletions.
#!/usr/bin/env python3

import os, sys, socket, re, unicodedata, pathlib as pl
import time, datetime as dt, subprocess as sp
import urllib.request as ul, urllib.error as ule, urllib.parse as ulp

import feedparser as fp


# Print to stderr and return 1, usable as an error exit code from main()
p_err = lambda *a,**k: print(*a, file=sys.stderr, **k) or 1

# dict subclass with attribute access to keys (e.g. conf.version)
class adict(dict):
	def __init__(self, *args, **kwargs):
		super().__init__(*args, **kwargs)
		self.__dict__ = self

# NFKC-normalize and casefold a string for case/unicode-insensitive comparisons
cc = lambda v: unicodedata.normalize('NFKC', v).casefold()

conf = adict( version='1.0', socket_timeout=20.0,
	feed_user_agent='rss-get/{ver} (github.com/mk-fg/fgtk#rss-get) feedparser/{ver_fp}' )


def main(args=None):
	import argparse
	parser = argparse.ArgumentParser(
		description='Parse feed at the URL and print/download enclosure links from it.'
			' Prints item info by default, unless -n/--num and/or -p/--print-urls opts are specified.'
			' Downloads to the current dir by passing'
			' a filelist to aria2c with some parallelization options.')
	parser.add_argument('url', help='URL or file path to grab the feed from.')
	parser.add_argument('-f', '--file-type', metavar='ext',
		help='Enclosure file extension or content-type to match in a case-insensitive manner.'
			' Extension gets picked from the last slug of the enclosure URLs.'
			' Examples: audio/mp3, audio/mpeg, mp3, ogg')
	parser.add_argument('-o', '--out', metavar='file',
		help='Cache fetched feed to the specified file and parse it from there.')
	parser.add_argument('-n', '--num', metavar='n[-m]',
		help='Item(s) to print/download. Downloads them by default.'
			' Can be either an integer item number (1 is latest), or N-M for first-last (inclusive).')
	parser.add_argument('-p', '--print-urls', action='store_true',
		help='Print all or selected (by -n/--num) item URLs (one per line) instead of downloading.')
	parser.add_argument('-t', '--ts-names', action='store_true',
		help='Use timestamp-derived names in "YY-mm-dd.HHMM.N.ext" format, if possible.')
	opts = parser.parse_args(sys.argv[1:] if args is None else args)

	a = b = None
	if opts.num:
		m = re.search(r'^(\d+)(?:-(\d+))?$', opts.num)
		if not m: parser.error('Invalid format for -n/--num option, should be N[-M]')
		a, b = int(m.group(1)) or 1, int(m.group(2) or 0)
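	# e.g. "-n 3" selects only item 3 (a=3, b=0), "-n 3-10" selects items 3..10 inclusive (a=3, b=10)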

	feed_url = opts.url
	feed_ua = conf.feed_user_agent.format(ver=conf.version, ver_fp=fp.__version__)
	if conf.socket_timeout is not None: socket.setdefaulttimeout(conf.socket_timeout)

	if opts.out: # fetch via urllib and replace feed_url
		feed_cache = pl.Path(opts.out).resolve()
		if not re.search(r'^[\w\d]+:', feed_url) and pl.Path(feed_url).resolve() == feed_cache:
			feed_cache = None # same path as url argument
		if feed_cache:
			status = err = body = None
			try:
				req = ul.Request(feed_url, headers={'User-Agent': feed_ua})
				with ul.urlopen(req) as req: status, err, body = req.getcode(), req.reason, req.read()
			except ule.URLError as err_net: status, err = 1000, err_net # 1000 = local sentinel for net errors
			if status >= 300:
				if body and len(body) < 250: err = repr(body.decode('utf-8', 'backslashreplace'))
				return p_err(f'Failed to cache feed (status={status}): {opts.url!r} - {err}')
			feed_cache.write_bytes(body)
			feed_url = str(feed_cache) # str, not Path - checked against a scheme-prefix regexp below

	feed = fp.parse(feed_url, agent=feed_ua)
	file_queue = list()
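	# feedparser only sets 'status' for http(s) fetches and flags parse problems via 'bozo',
	#  so a feed read from a local file (no status) needs the explicit check below
	#  to avoid being treated as a failed fetch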
	status, bozo, bozo_err = (
		feed.get(k) for k in ['status', 'bozo', 'bozo_exception'] )
	fetch_fail = (not status and bozo) or (status or 1000) >= 400
	if not fetch_fail:
		if not feed.entries:
			fetch_fail = bozo = True
			bozo_err = 'No feed entries'
	elif status is None and not re.search(r'^[\w\d]+:', feed_url): fetch_fail = False # from file
	if fetch_fail:
		if bozo and not bozo_err: bozo_err = f'[no error msg (bozo={bozo})]'
		return p_err(f'Failed to fetch feed (status={status}): {opts.url!r} - {bozo_err}')

	ext_filter = opts.file_type
	if ext_filter: ext_filter = cc(ext_filter.strip())

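	# Entry numbering matches -n/--num: n=1 is the first (typically newest) entry,
	#  and the loop below walks entries in reverse, oldest-first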
	entries, file_meta = list(enumerate(feed.entries, 1)), dict()
	for n, e in reversed(entries):

		enc_list = list()
		for enc in sorted(e.enclosures or list(), key=lambda enc: enc.get('type') or '-'):
			enc_href = enc.get('href')
			if not enc_href: continue
			if ext_filter:
				if '/' in ext_filter: # content-type match, e.g. audio/mpeg
					enc_mime = enc.get('type')
					if enc_mime and cc(enc_mime.split(';', 1)[0].strip()) != ext_filter: continue
				else: # extension match, only enforced when both strings look like extensions (<5 chars)
					enc_ext = enc_href.rsplit('.', 1)[-1]
					if ( len(ext_filter) < 5 and len(enc_ext) < 5
						and ext_filter != cc(enc_ext) ): continue
			enc_list.append(enc_href)
		if not enc_list: continue

		for k in 'published created modified'.split():
			if not e.get(k): continue
			ts, ts_str = e[f'{k}_parsed'], e[k]
			if ts: # not sure if timezone is discarded here, shouldn't matter
				ts = dt.datetime.fromtimestamp(time.mktime(ts))
			break
		else: ts = ts_str = None

		if a: # -n/--num specified - print URLs or queue downloads for selected item(s) only
			if n < a: continue
			if b:
				if n > b: continue
			elif n != a: continue

			url = enc_list[0]
			if opts.print_urls: print(url)
			else:
				file_queue.append(url)
				file_meta[url] = adict(ts=ts, n=n)

		else: # no -n/--num - print item info
			print(f'--- [{n: 2d}] {e.title}')
			print(f' [{ts_str}] {e.link}')
			for enc in enc_list: print(f' {enc}')
			print()

	if not file_queue: return

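	# Build aria2c --input-file contents: one URL per line, optionally followed by
	#  an indented " out=<filename>" option line when -t/--ts-names is in effect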
	if not opts.ts_names:
		file_list = ''.join(f'{url}\n' for url in file_queue)
	else:
		file_list = list()
		for url in file_queue:
			info = file_meta[url]
			file_list.append(f'{url}\n')
			if info.ts:
				ext = os.path.basename(ulp.urlparse(url).path).rsplit('.')[-1]
				p = info.ts.strftime(f'%y-%m-%d.%H%M.{info.n:03d}.{ext}')
				file_list.append(f' out={p}\n')
		file_list = ''.join(file_list)

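	# aria2c options: -s8 = split each download into up to 8 connections,
	#  -x8 = at most 8 connections per server, -k 2M = 2M min split size,
	#  --input-file - = read the download list from stdin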
	sp.run(
		['aria2c', '-s8', '-x8', '-k', '2M', '--user-agent', feed_ua, '--input-file', '-'],
		input=file_list.encode(), check=True )

if __name__ == '__main__': sys.exit(main())
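For reference, a rough sketch of how the script above can be exercised, e.g. by calling main() with an argument list from a REPL. The feed URL is only a placeholder (not anything from this commit), and aria2c has to be installed for the download mode to work:

    # Print info for every item that has enclosures (default mode)
    main(['https://example.com/feed.xml'])
    # Print mp3 enclosure URLs for the 10 latest items instead of downloading them
    main(['-p', '-n', '1-10', '-f', 'mp3', 'https://example.com/feed.xml'])
    # Download the 3 latest items via aria2c, using timestamp-derived filenames
    main(['-n', '1-3', '-t', 'https://example.com/feed.xml'])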