Skip to content

Commit

Permalink
CLI: better parallel processing (#39)
Browse files Browse the repository at this point in the history
* better CLI parallelism

* simplify syntax

* add tests
  • Loading branch information
adbar authored May 31, 2023
1 parent d4ec4de commit 6cb411a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 31 deletions.
74 changes: 44 additions & 30 deletions courlan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import argparse
import sys

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, List, Optional, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import islice
from typing import Any, Iterator, List, Optional, Tuple

from .core import check_url, sample_urls

Expand Down Expand Up @@ -76,45 +77,58 @@ def parse_args(args: Any) -> Any:
return argsparser.parse_args()


def _cli_check_url(
url: str,
def _cli_check_urls(
urls: List[str],
strict: bool = False,
with_redirects: bool = False,
language: Optional[str] = None,
with_nav: bool = False,
) -> Tuple[bool, str]:
) -> List[Tuple[bool, str]]:
"Internal function to be used with CLI multiprocessing."
result = check_url(
url,
strict=strict,
with_redirects=with_redirects,
language=language,
with_nav=with_nav,
)
if result is not None:
return (True, result[0])
return (False, url)
results = []
for url in urls:
result = check_url(
url,
strict=strict,
with_redirects=with_redirects,
language=language,
with_nav=with_nav,
)
if result is not None:
results.append((True, result[0]))
else:
results.append((False, url))
return results


def _get_line_batches(filename: str, size: int = 1000) -> Iterator[List[str]]:
"Iterate over a file and returns series of line batches."
with open(filename, "r", encoding="utf-8", errors="ignore") as inputfh:
while True:
line_batch = list(islice(inputfh, size))
if not line_batch:
break
yield line_batch


def process_args(args: Any) -> None:
"""Start processing according to the arguments"""
if not args.sample:
with ThreadPoolExecutor(max_workers=args.parallel) as executor:
with open(
args.inputfile, "r", encoding="utf-8", errors="ignore"
) as inputfh, open(args.outputfile, "w", encoding="utf-8") as outputfh:
futures = (
executor.submit(
_cli_check_url,
line,
strict=args.strict,
with_redirects=args.redirects,
language=args.language,
)
for line in inputfh
with ProcessPoolExecutor(max_workers=args.parallel) as executor, open(
args.outputfile, "w", encoding="utf-8"
) as outputfh:
futures = (
executor.submit(
_cli_check_urls,
batch,
strict=args.strict,
with_redirects=args.redirects,
language=args.language,
)
for future in as_completed(futures):
valid, url = future.result()
for batch in _get_line_batches(args.inputfile)
)
for future in as_completed(futures):
for valid, url in future.result():
if valid:
outputfh.write(url + "\n")
# proceed with discarded URLs. to be rewritten
Expand Down
10 changes: 9 additions & 1 deletion tests/unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,12 @@ def test_cli():
assert args.parallel == 2
assert os.system("courlan --help") == 0 # exit status

# _cli_check_urls
assert cli._cli_check_urls(["123", "https://example.org"]) == [
(False, "123"),
(True, "https://example.org"),
]

# testfile
inputfile = os.path.join(RESOURCES_DIR, "input.txt")
os_handle, temp_outputfile = tempfile.mkstemp(suffix=".txt", text=True)
Expand All @@ -882,7 +888,7 @@ def test_cli():
# test for Windows and the rest
assert (
subprocess.run(
[courlan_bin, "-i", inputfile, "-o", temp_outputfile], env=env
[courlan_bin, "-i", inputfile, "-o", temp_outputfile, "-p", "1"], env=env
).returncode
== 0
)
Expand All @@ -901,6 +907,8 @@ def test_cli():
"--language",
"en",
"--strict",
"-p",
"1",
]
f = io.StringIO()
with patch.object(sys, "argv", testargs):
Expand Down

0 comments on commit 6cb411a

Please sign in to comment.