"""
LZ77 works by finding matches in the past and encoding the match lengths and offsets.
Symbols not part of matches are directly stored as "literals". These streams are
then entropy coded. LZ77 forms the basis of popular compressors like gzip and zstd.
Theoretically, LZ77 is a universal compressor: it asymptotically achieves the
entropy rate of any stationary ergodic process.
This is a simplified LZ77 encoder drawing ideas from:
- LZ77 modern implementation: https://glinscott.github.io/lz/index.html
- DEFLATE: https://www.rfc-editor.org/rfc/rfc1951
- ZSTD: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
The encoder has two parameters:
- min_match_length: the minimum allowed match length. The same length is used
as the key length for indexing into the substring_dict. Very low values lead to
slow execution; in terms of compression, low values can produce suboptimal
(short) matches while high values can miss matches entirely, so there is a
tradeoff. Defaults to 6.
- max_num_matches_considered: the maximum number of candidate matches considered
at a position, to bound time complexity. Candidates are considered from most
recent to oldest and the longest match among them is chosen. Defaults to 64.
The algorithm:
- keep a window that stores everything the encoder has seen (not reset across
blocks unless reset() is called).
- keep a dict (substring_dict) that maps tuples of length min_match_length to
the positions in the window where the tuple occurs.
- to find a match during parsing, look up the substring_dict and take the
longest match among the most recent max_num_matches_considered candidates.
If no match is found at a position, store that symbol as a literal and continue
searching from the next position. If the end of the block is reached without
finding a match, the remaining symbols are all stored as literals.
- the streams generated by parsing are as follows:
1. Literals - positions for which no match is found are stored as literals.
This also includes any unmatched tail of the block.
2. LZ77 sequences - triples of (literal_count, match_length, match_offset).
The idea is that when we start looking for a match at pos_in_window but only
find one starting at match_start_pos, we set literal_count to
match_start_pos - pos_in_window. The match length and offset have their usual
meaning, with match_offset = match_start_pos - best_match_pos. Note that, as in
gzip/zstd, match_length is allowed to be greater than or equal to the offset,
denoting overlapping matches. Decoding these needs a bit of care since a simple
memcpy doesn't work. Finally, note that the sequences do not encode the
trailing part of the block that contains only literals and no matches; after
executing all the sequences, the decoder simply copies any leftover literals
to the output.
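For concreteness, a small worked example (with min_match_length=3): the input
[7, 8, 9, 7, 8, 9, 7, 8, 5] parses into literals [7, 8, 9, 5] and the single
sequence LZ77Sequence(literal_count=3, match_length=5, match_offset=3). The
decoder copies the first 3 literals, then copies 5 symbols starting 3 positions
back (an overlapping match that reproduces 7, 8, 9, 7, 8), and finally copies
the leftover literal 5.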
Entropy encoding:
- Literals: encoded with an empirical Huffman coder.
- LZ77 sequences: encoded by first binning the literal counts, match lengths
and match offsets in log scale, then encoding the logarithm with an empirical
Huffman coder and the difference to 2^logarithm (residual) as plain bits.
See LogScaleBinnedIntegerEncoder for details.
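For example, with offset 16, the value 100 is first reduced to 100-16 = 84;
the bin is floor(log2(84+1)) + 16 = 22 and the residual is 85 - 2^6 = 21,
stored in 6 bits. Values below 16 go directly into the Huffman alphabet.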
Current limitations:
1. During compression we use naive match finding whose memory usage accumulates
over time. We could instead implement the hash-chain algorithm described at
https://glinscott.github.io/lz/index.html#toc4.2.2 (a sketch follows this list).
2. During decompression we keep the entire past output buffer in memory. We
could implement a window-based scheme that shifts the window to reduce memory
usage.
3. We implement greedy parsing that takes the longest match at a given position.
We could instead implement more optimal parsing that skips some bytes as
literals in order to find a longer match in the upcoming bytes.
4. We do not perform any special handling to limit complexity for edge cases
such as very long repeats, for which finding the longest match is expensive.
Given the above, note that you can still reset the encoder after every few
blocks to limit memory usage.
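For reference, a minimal sketch of the hash-chain idea from the link above
(hypothetical names; this is not used in this module):

    head = {}  # key (e.g., next min_match_length symbols) -> most recent position
    prev = {}  # position -> previous position with the same key

    def hash_chain_insert(key, pos):
        if key in head:
            prev[pos] = head[key]
        head[key] = pos

    # Match candidates for key are head[key], prev[head[key]], ...; the walk
    # stops after max_num_matches_considered steps or once a candidate falls
    # out of the window. With a fixed-size window, prev can be a circular
    # array indexed by pos % window_size, which bounds memory usage.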
Benchmarks on a few files from https://github.com/nemequ/squash-corpus and
https://corpus.canterbury.ac.nz/descriptions/#cantrbry (plus a few handmade).
All sizes are in bytes.
| File                       | raw size | scl-lz77 size | gzip size |
|----------------------------|----------|---------------|-----------|
| bootstrap-3.3.6.min.css    |   121260 |         20126 |     19747 |
| eff.html                   |    41684 |         10755 |      9670 |
| zlib.wasm                  |    86408 |         41156 |     37448 |
| jquery-2.1.4.min.js        |    84345 |         31322 |     29569 |
| random.bin (random bytes)  |  1000000 |       1005050 |   1000328 |
| repeat_As.txt              |  1000000 |           578 |      1004 |
| kennedy.xls                |  1029744 |        210182 |    204004 |
| alice29.txt                |   152089 |         54106 |     54416 |
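A minimal in-memory usage sketch (single block, default parameters; for
file-based usage see encode_file/decode_file):

    data = list(b"hello hello hello hello")
    encoded = LZ77Encoder().encode_block(DataBlock(data))
    decoded_block, num_bits_consumed = LZ77Decoder().decode_block(encoded)
    assert decoded_block.data_list == data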
"""
import argparse
import math
import os
import tempfile
from dataclasses import dataclass
from typing import List, Tuple

from scl.compressors.elias_delta_uint_coder import EliasDeltaUintDecoder, EliasDeltaUintEncoder
from scl.compressors.huffman_coder import HuffmanDecoder, HuffmanEncoder
from scl.core.data_block import DataBlock
from scl.core.data_encoder_decoder import DataDecoder, DataEncoder
from scl.core.data_stream import Uint8FileDataStream
from scl.core.encoded_stream import EncodedBlockReader, EncodedBlockWriter
from scl.core.prob_dist import ProbabilityDist
from scl.utils.bitarray_utils import BitArray, bitarray_to_uint, uint_to_bitarray
from scl.utils.test_utils import (
    create_random_binary_file,
    try_file_lossless_compression,
    try_lossless_compression,
)
# number of bits in a header storing the size of an encoded (sub-)stream
ENCODED_BLOCK_SIZE_HEADER_BITS = 32
DEFAULT_MIN_MATCH_LEN = 6
DEFAULT_MAX_NUM_MATCHES_CONSIDERED = 64
@dataclass
class LZ77Sequence:
"""LZ77Sequence that determines a series of operations during decompression.
- First copy `literal_count` literal characters to output.
- Next copy `match_length` characters from `match_offset` back in output to the output.
"""
    literal_count: int = 0
    match_length: int = 0
    match_offset: int = 0
class EmpiricalIntHuffmanEncoder(DataEncoder):
"""Perform entropy encoding of the values and return the encoded bitarray.
This assumes the values range from 0 to alphabet_size-1 (known in advance to encoder
and decoder).
We apply Huffman coding for the values and also store the counts to enable the
decoder to construct the same Huffman tree.
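    The encoded layout is
    [32-bit size of counts encoding][Elias-Delta-coded counts]
    [32-bit size of values encoding][Huffman-coded values],
    or a single 32-bit zero size field if there are no values at all.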
"""
def __init__(self, alphabet_size):
self.alphabet_size = alphabet_size
def encode_block(self, data_block: DataBlock):
vals = data_block.data_list
        # verify that all values are in the range 0 to alphabet_size-1
        assert all(0 <= val < self.alphabet_size for val in vals)
# first encode the values with empirical Huffman code
counts = DataBlock(vals).get_counts()
if len(counts) > 0:
prob_dict = ProbabilityDist.normalize_prob_dict(counts).prob_dict
# let's sort prob_dist by the alphabet to make sure we get exact same ordering
# during decompression as well, which makes sure it's the same Huffman tree!
prob_dist_sorted = ProbabilityDist({i: prob_dict[i] for i in sorted(prob_dict)})
values_encoding = HuffmanEncoder(prob_dist_sorted).encode_block(DataBlock(vals))
# Now encode the counts using Elias Delta code.
# We first generate a list of counts (of length self.alphabet_size) and then apply Elias Delta.
# For any bytes not seen in the data, set count to 0.
for i in range(self.alphabet_size):
if i not in counts:
counts[i] = 0
counts_list = [counts[i] for i in range(self.alphabet_size)]
counts_encoding = EliasDeltaUintEncoder().encode_block(DataBlock(counts_list))
# combine everything into a single bitarray
return (
uint_to_bitarray(len(counts_encoding), ENCODED_BLOCK_SIZE_HEADER_BITS)
+ counts_encoding
+ uint_to_bitarray(len(values_encoding), ENCODED_BLOCK_SIZE_HEADER_BITS)
+ values_encoding
)
else:
# if no counts (i.e., no values) just transmit 0
return uint_to_bitarray(0, ENCODED_BLOCK_SIZE_HEADER_BITS)
class EmpiricalIntHuffmanDecoder(DataDecoder):
def __init__(self, alphabet_size):
self.alphabet_size = alphabet_size
def decode_block(self, encoded_bitarray: BitArray):
num_bits_consumed = 0
# first read the size of the counts (i.e., frequencies) encoding
counts_encoding_size = bitarray_to_uint(encoded_bitarray[:ENCODED_BLOCK_SIZE_HEADER_BITS])
num_bits_consumed += ENCODED_BLOCK_SIZE_HEADER_BITS
# now decode the counts using Elias Delta
if counts_encoding_size == 0:
return DataBlock([]), num_bits_consumed # no values
counts, num_bits_consumed_counts = EliasDeltaUintDecoder().decode_block(
encoded_bitarray[num_bits_consumed : num_bits_consumed + counts_encoding_size]
)
counts = counts.data_list
assert counts_encoding_size == num_bits_consumed_counts
num_bits_consumed += counts_encoding_size
# generate the probability distribution from the counts
counts = {i: counts[i] for i in range(self.alphabet_size) if counts[i] > 0}
prob_dist = ProbabilityDist.normalize_prob_dict(counts)
# load the encoding of the values
huffman_encoding_size = bitarray_to_uint(
encoded_bitarray[num_bits_consumed : num_bits_consumed + ENCODED_BLOCK_SIZE_HEADER_BITS]
)
num_bits_consumed += ENCODED_BLOCK_SIZE_HEADER_BITS
# decode values with Huffman
decoded_vals, num_bits_consumed_vals = HuffmanDecoder(prob_dist).decode_block(
encoded_bitarray[num_bits_consumed : num_bits_consumed + huffman_encoding_size]
)
assert huffman_encoding_size == num_bits_consumed_vals
num_bits_consumed += huffman_encoding_size
return decoded_vals, num_bits_consumed
class LogScaleBinnedIntegerEncoder(DataEncoder):
"""
Encodes a list of non-negative integers by binning in log scale and then
encoding the logarithm with Huffman coding, and the difference to
2^logarithm (residual) as plain old bits. For example, 100 would be converted
to (6, 100-2^6=36) and the 6 will be encoded using Huffman while the 36 is
encoded with 6 bits.
This takes an offset parameter where the values up to the offset are
directly put into Huffman instead of being converted to log scale. This
is useful when small values are common and you want to avoid the overhead
of binning.
This is similar to the coders used in deflate (https://www.w3.org/Graphics/PNG/RFC-1951#codes)
and zstd (https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#sequences-section).
They have a full-fledged table for encoding the bins, but we just use the offset
parameter to keep things simple. In addition we use Huffman like gzip whereas
zstd uses FSE (tANS).
"""
def __init__(self, offset=0, max_num_bins=32):
self.offset = offset
self.max_num_bins = max_num_bins + self.offset
self.empirical_huffman_encoder = EmpiricalIntHuffmanEncoder(alphabet_size=self.max_num_bins)
def encode_block(self, data_block: DataBlock):
bins = []
residuals = []
residual_num_bits = []
for val in data_block.data_list:
assert val >= 0
if val < self.offset:
bins.append(val)
else:
val -= self.offset
val_plus_1 = val + 1
log_val_plus_1 = int(math.log2(val_plus_1))
if log_val_plus_1 >= self.max_num_bins:
raise ValueError(
f"Value {val} is too large to be encoded with {self.max_num_bins} bins"
)
bins.append(log_val_plus_1 + self.offset)
residuals.append(val_plus_1 - 2**log_val_plus_1)
residual_num_bits.append(log_val_plus_1)
bins_encoding = self.empirical_huffman_encoder.encode_block(DataBlock(bins))
residuals_encoding = BitArray()
for residual, num_bits in zip(residuals, residual_num_bits):
if num_bits == 0:
continue
residuals_encoding += uint_to_bitarray(residual, num_bits)
return bins_encoding + residuals_encoding
class LogScaleBinnedIntegerDecoder(DataDecoder):
"""
Decodes a list of non-negative integers encoded by LogScaleBinnedIntegerEncoder.
"""
def __init__(self, offset=0, max_num_bins=32):
self.offset = offset
self.max_num_bins = max_num_bins + self.offset
self.empirical_huffman_decoder = EmpiricalIntHuffmanDecoder(alphabet_size=self.max_num_bins)
def decode_block(self, encoded_bitarray: BitArray):
bins_decoded, num_bits_consumed = self.empirical_huffman_decoder.decode_block(
encoded_bitarray
)
bins_decoded = bins_decoded.data_list
encoded_bitarray = encoded_bitarray[num_bits_consumed:]
decoded = []
for encoded_bin in bins_decoded:
if encoded_bin < self.offset:
decoded.append(encoded_bin)
else:
encoded_bin -= self.offset
log_val_plus_1 = encoded_bin
num_bits = log_val_plus_1
if num_bits == 0:
residual = 0
else:
residual = bitarray_to_uint(encoded_bitarray[:num_bits])
num_bits_consumed += num_bits
encoded_bitarray = encoded_bitarray[num_bits:]
decoded.append(self.offset + 2**log_val_plus_1 + residual - 1)
return DataBlock(decoded), num_bits_consumed
class LZ77StreamsEncoder(DataEncoder):
def __init__(self, log_scale_binned_coder_offset=16):
"""LZ77StreamsEncoder. Encode LZ77 sequences and literals into a bitarray.
Can improve upon this by using different offsets for different streams in LZ77Sequence.
Args:
log_scale_binned_coder_offset (int): offset for log scale binned integer encoder
"""
self.log_scale_binned_coder_offset = log_scale_binned_coder_offset
def encode_lz77_sequences(self, lz77_sequences: List[LZ77Sequence]):
"""Perform entropy encoding of the LZ77 sequences and return the encoded bitarray.
Encode LZ77 sequences with LogScaleBinnedIntegerEncoder, individually encoding
literal counts, match lengths and match offsets.
Args:
            lz77_sequences (List[LZ77Sequence]): LZ77 sequences to encode
"""
log_scale_binned_coder = LogScaleBinnedIntegerEncoder(
offset=self.log_scale_binned_coder_offset
)
encoded_bitarray = BitArray()
encoded_bitarray += log_scale_binned_coder.encode_block(
DataBlock([l.literal_count for l in lz77_sequences])
)
encoded_bitarray += log_scale_binned_coder.encode_block(
DataBlock([l.match_length for l in lz77_sequences])
)
encoded_bitarray += log_scale_binned_coder.encode_block(
DataBlock([l.match_offset for l in lz77_sequences])
)
return encoded_bitarray
def encode_literals(self, literals: List):
"""Perform entropy encoding of the literals and return the encoded bitarray.
Encode literals with empirical Huffman code.
Args:
            literals (List): literals to encode
"""
encoded_bitarray = EmpiricalIntHuffmanEncoder(alphabet_size=256).encode_block(
DataBlock(literals)
)
return encoded_bitarray
def encode_block(self, lz77_sequences: List[LZ77Sequence], literals: List):
"""Encode LZ77 sequences and literals into a bitarray.
Args:
lz77_sequences (List[LZ77Sequence]): LZ77 sequences
literals (List): literals
"""
lz77_sequences_encoding = self.encode_lz77_sequences(lz77_sequences)
literals_encoding = self.encode_literals(literals)
return lz77_sequences_encoding + literals_encoding
class LZ77StreamsDecoder(DataDecoder):
def __init__(self, log_scale_binned_coder_offset=16):
"""LZ77StreamsDecoder. Decode LZ77 sequences and literals from a bitarray.
Args:
log_scale_binned_coder_offset (int): offset for log scale binned integer encoder
"""
self.log_scale_binned_coder_offset = log_scale_binned_coder_offset
def decode_lz77_sequences(self, encoded_bitarray: BitArray):
"""Perform entropy decoding of the LZ77 sequences and return the decoded sequences
and the number of bits consumed.
Args:
encoded_bitarray (BitArray): encoded bit array
"""
log_scale_binned_coder = LogScaleBinnedIntegerDecoder(
offset=self.log_scale_binned_coder_offset
)
num_bits_consumed = 0
# first decode literal counts
literal_counts, num_bits_consumed_literal_counts = log_scale_binned_coder.decode_block(
encoded_bitarray
)
encoded_bitarray = encoded_bitarray[num_bits_consumed_literal_counts:]
num_bits_consumed += num_bits_consumed_literal_counts
# next decode match lengths
match_lengths, num_bits_consumed_match_lengths = log_scale_binned_coder.decode_block(
encoded_bitarray
)
encoded_bitarray = encoded_bitarray[num_bits_consumed_match_lengths:]
num_bits_consumed += num_bits_consumed_match_lengths
# next decode match offsets
match_offsets, num_bits_consumed_match_offsets = log_scale_binned_coder.decode_block(
encoded_bitarray
)
encoded_bitarray = encoded_bitarray[num_bits_consumed_match_offsets:]
num_bits_consumed += num_bits_consumed_match_offsets
lz77_sequences = [
LZ77Sequence(l[0], l[1], l[2])
for l in zip(literal_counts.data_list, match_lengths.data_list, match_offsets.data_list)
]
return lz77_sequences, num_bits_consumed
def decode_literals(self, encoded_bitarray: BitArray):
"""Perform entropy decoding of the literals and return the literals
and the number of bits consumed.
Args:
encoded_bitarray (BitArray): encoded bit array
"""
literals, num_bits_consumed = EmpiricalIntHuffmanDecoder(alphabet_size=256).decode_block(
encoded_bitarray
)
return literals.data_list, num_bits_consumed
def decode_block(self, encoded_bitarray: BitArray):
"""Decode LZ77 sequences and literals from a bitarray.
Args:
encoded_bitarray (BitArray): encoded bitarray
"""
lz77_sequences, num_bits_consumed_sequences = self.decode_lz77_sequences(encoded_bitarray)
encoded_bitarray = encoded_bitarray[num_bits_consumed_sequences:]
literals, num_bits_consumed_literals = self.decode_literals(encoded_bitarray)
num_bits_consumed = num_bits_consumed_sequences + num_bits_consumed_literals
return (lz77_sequences, literals), num_bits_consumed
class LZ77Encoder(DataEncoder):
def __init__(
self,
min_match_length: int = DEFAULT_MIN_MATCH_LEN,
max_num_matches_considered: int = DEFAULT_MAX_NUM_MATCHES_CONSIDERED,
initial_window: List = None,
):
"""LZ77Encoder. See module documentation for details.
Args:
min_match_length (int, optional): Minimum match length. Defaults to 6.
max_num_matches_considered (int, optional): Max number of matches considered
at a position to bound time complexity. The matches are considered
in order of recency and the longest match among the max_num_matches_considered
is chosen. Defaults to 64.
initial_window (List, optional): initialize window (this is like side information
or dictionary in zstd parlance). The same initial window should be used for the decoder.
"""
self.min_match_length = min_match_length
self.max_num_matches_considered = max_num_matches_considered
self.window = []
        # map from substring (tuple) of length min_match_length to the list of
        # positions in the window where it occurs
        self.substring_dict = {}
        # pointer telling up to what point the window has been indexed:
        # if window_indexed_till = 100, all substrings with (exclusive) end
        # position <= 99, i.e., starting at 0,1,...,99-min_match_length,
        # have been indexed
        self.window_indexed_till = 0
# if initial_window is provided, update window and index it
if initial_window is not None:
self.window = list(initial_window)
self.index_window_upto_pos(len(self.window))
self.streams_encoder = LZ77StreamsEncoder()
def reset(self):
# reset the window and the index
self.window = []
self.substring_dict = {}
self.window_indexed_till = 0
def insert_substring_into_dict(self, substr: Tuple, start_pos: int):
"""Insert substring into the substring_dict (mapping substring to positions where
it occurs in the window).
Args:
substr (Tuple): tuple of length min_match_length
start_pos (int): position of substr in window
"""
if substr in self.substring_dict:
self.substring_dict[substr].append(start_pos)
else:
self.substring_dict[substr] = [start_pos]
def index_window_upto_pos(self, end_pos: int):
"""Index all tuples of min_match_length in self.window[:end_pos]
into the substring_dict. The last tuple to be indexed will start at end_pos-min_match_length+1
This uses self.window_indexed_till to ensure we do not reindex things already indexed.
Args:
end_pos (int): end position in window to index
"""
for end_pos_substr in range(self.window_indexed_till, end_pos + 1):
start_pos_substr = end_pos_substr - self.min_match_length
if start_pos_substr < 0:
continue
substr = tuple(self.window[start_pos_substr:end_pos_substr])
self.insert_substring_into_dict(substr, start_pos_substr)
self.window_indexed_till = end_pos + 1
def find_match_length(self, start_pos_1: int, start_pos_2: int):
"""Find the match length of window starting from start_pos_1 and start_pos_2.
That is, largest match_length s.t.
window[start_pos_1:start_pos_1+match_length]==window[start_pos_2:start_pos_2+match_length]
Note that matching sections are allowed to overlap
Args:
start_pos_1 (int)
start_pos_2 (int)
Returns:
int: match length
"""
match_length = 0
while start_pos_1 + match_length < len(self.window) and start_pos_2 + match_length < len(
self.window
):
if self.window[start_pos_1 + match_length] != self.window[start_pos_2 + match_length]:
break
else:
match_length += 1
return match_length
def lz77_parse_and_generate_sequences(self, data_block: DataBlock):
"""Parse data using LZ77 and returns the LZ77 sequences and literals.
Updates the window accordingly.
Args:
data_list (list of bytes (int)): input data
"""
lz77_sequences = []
literals = []
pos_in_window = len(self.window)
# put the entire data block in the window at once, we will find matches later
self.window += data_block.data_list
# now go over the window starting at pos_in_window and try to find matches
# in the past
# We keep going until we can't find a match anymore
while True:
match_start_pos = pos_in_window
match_found = False
# loop over start positions until we find a match
for match_start_pos in range(
pos_in_window, len(self.window) - self.min_match_length + 1
):
match_substr = tuple(
self.window[match_start_pos : match_start_pos + self.min_match_length]
)
if match_substr not in self.substring_dict:
                    # substring not seen before; advance the index and try the next position
self.index_window_upto_pos(match_start_pos + 1)
continue
else:
candidate_match_positions = self.substring_dict[match_substr]
best_match_pos = None
best_match_length = 0
num_candidates_considered = 0
# iterate over candidate_match_positions in reverse order
# we basically want to look at max_num_matches_considered most recent matches
# and find the longest match
for candidate_match_pos in reversed(candidate_match_positions):
match_len = self.find_match_length(candidate_match_pos, match_start_pos)
assert match_len >= self.min_match_length
if match_len > best_match_length:
best_match_length = match_len
best_match_pos = candidate_match_pos
match_found = True
num_candidates_considered += 1
if num_candidates_considered == self.max_num_matches_considered:
# only consider max_num_matches_considered to limit complexity
break
if match_found:
break
else:
                        # no usable match; advance the index and keep scanning
self.index_window_upto_pos(match_start_pos + 1)
if not match_found:
# no match found anywhere so put everything else as a literal and break
literals += self.window[pos_in_window:]
# make sure entire window is indexed
self.index_window_upto_pos(len(self.window))
break
else:
# match was found so we appropriately insert into literals and sequences
# first put part from pos_in_window to match_start_pos in literals
literal_count = match_start_pos - pos_in_window
literals += self.window[pos_in_window:match_start_pos]
# compute the offset
match_offset = match_start_pos - best_match_pos
lz77_sequences.append(LZ77Sequence(literal_count, best_match_length, match_offset))
match_end_pos = match_start_pos + best_match_length
# index the covered portion into the substring_dict
self.index_window_upto_pos(match_end_pos)
# update position in window
pos_in_window = match_end_pos
return lz77_sequences, literals
def encode_block(self, data_block: DataBlock):
# first do lz77 parsing
lz77_sequences, literals = self.lz77_parse_and_generate_sequences(data_block)
# now encode sequences and literals
encoded_bitarray = self.streams_encoder.encode_block(lz77_sequences, literals)
return encoded_bitarray
def encode_file(self, input_file_path: str, encoded_file_path: str, block_size: int = 10000):
"""utility wrapper around the encode function using Uint8FileDataStream
Args:
input_file_path (str): path of the input file
encoded_file_path (str): path of the encoded binary file
block_size (int): choose the block size to be used to call the encode function
"""
# call the encode function and write to the binary file
with Uint8FileDataStream(input_file_path, "rb") as fds:
with EncodedBlockWriter(encoded_file_path) as writer:
self.encode(fds, block_size=block_size, encode_writer=writer)
class LZ77Decoder(DataDecoder):
def __init__(self, initial_window: List = None):
"""Initialize LZ77 decoder.
Args:
initial_window (List, optional): initialize window (this is like side information
or dictionary in zstd parlance). The same initial window should be used as in encoder.
"""
self.window = []
if initial_window is not None:
self.window = list(initial_window)
self.streams_decoder = LZ77StreamsDecoder()
def execute_lz77_sequences(self, literals: List, lz77_sequences: List[LZ77Sequence]):
"""Executes the LZ77 sequences and the literals and returns the decoded bytes.
Execution here just means the decoding.
Updates the window accordingly.
"""
window_len_before = len(self.window)
pos_in_literals = 0
for seq in lz77_sequences:
# first copy over the literals
self.window += literals[pos_in_literals : pos_in_literals + seq.literal_count]
pos_in_literals += seq.literal_count
# now copy the match
            if seq.match_length < seq.match_offset:
                # the match is strictly shorter than the offset, so the source range
                # already lies fully in the window and a simple slice copy works
                self.window += self.window[-seq.match_offset : -seq.match_offset + seq.match_length]
            else:
                # match length >= offset: copy symbol by symbol, since for an
                # overlapping match the source is produced as we copy
                # (e.g., offset=1, length=4 after [a] appends a, a, a, a)
                for _ in range(seq.match_length):
                    self.window.append(self.window[-seq.match_offset])
# copy over any leftover literals
self.window += literals[pos_in_literals:]
return self.window[window_len_before:]
def decode_block(self, encoded_bitarray: BitArray):
# first entropy decode the lz77 sequences and the literals
(lz77_sequences, literals), num_bits_consumed = self.streams_decoder.decode_block(
encoded_bitarray
)
# now execute the sequences to decode
decoded_block = DataBlock(self.execute_lz77_sequences(literals, lz77_sequences))
return decoded_block, num_bits_consumed
def decode_file(self, encoded_file_path: str, output_file_path: str):
"""utility wrapper around the decode function using Uint8FileDataStream
Args:
encoded_file_path (str): input binary file
            output_file_path (str): output file to which decoded data is written
"""
# decode data and write output to a text file
with EncodedBlockReader(encoded_file_path) as reader:
with Uint8FileDataStream(output_file_path, "wb") as fds:
self.decode(reader, fds)
def test_empirical_int_huffman_encoder_decoder():
import random
encoder = EmpiricalIntHuffmanEncoder(alphabet_size=45)
decoder = EmpiricalIntHuffmanDecoder(alphabet_size=45)
data_list = [random.randint(0, 44) for _ in range(1000)]
data_block = DataBlock(data_list)
is_lossless, _, _ = try_lossless_compression(
data_block, encoder, decoder, add_extra_bits_to_encoder_output=True
)
assert is_lossless
def test_log_scale_binned_integer_encoder_decoder():
"""
Test that log scale binned integer encoder and decoder are inverses of each other
"""
import random
encoder = LogScaleBinnedIntegerEncoder(offset=10)
decoder = LogScaleBinnedIntegerDecoder(offset=10)
data_list = (
[0, 1, 5, 9, 10, 11, 12]
+ [random.randint(0, 20) for _ in range(100)]
+ [random.randint(0, 1000) for _ in range(100)]
)
data_block = DataBlock(data_list)
is_lossless, _, _ = try_lossless_compression(
data_block, encoder, decoder, add_extra_bits_to_encoder_output=True
)
assert is_lossless
def test_lz77_encode_decode():
initial_window = [0, 0, 1, 1, 1]
# see the test_lz77_sequence_generation for the parsing of this sequence
    data_list = [
        1, 1, 1, 1, 0, 0, 1, 1, 1, 255, 254, 255, 254, 255, 254, 255,
        2, 0, 0, 1, 1, 1, 1, 44,
    ]
data_block = DataBlock(data_list)
for min_match_length in [1, 2, 3, 4, 5]:
for max_num_matches_considered in [0, 1, 5]:
encoder = LZ77Encoder(
min_match_length, max_num_matches_considered, initial_window=initial_window
)
decoder = LZ77Decoder(initial_window=initial_window)
is_lossless, _, _ = try_lossless_compression(
data_block, encoder, decoder, add_extra_bits_to_encoder_output=True
)
assert is_lossless
def test_lz77_sequence_generation():
"""
Test that lz77 produces expected sequences
Also test behavior across blocks both when we reset and when we don't
"""
min_match_len = 3
initial_window = [0, 0, 1, 1, 1]
encoder = LZ77Encoder(min_match_length=min_match_len, initial_window=initial_window)
    data_list = [
        1, 1, 1, 1, 0, 0, 1, 1, 1, 255, 254, 255, 254, 255, 254, 255,
        2, 0, 0, 1, 1, 1, 1, 44,
    ]
data_block = DataBlock(data_list)
    # matches (the first one is overlapping; the last one picks the longer match,
    # which is not the most recent match for the 3-tuple):
# 0, 0, [1, 1, 1, [1,] 1, 1, 1] 0, 0, 1, 1, 1, 255, 254, 255, 254, 255, 254, 255, 2, 0, 0, 1, 1, 1, 1, 44
# [0, 0, 1, 1, 1,] 1, 1, 1, [0, 0, 1, 1, 1,] 255, 254, 255, 254, 255, 254, 255, 2, 0, 0, 1, 1, 1, 1, 44
# 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, [255, 254, 255,] 254, [255, 254, 255,] 2, 0, 0, 1, 1, 1, 1, 44
# [0, 0, 1, 1, 1, 1,] 1, 1, 0, 0, 1, 1, 1, 255, 254, 255, 254, 255, 254, 255, 2, [0, 0, 1, 1, 1, 1], 44
expected_lits = [255, 254, 255, 254, 2, 44]
expected_seqs = [
LZ77Sequence(0, 4, 3),
LZ77Sequence(0, 5, 9),
LZ77Sequence(4, 3, 4),
LZ77Sequence(1, 6, 22),
]
seqs, lits = encoder.lz77_parse_and_generate_sequences(data_block)
assert encoder.window == initial_window + data_list
assert (
sum(len(v) for v in encoder.substring_dict.values())
== len(encoder.window) - min_match_len + 1
)
assert lits == expected_lits
assert seqs == expected_seqs
    # encode another block which is a copy of the first and check that we get just one match
seqs, lits = encoder.lz77_parse_and_generate_sequences(data_block)
assert encoder.window == initial_window + data_list * 2
assert (
sum(len(v) for v in encoder.substring_dict.values())
== len(encoder.window) - min_match_len + 1
) # subtract 2 since min_match_len is 3
assert lits == []
assert seqs == [LZ77Sequence(0, len(data_list), len(data_list))]
# now reset encoder and verify that after encoding we get results that we expect without initial window
# matches:
# 1, [1, 1, 1,] 0, 0, [1, 1, 1,] 255, 254, 255, 254, 255, 254, 255, 2, 0, 0, 1, 1, 1, 1, 44
# 1, 1, 1, 1, 0, 0, 1, 1, 1, [255, 254, 255,] 254, [255, 254, 255,] 2, 0, 0, 1, 1, 1, 1, 44
# 1, 1, 1, 1, [0, 0, 1, 1, 1,] 255, 254, 255, 254, 255, 254, 255, 2, [0, 0, 1, 1, 1,] 1, 44
encoder.reset()
expected_lits = [1, 1, 1, 1, 0, 0, 255, 254, 255, 254, 2, 1, 44]
expected_seqs = [LZ77Sequence(6, 3, 5), LZ77Sequence(4, 3, 4), LZ77Sequence(1, 5, 13)]
seqs, lits = encoder.lz77_parse_and_generate_sequences(data_block)
assert encoder.window == data_list
assert (
sum(len(v) for v in encoder.substring_dict.values()) == len(data_list) - min_match_len + 1
)
assert lits == expected_lits
assert seqs == expected_seqs
def test_lz77_multiblock_file_encode_decode():
"""full test for LZ77Encoder and LZ77Decoder
- create a sample file
- encode the file using LZ77Encoder
- perform decoding and check if the compression was lossless
"""
initial_window = [44, 45, 46] * 5
# define encoder, decoder
encoder = LZ77Encoder(initial_window=initial_window)
decoder = LZ77Decoder(initial_window=initial_window)
with tempfile.TemporaryDirectory() as tmpdirname:
# create a file with some random data
input_file_path = os.path.join(tmpdirname, "inp_file.txt")
create_random_binary_file(
input_file_path,
file_size=500,
prob_dist=ProbabilityDist({44: 0.5, 45: 0.25, 46: 0.2, 255: 0.05}),
)
# test lossless compression
assert try_file_lossless_compression(
input_file_path, encoder, decoder, encode_block_size=1000
)
if __name__ == "__main__":
# Provide a simple CLI interface below for convenient experimentation
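    # Example usage (assuming the scl package is importable):
    #   compress:   python lz77.py -i input.txt -o compressed.bin
    #   decompress: python lz77.py -d -i compressed.bin -o decompressed.txt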
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--decompress", help="decompress", action="store_true")
parser.add_argument("-i", "--input", help="input file", required=True, type=str)
parser.add_argument("-o", "--output", help="output file", required=True, type=str)
parser.add_argument(
"-w", "--window_init", help="initialize window from file (like zstd dictionary)", type=str
)
# constants
BLOCKSIZE = 100_000 # encode in 100 KB blocks
args = parser.parse_args()
initial_window = None
if args.window_init is not None:
with open(args.window_init, "rb") as f:
initial_window = list(f.read())
if args.decompress:
decoder = LZ77Decoder(initial_window=initial_window)
decoder.decode_file(args.input, args.output)
else:
encoder = LZ77Encoder(initial_window=initial_window)
encoder.encode_file(args.input, args.output, block_size=BLOCKSIZE)