Skip to content

Commit

Permalink
lsx: add -t/--mtime option
Browse files Browse the repository at this point in the history
  • Loading branch information
mk-fg committed Oct 26, 2024
1 parent 805f42f commit a747c2c
Showing 1 changed file with 150 additions and 5 deletions.
155 changes: 150 additions & 5 deletions lsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import os, sys, stat, re, pathlib as pl, collections as cs

p_err = lambda *a,**kw: print('ERROR:', *a, **kw, file=sys.stderr, flush=True)

class adict(dict):
def __init__(self, *args, **kws):
super().__init__(*args, **kws)
self.__dict__ = self


def list_adjacent(paths, specs, files_only=False):
p_dirs, stats, p_out = dict(), dict(), dict()
Expand Down Expand Up @@ -72,6 +77,107 @@ def list_adjacent(paths, specs, files_only=False):
return list(ps_print.keys())


def mtime_parse(mtimes, err_func):
import datetime as dt
ts_now = dt.datetime.now()
_td_days = dict(
y=365.25, yr=365.25, year=365.25,
mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
m=60, min=60, minute=60, s=1, sec=1, second=1 )
_td_usort = lambda d: sorted(
d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')
def _ts_parse(ts_str, rel=False):
ts = (ts_str := ts_str.strip()).lower() == 'now' and dt.timedelta(0)
no_ts = lambda: ts in [None, False]
if no_ts():
try: ts = dt.timedelta(seconds=float(ts_str))
except: pass
if no_ts() and ( # short time offset like "3d 5h"
(m := _td_re.search(ts_str)) and any(m.groups()) ):
delta = list()
for units in _td_days, _td_s:
val = 0
for k, v in units.items():
if not m.group(k): continue
val += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
delta.append(val)
ts = dt.timedelta(*delta)
if no_ts() and (m := re.search( # common BE format
r'^(?P<date>(?:\d{2}|(?P<Y>\d{4}))-\d{2}-\d{2})'
r'(?:[ T](?P<time>\d{2}(?::\d{2}(?::\d{2})?)?)?)?$', ts_str )):
tpl = 'y' if not m.group('Y') else 'Y'
tpl, tss = f'%{tpl}-%m-%d', m.group('date')
if m.group('time'):
tpl_time = ['%H', '%M', '%S']
tss += ' ' + ':'.join(tss_time := m.group('time').split(':'))
tpl += ' ' + ':'.join(tpl_time[:len(tss_time)])
try: ts = dt.datetime.strptime(tss, tpl)
except ValueError: pass
if no_ts() and (m := re.search( # just time without AM/PM - treat as 24h format
r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', ts_str )):
us, tpl = 0, ':'.join(['%H', '%M', '%S'][:len(ts_str.split(':'))])
if m.group('us'):
ts_str, us = ts_str.rsplit('.', 1)
us = us[:6] + '0'*max(0, 6 - len(us))
try: ts = dt.datetime.strptime(ts_str, tpl)
except ValueError: pass
else:
ts = ts_now.replace( hour=ts.hour,
minute=ts.minute, second=ts.second, microsecond=int(us) )
if ts > ts_now: ts -= dt.timedelta(1)
if no_ts(): # coreutils' "date" parses everything, but is more expensive to use
import subprocess as sp
while True:
res = sp.run( ['date', '+%s', '-d', ts_str],
stdout=sp.PIPE, stderr=sp.DEVNULL )
if res.returncode:
if ',' in ts_str: ts_str = ts_str.replace(',', ' '); continue
else:
ts = dt.datetime.fromtimestamp(int(res.stdout.strip()))
if 0 < (ts - ts_now).total_seconds() <= 24*3600 and re.search(
r'(?i)^[\d:]+\s*(am|pm)?\s*([-+][\d:]+|\w+|\w+[-+][\d:]+)?$', ts_str.strip() ):
ts -= dt.timedelta(1)
break
if no_ts(): err_func(f'Failed to parse -t/--mtime spec: {ts_str}')
if not rel and isinstance(ts, dt.timedelta): ts = ts_now - ts
return ts

if (ranges := any('/' in td for td in mtimes)) and not all('/' in td for td in mtimes):
err_func( '-t/--mtime values must either be'
' all deltas or time-ranges, cannot be a mix of both' )
for n, ts_str in enumerate(mtimes):
if ranges:
a, s, b = ts_str.partition('/')
mtimes[n] = sorted(_ts_parse(ts).timestamp() for ts in [a, b])
else:
if not isinstance(td := _ts_parse(ts_str, rel=True), dt.timedelta):
err_func(f'Absolute -t/--mtime specs can only be used in A/B ranges: {ts_str}')
mtimes[n] = td.total_seconds()
return adict(mtimes=mtimes, vicinity=not ranges)

def mtime_list(paths, td):
ps = list()
if td.vicinity:
td = max(td.mtimes)
for p in paths:
try: ps.append((ts := p.stat().st_mtime, p))
except OSError: p_err(f'path inaccessible [ {p} ]'); continue
for p2 in p.parent.iterdir():
try: ts2 = p2.stat().st_mtime
except OSError: continue
if abs(ts - ts2) <= td: ps.append((ts2, p2))
else:
for p in paths:
try: ts = p.stat().st_mtime
except OSError: continue
for a, b in td.mtimes:
if a <= ts <= b: ps.append((ts, p))
return list(p for ts, p in sorted(ps))


def main(argv=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
Expand All @@ -84,15 +190,19 @@ def main(argv=None):
parser.add_argument('paths', nargs='*', help=dd('''
File/dir path arguments for various options below.
If no other options are provided, tool will simply list all existing
path(s), printing errors on stderr for those that can't be accessed.'''))
path(s), printing errors on stderr for those that can't be accessed.
Unlike "ls" tool, directory contents aren't listed
recursively, unless -r/--recursive option is used.'''))

parser.add_argument('-f', '--files', action='store_true',
help='Print only files, not dirs or any other stuff.')
parser.add_argument('-0', '--null', action='store_true',
help='Print null-terminated list, instead of default newline-terminated.')
parser.add_argument('-r', '--recursive', action='store_true',
help='Recurse into specified directories, if other options support that.')

parser.add_argument('-a', '--adjacent', metavar='opts', action='append', help=dd('''
Show specified file/dir path, and also N files/dirs adjacent to it.
List specified file/dir path, and also N files/dirs adjacent to it.
"Adjacent" as in located within same directory, and sorted right before/after it.
Requires a parameter, consisting of optional numbers/letters:
number (digits) - count of items (default=10) to display before/after path(s),
Expand All @@ -108,6 +218,22 @@ def main(argv=None):
in which case they will be returned sorted by the first
parameter that's explicitly specified (or default alpha-sorted otherwise).'''))

parser.add_argument('-t', '--mtime',
metavar='delta-or-range', action='append', help=dd('''
List files/dirs that have modification time within specified range or vicinity.
Either time delta or time range can be specified with this option:
delta examples: 1h30m, 1d, 2w 3d 4h, 4:10:20, 55s, 55, ...
range examples: 5d/8d, 1d/now, 2024-10-20 4:10:20 / 2mo, 1:00/6:00, ...
If delta (relative time) is specified, files/dirs within that vicinity from
other file/dir command-line arguments are listed (and at least one is required).
If time range is specified (two absolute or relative timestamps in any order),
then either files/dirs that have mtime within that range are listed, either in the
current dir, or among/within specified file/dir arguments (see -r/--recursive option).
Multiple time ranges can be specified.
Results are ordered by mtime, pipe through "sort" for alphabetic ordering.
Works somewhat similar to "find" tool with its -mtime option,
but mostly intended to easily list files created/modified around the same time.'''))

opts = parser.parse_args(sys.argv[1:] if argv is None else argv)

pp_first, pp_nullsep = True, opts.null
Expand All @@ -120,13 +246,32 @@ def main(argv=None):
else: pp_first = False
sys.stdout.write(p)

paths = list(pl.Path(p) for p in opts.paths)
no_opts, paths = True, list(pl.Path(p) for p in opts.paths)

if opts.adjacent:
for p in list_adjacent(paths, opts.adjacent, opts.files): _pp(p)
else: # no options - print filtered input paths back
no_opts = False

if opts.mtime:
no_opts, td = False, mtime_parse(opts.mtime, parser.error)
if opts.recursive and not paths: paths = [pl.Path('.')]
if td.vicinity:
for p in mtime_list(paths, td): _pp(p)
else:
ps = list()
for p in paths:
if not p.exists(): p_err(f'path inaccessible [ {p} ]'); continue
if opts.recursive and p.is_dir(): ps.extend(p.iterdir())
elif not opts.files or p.is_file(): ps.append(p)
for p in mtime_list(ps, td): _pp(p)

if no_opts: # no options - print filtered input paths back
if opts.recursive and not paths: paths = [pl.Path('.')]
for p in paths:
if not p.exists(): p_err(f'path inaccessible [ {p} ]'); continue
if not opts.files or p.is_file(): _pp(p)
if opts.recursive and p.is_dir():
for p2 in p.iterdir(): _pp(p2)
elif not opts.files or p.is_file(): _pp(p)

if __name__ == '__main__':
try: sys.exit(main())
Expand Down

0 comments on commit a747c2c

Please sign in to comment.