From d4ec4de51b463cc9aacac354e4d58609d7aff925 Mon Sep 17 00:00:00 2001 From: Adrien Barbaresi Date: Tue, 30 May 2023 17:19:16 +0200 Subject: [PATCH] CLI: add partial support for parallel processing (#37) * CLI: add partial support for parallel processing * add test --- courlan/cli.py | 68 ++++++++++++++++++++++++++++++++++----------- tests/unit_tests.py | 3 ++ 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/courlan/cli.py b/courlan/cli.py index 8eed55a8..b956cebd 100644 --- a/courlan/cli.py +++ b/courlan/cli.py @@ -8,7 +8,8 @@ import argparse import sys -from typing import Any, List +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, List, Optional, Tuple from .core import check_url, sample_urls @@ -42,6 +43,13 @@ def parse_args(args: Any) -> Any: group1.add_argument( "-v", "--verbose", help="increase output verbosity", action="store_true" ) + group1.add_argument( + "-p", + "--parallel", + help="number of parallel threads (not used for sampling)", + type=int, + default=4, + ) group2 = argsparser.add_argument_group("Filtering", "Configure URL filters") group2.add_argument( "--strict", help="perform more restrictive tests", action="store_true" @@ -68,25 +76,53 @@ def parse_args(args: Any) -> Any: return argsparser.parse_args() +def _cli_check_url( + url: str, + strict: bool = False, + with_redirects: bool = False, + language: Optional[str] = None, + with_nav: bool = False, +) -> Tuple[bool, str]: + "Internal function to be used with CLI multiprocessing." + result = check_url( + url, + strict=strict, + with_redirects=with_redirects, + language=language, + with_nav=with_nav, + ) + if result is not None: + return (True, result[0]) + return (False, url) + + def process_args(args: Any) -> None: """Start processing according to the arguments""" if not args.sample: - with open( - args.inputfile, "r", encoding="utf-8", errors="ignore" - ) as inputfh, open(args.outputfile, "w", encoding="utf-8") as outputfh: - for line in inputfh: - result = check_url( - line, - strict=args.strict, - with_redirects=args.redirects, - language=args.language, + with ThreadPoolExecutor(max_workers=args.parallel) as executor: + with open( + args.inputfile, "r", encoding="utf-8", errors="ignore" + ) as inputfh, open(args.outputfile, "w", encoding="utf-8") as outputfh: + futures = ( + executor.submit( + _cli_check_url, + line, + strict=args.strict, + with_redirects=args.redirects, + language=args.language, + ) + for line in inputfh ) - if result is not None: - outputfh.write(result[0] + "\n") - # proceed with discarded URLs. to be rewritten - elif args.discardedfile is not None: - with open(args.discardedfile, "a", encoding="utf-8") as discardfh: - discardfh.write(line) + for future in as_completed(futures): + valid, url = future.result() + if valid: + outputfh.write(url + "\n") + # proceed with discarded URLs. to be rewritten + elif args.discardedfile is not None: + with open( + args.discardedfile, "a", encoding="utf-8" + ) as discardfh: + discardfh.write(url) else: urllist: List[str] = [] with open(args.inputfile, "r", encoding="utf-8", errors="ignore") as inputfh: diff --git a/tests/unit_tests.py b/tests/unit_tests.py index 59fa3058..26c7cce2 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -856,6 +856,8 @@ def test_cli(): "-v", "--language", "en", + "--parallel", + "2", ] with patch.object(sys, "argv", testargs): args = cli.parse_args(testargs) @@ -864,6 +866,7 @@ def test_cli(): assert args.outputfile == "output.txt" assert args.verbose is True assert args.language == "en" + assert args.parallel == 2 assert os.system("courlan --help") == 0 # exit status # testfile