From ac1927e2aaf7e92a1c0574d69af7eb3eb6c85875 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 18 Mar 2021 12:32:28 +0100 Subject: [PATCH 1/3] zstd: Add WithLowerEncoderMem Add option to trade off speed with lower memory usage. Extend tests. --- zstd/enc_base.go | 34 ++++-- zstd/enc_best.go | 1 + zstd/enc_better.go | 1 + zstd/encoder.go | 8 +- zstd/encoder_options.go | 21 +++- zstd/encoder_test.go | 257 ++++++++++++++++++++++------------------ 6 files changed, 193 insertions(+), 129 deletions(-) diff --git a/zstd/enc_base.go b/zstd/enc_base.go index b1b7c6e6a7..186c1e5b37 100644 --- a/zstd/enc_base.go +++ b/zstd/enc_base.go @@ -17,6 +17,7 @@ type fastBase struct { tmp [8]byte blk *blockEnc lastDictID uint32 + lowMem bool } // CRC returns the underlying CRC writer. @@ -57,15 +58,10 @@ func (e *fastBase) addBlock(src []byte) int32 { // check if we have space already if len(e.hist)+len(src) > cap(e.hist) { if cap(e.hist) == 0 { - l := e.maxMatchOff * 2 - // Make it at least 1MB. - if l < 1<<20 { - l = 1 << 20 - } - e.hist = make([]byte, 0, l) + e.ensureHist(len(src)) } else { - if cap(e.hist) < int(e.maxMatchOff*2) { - panic("unexpected buffer size") + if cap(e.hist) < int(e.maxMatchOff+maxCompressedBlockSize) { + panic(fmt.Errorf("unexpected buffer cap %d, want at least %d with window %d", cap(e.hist), e.maxMatchOff+maxCompressedBlockSize, e.maxMatchOff)) } // Move down offset := int32(len(e.hist)) - e.maxMatchOff @@ -79,6 +75,28 @@ func (e *fastBase) addBlock(src []byte) int32 { return s } +// ensureHist will ensure that history can keep at least this many bytes. +func (e *fastBase) ensureHist(n int) { + if cap(e.hist) >= n { + return + } + l := e.maxMatchOff + if (e.lowMem && e.maxMatchOff > maxCompressedBlockSize) || e.maxMatchOff <= maxCompressedBlockSize { + l += maxCompressedBlockSize + } else { + l += e.maxMatchOff + } + // Make it at least 1MB. + if l < 1<<20 && !e.lowMem { + l = 1 << 20 + } + // Make it at least the requested size. 
+ if l < int32(n) { + l = int32(n) + } + e.hist = make([]byte, 0, l) +} + // useBlock will replace the block with the provided one, // but transfer recent offsets from the previous. func (e *fastBase) UseBlock(enc *blockEnc) { diff --git a/zstd/enc_best.go b/zstd/enc_best.go index bb71d1eea9..fe3625c5f5 100644 --- a/zstd/enc_best.go +++ b/zstd/enc_best.go @@ -407,6 +407,7 @@ encodeLoop: // Most notable difference is that src will not be copied for history and // we do not need to check for max match length. func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { + e.ensureHist(len(src)) e.Encode(blk, src) } diff --git a/zstd/enc_better.go b/zstd/enc_better.go index 94a5343d00..1133d88814 100644 --- a/zstd/enc_better.go +++ b/zstd/enc_better.go @@ -516,6 +516,7 @@ encodeLoop: // Most notable difference is that src will not be copied for history and // we do not need to check for max match length. func (e *betterFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { + e.ensureHist(len(src)) e.Encode(blk, src) } diff --git a/zstd/encoder.go b/zstd/encoder.go index f5759211da..9514d3edd3 100644 --- a/zstd/encoder.go +++ b/zstd/encoder.go @@ -176,6 +176,12 @@ func (e *Encoder) nextBlock(final bool) error { } if !s.headerWritten { // If we have a single block encode, do a sync compression. + if final && len(s.filling) == 0 && !e.o.fullZero { + s.headerWritten = true + s.fullFrameWritten = true + s.eofWritten = true + return nil + } if final && len(s.filling) > 0 { s.current = e.EncodeAll(s.filling, s.current[:0]) var n2 int @@ -471,7 +477,7 @@ func (e *Encoder) EncodeAll(src, dst []byte) []byte { } // If less than 1MB, allocate a buffer up front. 
- if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 { + if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem { dst = make([]byte, 0, len(src)) } dst, err := fh.appendTo(dst) diff --git a/zstd/encoder_options.go b/zstd/encoder_options.go index a7312f42af..dd3ef8423b 100644 --- a/zstd/encoder_options.go +++ b/zstd/encoder_options.go @@ -24,6 +24,7 @@ type encoderOptions struct { allLitEntropy bool customWindow bool customALEntropy bool + lowMem bool dict *dict } @@ -37,6 +38,7 @@ func (o *encoderOptions) setDefault() { windowSize: 8 << 20, level: SpeedDefault, allLitEntropy: true, + lowMem: false, } } @@ -44,13 +46,13 @@ func (o *encoderOptions) setDefault() { func (o encoderOptions) encoder() encoder { switch o.level { case SpeedDefault: - return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}} + return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}} case SpeedBetterCompression: - return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} + return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} case SpeedBestCompression: - return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} + return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} case SpeedFastest: - return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} + return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} } panic("unknown compression level") } @@ -276,6 +278,17 @@ func WithSingleSegment(b bool) EOption { } } +// WithLowerEncoderMem will, in some cases, trade lower memory usage for +// slower encoding speed. +// This will not change the window size, which is the primary means of reducing +// memory usage. See WithWindowSize. 
+func WithLowerEncoderMem(b bool) EOption { + return func(o *encoderOptions) error { + o.lowMem = b + return nil + } +} + // WithEncoderDict allows to register a dictionary that will be used for the encode. // The encoder *may* choose to use no dictionary instead for certain payloads. func WithEncoderDict(dict []byte) EOption { diff --git a/zstd/encoder_test.go b/zstd/encoder_test.go index e131753516..ad5e051bbc 100644 --- a/zstd/encoder_test.go +++ b/zstd/encoder_test.go @@ -23,6 +23,44 @@ import ( var testWindowSizes = []int{MinWindowSize, 1 << 16, 1 << 22, 1 << 24} +type testEncOpt struct { + name string + o []EOption +} + +func getEncOpts(cMax int) []testEncOpt { + var o []testEncOpt + for level := EncoderLevel(speedNotSet + 1); level < speedLast; level++ { + for conc := 1; conc <= 4; conc *= 2 { + for _, wind := range testWindowSizes { + addOpt := func(name string, options ...EOption) { + opts := append([]EOption(nil), WithEncoderLevel(level), WithEncoderConcurrency(conc), WithWindowSize(wind)) + name = fmt.Sprintf("%s-c%d-w%dk-%s", level.String(), conc, wind/1024, name) + o = append(o, testEncOpt{name: name, o: append(opts, options...)}) + } + addOpt("default") + if testing.Short() { + break + } + addOpt("nocrc", WithEncoderCRC(false)) + addOpt("lowmem", WithLowerEncoderMem(true)) + addOpt("alllit", WithAllLitEntropyCompression(true)) + addOpt("nolit", WithNoEntropyCompression(true)) + addOpt("pad1k", WithEncoderPadding(1024)) + addOpt("zerof", WithZeroFrames(true)) + addOpt("singleseg", WithSingleSegment(true)) + } + if testing.Short() && conc == 2 { + break + } + if conc >= cMax { + break + } + } + } + return o +} + func TestEncoder_EncodeAllSimple(t *testing.T) { in, err := ioutil.ReadFile("testdata/z000028") if err != nil { @@ -35,9 +73,9 @@ func TestEncoder_EncodeAllSimple(t *testing.T) { defer dec.Close() in = append(in, in...) 
- for level := EncoderLevel(speedNotSet + 1); level < speedLast; level++ { - t.Run(level.String(), func(t *testing.T) { - e, err := NewWriter(nil, WithEncoderLevel(level), WithEncoderConcurrency(2), WithWindowSize(128<<10), WithZeroFrames(true)) + for _, opts := range getEncOpts(4) { + t.Run(opts.name, func(t *testing.T) { + e, err := NewWriter(nil, opts.o...) if err != nil { t.Fatal(err) } @@ -70,19 +108,19 @@ func TestEncoder_EncodeAllConcurrent(t *testing.T) { in = append(in, in...) // When running race no more than 8k goroutines allowed. - n := 4000 / runtime.GOMAXPROCS(0) + n := 400 / runtime.GOMAXPROCS(0) if testing.Short() { - n = 200 / runtime.GOMAXPROCS(0) + n = 20 / runtime.GOMAXPROCS(0) } dec, err := NewReader(nil) if err != nil { t.Fatal(err) } defer dec.Close() - for level := EncoderLevel(speedNotSet + 1); level < speedLast; level++ { - t.Run(level.String(), func(t *testing.T) { + for _, opts := range getEncOpts(2) { + t.Run(opts.name, func(t *testing.T) { rng := rand.New(rand.NewSource(0x1337)) - e, err := NewWriter(nil, WithEncoderLevel(level), WithZeroFrames(true)) + e, err := NewWriter(nil, opts.o...) 
if err != nil { t.Fatal(err) } @@ -170,73 +208,62 @@ func TestEncoderRegression(t *testing.T) { return } defer dec.Close() - testWindowSizes := testWindowSizes - if testing.Short() { - testWindowSizes = []int{1 << 20} - } - for level := EncoderLevel(speedNotSet + 1); level < speedLast; level++ { - t.Run(level.String(), func(t *testing.T) { - for _, windowSize := range testWindowSizes { - t.Run(fmt.Sprintf("window:%d", windowSize), func(t *testing.T) { - zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + for _, opts := range getEncOpts(2) { + t.Run(opts.name, func(t *testing.T) { + zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + if err != nil { + t.Fatal(err) + } + enc, err := NewWriter( + nil, + opts.o..., + ) + if err != nil { + t.Fatal(err) + } + defer enc.Close() + + for i, tt := range zr.File { + if !strings.HasSuffix(t.Name(), "") { + continue + } + if testing.Short() && i > 10 { + break + } + + t.Run(tt.Name, func(t *testing.T) { + r, err := tt.Open() if err != nil { - t.Fatal(err) + t.Error(err) + return + } + in, err := ioutil.ReadAll(r) + if err != nil { + t.Error(err) } - enc, err := NewWriter( - nil, - WithEncoderCRC(true), - WithEncoderLevel(level), - WithWindowSize(windowSize), - ) + encoded := enc.EncodeAll(in, nil) + got, err := dec.DecodeAll(encoded, nil) if err != nil { + t.Logf("error: %v\nwant: %v\ngot: %v", err, len(in), len(got)) t.Fatal(err) } - defer enc.Close() - - for i, tt := range zr.File { - if !strings.HasSuffix(t.Name(), "") { - continue - } - if testing.Short() && i > 100 { - break - } - - t.Run(tt.Name, func(t *testing.T) { - r, err := tt.Open() - if err != nil { - t.Error(err) - return - } - in, err := ioutil.ReadAll(r) - if err != nil { - t.Error(err) - } - encoded := enc.EncodeAll(in, nil) - got, err := dec.DecodeAll(encoded, nil) - if err != nil { - t.Logf("error: %v\nwant: %v\ngot: %v", err, len(in), len(got)) - t.Fatal(err) - } - - // Use the Writer - var dst bytes.Buffer - enc.Reset(&dst) - 
_, err = enc.Write(in) - if err != nil { - t.Error(err) - } - err = enc.Close() - if err != nil { - t.Error(err) - } - encoded = dst.Bytes() - got, err = dec.DecodeAll(encoded, nil) - if err != nil { - t.Logf("error: %v\nwant: %v\ngot: %v", err, in, got) - t.Fatal(err) - } - - }) + + // Use the Writer + var dst bytes.Buffer + enc.Reset(&dst) + _, err = enc.Write(in) + if err != nil { + t.Error(err) + } + err = enc.Close() + if err != nil { + t.Error(err) + } + encoded = dst.Bytes() + got, err = dec.DecodeAll(encoded, nil) + if err != nil { + t.Logf("error: %v\nwant: %v\ngot: %v", err, in, got) + t.Error(err) } }) } @@ -461,9 +488,9 @@ func TestEncoder_EncoderEnwik9(t *testing.T) { // test roundtrip using io.ReaderFrom interface. func testEncoderRoundtrip(t *testing.T, file string, wantCRC []byte) { - for level := speedNotSet + 1; level < speedLast; level++ { - t.Run(level.String(), func(t *testing.T) { - level := level + for _, opt := range getEncOpts(1) { + t.Run(opt.name, func(t *testing.T) { + opt := opt t.Parallel() f, err := os.Open(file) if err != nil { @@ -496,7 +523,7 @@ func testEncoderRoundtrip(t *testing.T, file string, wantCRC []byte) { } defer dec2.Close() - enc, err := NewWriter(pw, WithEncoderCRC(true), WithEncoderLevel(level)) + enc, err := NewWriter(pw, opt.o...) if err != nil { t.Fatal(err) } @@ -758,52 +785,50 @@ func TestEncoder_EncodeAllEmpty(t *testing.T) { } var in []byte - e, err := NewWriter(nil, WithZeroFrames(true)) - if err != nil { - t.Fatal(err) - } - defer e.Close() - dst := e.EncodeAll(in, nil) - if len(dst) == 0 { - t.Fatal("Requested zero frame, but got nothing.") - } - t.Log("Block Encoder len", len(in), "-> zstd len", len(dst), dst) + for _, opt := range getEncOpts(1) { + t.Run(opt.name, func(t *testing.T) { + e, err := NewWriter(nil, opt.o...) 
+ if err != nil { + t.Fatal(err) + } + defer e.Close() + dst := e.EncodeAll(in, nil) + t.Log("Block Encoder len", len(in), "-> zstd len", len(dst), dst) - dec, err := NewReader(nil, WithDecoderMaxMemory(220<<20)) - if err != nil { - t.Fatal(err) - } - defer dec.Close() - decoded, err := dec.DecodeAll(dst, nil) - if err != nil { - t.Error(err, len(decoded)) - } - if !bytes.Equal(decoded, in) { - t.Fatal("Decoded does not match") - } + dec, err := NewReader(nil, WithDecoderMaxMemory(220<<20)) + if err != nil { + t.Fatal(err) + } + defer dec.Close() + decoded, err := dec.DecodeAll(dst, nil) + if err != nil { + t.Error(err, len(decoded)) + } + if !bytes.Equal(decoded, in) { + t.Fatal("Decoded does not match") + } - // Test buffer writer. - var buf bytes.Buffer - e.Reset(&buf) - err = e.Close() - if err != nil { - t.Fatal(err) - } - dst = buf.Bytes() - if len(dst) == 0 { - t.Fatal("Requested zero frame, but got nothing.") - } - t.Log("Buffer Encoder len", len(in), "-> zstd len", len(dst)) + // Test buffer writer. + var buf bytes.Buffer + e.Reset(&buf) + err = e.Close() + if err != nil { + t.Fatal(err) + } + dst = buf.Bytes() + t.Log("Buffer Encoder len", len(in), "-> zstd len", len(dst)) - decoded, err = dec.DecodeAll(dst, nil) - if err != nil { - t.Error(err, len(decoded)) - } - if !bytes.Equal(decoded, in) { - t.Fatal("Decoded does not match") - } + decoded, err = dec.DecodeAll(dst, nil) + if err != nil { + t.Error(err, len(decoded)) + } + if !bytes.Equal(decoded, in) { + t.Fatal("Decoded does not match") + } - t.Log("Encoded content matched") + t.Log("Encoded content matched") + }) + } } func TestEncoder_EncodeAllEnwik9(t *testing.T) { From 4b68ef1e5c4bcb86127108e3ebea026a128b391a Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 19 Mar 2021 12:27:52 +0100 Subject: [PATCH 2/3] Reduce more encoder allocs. 
--- zstd/blockenc.go | 42 +++++++++++++++++++++++++++++------------- zstd/enc_base.go | 2 +- zstd/encoder.go | 2 +- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/zstd/blockenc.go b/zstd/blockenc.go index c85c40255d..9647c64e53 100644 --- a/zstd/blockenc.go +++ b/zstd/blockenc.go @@ -22,28 +22,44 @@ type blockEnc struct { dictLitEnc *huff0.Scratch wr bitWriter - extraLits int - last bool - + extraLits int output []byte recentOffsets [3]uint32 prevRecentOffsets [3]uint32 + + last bool + lowMem bool } // init should be used once the block has been created. // If called more than once, the effect is the same as calling reset. func (b *blockEnc) init() { - if cap(b.literals) < maxCompressedLiteralSize { - b.literals = make([]byte, 0, maxCompressedLiteralSize) - } - const defSeqs = 200 - b.literals = b.literals[:0] - if cap(b.sequences) < defSeqs { - b.sequences = make([]seq, 0, defSeqs) - } - if cap(b.output) < maxCompressedBlockSize { - b.output = make([]byte, 0, maxCompressedBlockSize) + if b.lowMem { + // 1K literals + if cap(b.literals) < 1<<10 { + b.literals = make([]byte, 0, 1<<10) + } + const defSeqs = 20 + if cap(b.sequences) < defSeqs { + b.sequences = make([]seq, 0, defSeqs) + } + // 1K + if cap(b.output) < 1<<10 { + b.output = make([]byte, 0, 1<<10) + } + } else { + if cap(b.literals) < maxCompressedBlockSize { + b.literals = make([]byte, 0, maxCompressedBlockSize) + } + const defSeqs = 200 + if cap(b.sequences) < defSeqs { + b.sequences = make([]seq, 0, defSeqs) + } + if cap(b.output) < maxCompressedBlockSize { + b.output = make([]byte, 0, maxCompressedBlockSize) + } } + if b.coders.mlEnc == nil { b.coders.mlEnc = &fseEncoder{} b.coders.mlPrev = &fseEncoder{} diff --git a/zstd/enc_base.go b/zstd/enc_base.go index 186c1e5b37..e3f01a10d0 100644 --- a/zstd/enc_base.go +++ b/zstd/enc_base.go @@ -135,7 +135,7 @@ func (e *fastBase) matchlen(s, t int32, src []byte) int32 { // Reset the encoding table. 
func (e *fastBase) resetBase(d *dict, singleBlock bool) { if e.blk == nil { - e.blk = &blockEnc{} + e.blk = &blockEnc{lowMem: e.lowMem} e.blk.init() } else { e.blk.reset(nil) diff --git a/zstd/encoder.go b/zstd/encoder.go index 9514d3edd3..6f0265099e 100644 --- a/zstd/encoder.go +++ b/zstd/encoder.go @@ -106,7 +106,7 @@ func (e *Encoder) Reset(w io.Writer) { s.encoder = e.o.encoder() } if s.writing == nil { - s.writing = &blockEnc{} + s.writing = &blockEnc{lowMem: e.o.lowMem} s.writing.init() } s.writing.initNewEncode() From abd83a5cad311c199167324d1d3da7575f31bbeb Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 19 Mar 2021 14:12:19 +0100 Subject: [PATCH 3/3] Remove wrong comment --- zstd/encoder_options.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zstd/encoder_options.go b/zstd/encoder_options.go index dd3ef8423b..abd953697e 100644 --- a/zstd/encoder_options.go +++ b/zstd/encoder_options.go @@ -30,7 +30,6 @@ type encoderOptions struct { func (o *encoderOptions) setDefault() { *o = encoderOptions{ - // use less ram: true for now, but may change. concurrent: runtime.GOMAXPROCS(0), crc: true, single: nil,