-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
org export: massive speedup for checking
- Loading branch information
Showing
3 changed files
with
118 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,77 +1,133 @@ | ||
from concurrent.futures import ProcessPoolExecutor | ||
from __future__ import annotations | ||
|
||
from dataclasses import dataclass | ||
import os | ||
from pathlib import Path | ||
from subprocess import run | ||
from typing import Iterator, List | ||
import re | ||
from typing import Iterator, Optional | ||
|
||
|
||
import orgparse | ||
|
||
|
||
# TODO !! implement a test for this (all of params) | ||
def search(*args) -> Iterator[Exception]: | ||
res = run(['rg', '--follow', '-i', *args]) | ||
if res.returncode == 1: | ||
return # ok, nothin is found | ||
else: | ||
yield RuntimeError(res) | ||
@dataclass | ||
class Config: | ||
f_checks: list[str] # exact matches | ||
word_checks: list[str] | ||
tag_checks: set[str] | ||
|
||
|
||
def check_one(path: Path) -> Iterator[Exception]: | ||
if 'CI' not in os.environ: | ||
from checks import F_CHECKS, WORD_CHECKS, TAG_CHECKS | ||
def get_config() -> Config: | ||
if 'CI' in os.environ: | ||
return Config(f_checks=[], word_checks=[], tag_checks=set()) | ||
else: | ||
F_CHECKS = [] | ||
WORD_CHECKS = [] | ||
TAG_CHECKS = set() | ||
|
||
for x in F_CHECKS: | ||
# TODO might be too many calls, maybe do it in a single regex? | ||
yield from search( | ||
'-F', | ||
x, | ||
path, | ||
) | ||
for x in WORD_CHECKS: | ||
yield from search( | ||
'--word-regexp', | ||
x, | ||
path, | ||
) | ||
|
||
# TODO not sure if should rely on a unit test? | ||
from checks import F_CHECKS, WORD_CHECKS, TAG_CHECKS | ||
|
||
return Config(f_checks=F_CHECKS, word_checks=WORD_CHECKS, tag_checks=TAG_CHECKS) | ||
|
||
|
||
def check_one(path: Path, *, cfg: Optional[Config] = None) -> Iterator[Exception]: | ||
if cfg is None: | ||
cfg = get_config() | ||
|
||
text = path.read_text() | ||
|
||
## find exact matches | ||
if len(cfg.f_checks) > 0: | ||
rgx = '|'.join(re.escape(x) for x in cfg.f_checks) | ||
m = re.search(rgx, text) | ||
if m is not None: | ||
yield RuntimeError('found occurence', rgx, m.group()) | ||
## | ||
|
||
## find 'words' | ||
if len(cfg.word_checks) > 0: | ||
rgx = r'\b(' + '|'.join(re.escape(x) for x in cfg.word_checks) + r')\b' | ||
m = re.search(rgx, text) | ||
if m is not None: | ||
yield RuntimeError('found occurence', rgx, m.group()) | ||
## | ||
|
||
### detect timestamps with time | ||
ts = orgparse.date.TIMESTAMP_RE | ||
for line in path.read_text().splitlines(): | ||
m = ts.search(line) | ||
allowed = { | ||
'inactive_year', | ||
'inactive_month', | ||
'inactive_day', | ||
} | ||
pos = 0 | ||
while pos < len(text): | ||
m = ts.search(text, pos=pos) | ||
if m is None: | ||
continue | ||
allowed = { | ||
'inactive_year', | ||
'inactive_month', | ||
'inactive_day', | ||
break | ||
d = { | ||
k: v | ||
for k, v in m.groupdict().items() | ||
if v is not None and k not in allowed | ||
} | ||
d = {k: v for k, v in m.groupdict().items() if v is not None and k not in allowed} | ||
if len(d) != 0: | ||
yield RuntimeError(d, line) | ||
yield RuntimeError(d, m.group()) | ||
pos = m.start() + 1 | ||
### | ||
|
||
o = orgparse.loads(path.read_text()) | ||
### find forbidden tags | ||
o = orgparse.loads(text) | ||
for n in o: | ||
found = n.tags.intersection(TAG_CHECKS) | ||
found = n.tags.intersection(cfg.tag_checks) | ||
if len(found) > 0: | ||
yield RuntimeError(path, n.heading, found) | ||
### | ||
|
||
|
||
def test_checks(tmp_path: Path) -> None: | ||
cfg = Config( | ||
f_checks=[ | ||
'web.telegram.org', | ||
'mail.google.com', | ||
], | ||
word_checks=['some', 'noth'], | ||
tag_checks=set(), | ||
) | ||
|
||
def do(text: str) -> list[Exception]: | ||
p = tmp_path / 'file.org' | ||
p.write_text(text) | ||
return list(check_one(p, cfg=cfg)) | ||
|
||
assert len(do(''' | ||
* nothing | ||
** [2019-11-02 Sat] wrong | ||
with | ||
* this file :sometag: | ||
''')) == 0 | ||
|
||
assert len(do(''' | ||
somethings | ||
whoopsweb.telegram.org/accidental link | ||
more text | ||
''')) == 1 | ||
|
||
def _check_one(path: Path) -> List[str]: | ||
# helper for multiprocessing.. | ||
return list(map(str, check_one(path))) | ||
assert len(do(''' | ||
somethings | ||
oh | ||
mail.google.com/somelink | ||
more text | ||
''')) == 1 | ||
|
||
assert len(do(''' | ||
* bad link in body [2019-10-17 Thu] | ||
alala mail.google.com/whatever | ||
* but also bad timestamp | ||
[2019-10-18 Fri 02:06] | ||
''')) == 2 | ||
|
||
def check_all(path: Path) -> None: | ||
# TODO not sure about org? | ||
org_files = sorted(path.rglob('*.org')) | ||
assert len(do(''' | ||
* I am ok | ||
* I contain some forbidden words | ||
''')) == 1 | ||
|
||
with ProcessPoolExecutor() as pool: | ||
for f, res in zip(org_files, pool.map(_check_one, org_files)): | ||
for x in res: | ||
# TODO collect errors, report once? | ||
raise RuntimeError(x) | ||
assert len(do(''' | ||
* I end with forbidden noth | ||
* but I am ok | ||
''')) == 1 |