forked from mk-fg/fgtk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hz
executable file
·90 lines (71 loc) · 2.94 KB
/
hz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
import itertools as it, operator as op, functools as ft
import os, sys, io, re
bs_start = 2**7
bs_max = 20 * 2**20
def main(args=None):
args = list(sys.argv[1:] if args is None else args)
for n, arg in enumerate(args):
if re.search(r'^-\d+$', arg): args[n] = '-n{}'.format(arg.lstrip('-'))
import argparse
parser = argparse.ArgumentParser(
description='"head", but with \\x00 delimeters.'
' Ensures that final chunk ends with delimeter by default.')
parser.add_argument('file',
nargs='?', metavar='path',
help='Path to a file to process instead of stdin.')
parser.add_argument('-n', '--count',
metavar='number', type=int,
help='Max number of entries to read. Default is to read ALL of them.')
parser.add_argument('-r', '--replace-with-newlines',
action='store_true', help='Replace zero bytes with newlines in the output.')
parser.add_argument('-z', '--replace-with-nulls',
action='store_true', help='Handle newlines in the'
' input and replace them with zero bytes in the output.'
'If --replace-with-newlines is also specified, behaves same as "head".')
parser.add_argument('-b', '--max-buffer-size',
metavar='bytes', type=int, default=2*2**20,
help='Max size of a single entry (in bytes), i.e. a peek buffer.'
' Cannot be larger than {}B.'.format(bs_max))
parser.add_argument('--no-final-delimiter',
action='store_true', help='Do not ensure that final chunk ends with delimeter (e.g. \\x00).')
opts = parser.parse_args(args)
if opts.max_buffer_size > bs_max:
parser.error('--max-buffer-size cannot be larger than {} (hardcoded limit).'.format(bs_max))
sep = '\0' if not opts.replace_with_nulls else '\n'
glue = '\0' if not opts.replace_with_newlines else '\n'
src = sys.stdin.fileno() if not opts.file else opts.file
src = io.open(src, 'rb')
src = io.BufferedReader(src, buffer_size=opts.max_buffer_size)
with src as src:
bs, bs_last, count = None, None, 0
while True:
if bs is None: bs = bs_start
buff = src.peek(bs)
bs_now = len(buff)
if buff:
if bs_now < bs and bs_now == bs_last: pos = None
else: pos = buff.find(sep)
if pos == -1:
if bs == opts.max_buffer_size:
raise RuntimeError( 'Failed to find next'
' \\x00 byte with buffer size: {}'.format(opts.max_buffer_size) )
bs, bs_last = max(bs * 2, opts.max_buffer_size), bs_now
continue
bs_last = None
if count and (buff or not opts.no_final_delimiter): sys.stdout.write(glue)
if opts.count and count >= opts.count: break
if not buff: break
count += 1
if pos is not None:
buff = src.read(pos)
assert sep not in buff and src.read(1) == sep, 'bad chunk read'
sys.stdout.write(buff)
else: # "read until the end" command
for buff in iter(ft.partial(src.read, bs_max), ''):
assert sep not in buff, 'bad unlimited read'
sys.stdout.write(buff)
bs = None
if __name__ == '__main__': sys.exit(main())