From fae2fd60572abbb7c9a939b15872a63c615b9f06 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 19 Jul 2019 13:43:39 -0400 Subject: [PATCH 1/4] Use prometheus pool for line buffer. --- pkg/chunkenc/gzip.go | 24 +++++++++++++++--------- pkg/chunkenc/pool.go | 26 +++----------------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index ff5bc5d54ce2..b4de328c0ac2 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -476,8 +476,8 @@ type bufferedIterator struct { err error - buf *bytes.Buffer // The buffer for a single entry. - decBuf []byte // The buffer for decoding the lengths. + buf []byte // The buffer for a single entry. + decBuf []byte // The buffer for decoding the lengths. closed bool @@ -491,7 +491,6 @@ func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *b reader: r, pool: pool, filter: filter, - buf: BytesBufferPool.Get(), decBuf: make([]byte, binary.MaxVarintLen64), } } @@ -529,25 +528,32 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { return 0, nil, false } } + lineSize := int(l) - if si.buf.Cap() < int(l) { - si.buf.Grow(int(l) - si.buf.Cap()) + // If the buffer is not yet initialize or too small, we get a new one. + if si.buf == nil || lineSize > cap(si.buf) { + // in case of replacement + if si.buf != nil { + BytesBufferPool.Put(si.buf) + } + si.buf = BytesBufferPool.Get(lineSize).([]byte) } - n, err := si.s.Read(si.buf.Bytes()[:l]) + // Then process reading the line. + n, err := si.s.Read(si.buf[:lineSize]) if err != nil && err != io.EOF { si.err = err return 0, nil, false } - for n < int(l) { - r, err := si.s.Read(si.buf.Bytes()[n:l]) + for n < lineSize { + r, err := si.s.Read(si.buf[n:lineSize]) if err != nil { si.err = err return 0, nil, false } n += r } - return ts, si.buf.Bytes()[:l], true + return ts, si.buf[:lineSize], true } func (si *bufferedIterator) Entry() logproto.Entry { diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index a843159de25f..e37cda313468 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -2,11 +2,11 @@ package chunkenc import ( "bufio" - "bytes" "compress/gzip" - "io" "sync" + + "github.com/prometheus/prometheus/pkg/pool" ) // CompressionPool is a pool of CompressionWriter and CompressionReader @@ -28,7 +28,7 @@ var ( }, } // BytesBufferPool is a bytes buffer used for lines decompressed. - BytesBufferPool = newBufferPoolWithSize(4096) + BytesBufferPool = pool.New(1024, 16384, 2, func(size int) interface{} { return make([]byte, 0, size) }) ) // GzipPool is a gun zip compression pool @@ -92,23 +92,3 @@ func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader { func (bufPool *BufioReaderPool) Put(b *bufio.Reader) { bufPool.pool.Put(b) } - -type bufferPool struct { - pool sync.Pool -} - -func newBufferPoolWithSize(size int) *bufferPool { - return &bufferPool{ - pool: sync.Pool{ - New: func() interface{} { return bytes.NewBuffer(make([]byte, size)) }, - }, - } -} - -func (bp *bufferPool) Get() *bytes.Buffer { - return bp.pool.Get().(*bytes.Buffer) -} - -func (bp *bufferPool) Put(b *bytes.Buffer) { - bp.pool.Put(b) -} From f9ed0faceff00198041c4d1f271e7fb085ecbd69 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 19 Jul 2019 14:30:23 -0400 Subject: [PATCH 2/4] adjustment --- Gopkg.lock | 9 +- pkg/chunkenc/gzip.go | 2 +- pkg/chunkenc/pool.go | 3 +- .../prometheus/prometheus/pkg/pool/pool.go | 87 +++++++++++++++++++ 4 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 vendor/github.com/prometheus/prometheus/pkg/pool/pool.go diff --git a/Gopkg.lock b/Gopkg.lock index 8c60d724f65c..ee398f6f12a4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -904,7 +904,7 @@ [[projects]] branch = "master" - digest = "1:5fc7e79142258f1ce47429c00fedb9ee04d8562e7fd477f2d9ae28a692e80969" + digest = "1:43edb14ccbe0d18f38bd55511250c39ccc3917c64a514a0e791408ac19a71558" name = "github.com/prometheus/prometheus" packages = [ "discovery", @@ -925,6 +925,7 @@ "pkg/gate", "pkg/labels", "pkg/modtimevfs", + "pkg/pool", "pkg/relabel", "pkg/textparse", "pkg/timestamp", @@ -1608,7 +1609,9 @@ "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promauto", + "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/prometheus/client_golang/prometheus/testutil", + "github.com/prometheus/client_model/go", "github.com/prometheus/common/config", "github.com/prometheus/common/expfmt", "github.com/prometheus/common/model", @@ -1618,6 +1621,7 @@ "github.com/prometheus/prometheus/discovery/targetgroup", "github.com/prometheus/prometheus/pkg/labels", "github.com/prometheus/prometheus/pkg/modtimevfs", + "github.com/prometheus/prometheus/pkg/pool", "github.com/prometheus/prometheus/pkg/relabel", "github.com/prometheus/prometheus/pkg/textparse", "github.com/prometheus/prometheus/promql", @@ -1637,8 +1641,9 @@ "github.com/weaveworks/common/user", "golang.org/x/net/context", "google.golang.org/grpc", + "google.golang.org/grpc/codes", "google.golang.org/grpc/health/grpc_health_v1", - "google.golang.org/grpc/metadata", + "google.golang.org/grpc/status", "gopkg.in/alecthomas/kingpin.v2", "gopkg.in/fsnotify.v1", "gopkg.in/yaml.v2", diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index b4de328c0ac2..3b3bb67ee354 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -532,7 +532,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) { // If the buffer is not yet initialize or too small, we get a new one. if si.buf == nil || lineSize > cap(si.buf) { - // in case of replacement + // in case of a replacement we replace back the buffer in the pool if si.buf != nil { BytesBufferPool.Put(si.buf) } diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index e37cda313468..83d34fb93576 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -28,7 +28,8 @@ var ( }, } // BytesBufferPool is a bytes buffer used for lines decompressed. - BytesBufferPool = pool.New(1024, 16384, 2, func(size int) interface{} { return make([]byte, 0, size) }) + // Buckets [0.5KB,1KB,2KB,4KB,8KB] + BytesBufferPool = pool.New(2e9, 2e13, 2, func(size int) interface{} { return make([]byte, 0, size) }) ) // GzipPool is a gun zip compression pool diff --git a/vendor/github.com/prometheus/prometheus/pkg/pool/pool.go b/vendor/github.com/prometheus/prometheus/pkg/pool/pool.go new file mode 100644 index 000000000000..2ee897185442 --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/pkg/pool/pool.go @@ -0,0 +1,87 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "fmt" + "reflect" + "sync" +) + +// Pool is a bucketed pool for variably sized byte slices. +type Pool struct { + buckets []sync.Pool + sizes []int + // make is the function used to create an empty slice when none exist yet. + make func(int) interface{} +} + +// New returns a new Pool with size buckets for minSize to maxSize +// increasing by the given factor. +func New(minSize, maxSize int, factor float64, makeFunc func(int) interface{}) *Pool { + if minSize < 1 { + panic("invalid minimum pool size") + } + if maxSize < 1 { + panic("invalid maximum pool size") + } + if factor < 1 { + panic("invalid factor") + } + + var sizes []int + + for s := minSize; s <= maxSize; s = int(float64(s) * factor) { + sizes = append(sizes, s) + } + + p := &Pool{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + make: makeFunc, + } + + return p +} + +// Get returns a new byte slices that fits the given size. +func (p *Pool) Get(sz int) interface{} { + for i, bktSize := range p.sizes { + if sz > bktSize { + continue + } + b := p.buckets[i].Get() + if b == nil { + b = p.make(bktSize) + } + return b + } + return p.make(sz) +} + +// Put adds a slice to the right bucket in the pool. +func (p *Pool) Put(s interface{}) { + slice := reflect.ValueOf(s) + + if slice.Kind() != reflect.Slice { + panic(fmt.Sprintf("%+v is not a slice", slice)) + } + for i, size := range p.sizes { + if slice.Cap() > size { + continue + } + p.buckets[i].Put(slice.Slice(0, 0).Interface()) + return + } +} From 28b4e5dd3d91d7c53ff4af154b7a2c2894ff557a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 19 Jul 2019 15:15:03 -0400 Subject: [PATCH 3/4] correct buckets size --- pkg/chunkenc/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index 83d34fb93576..72bbf037ef94 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -29,7 +29,7 @@ var ( } // BytesBufferPool is a bytes buffer used for lines decompressed. // Buckets [0.5KB,1KB,2KB,4KB,8KB] - BytesBufferPool = pool.New(2e9, 2e13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + BytesBufferPool = pool.New(1<<10, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) ) // GzipPool is a gun zip compression pool From 84a132c0e6f51ee96db40cec6de6011c9cef4264 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 19 Jul 2019 15:26:55 -0400 Subject: [PATCH 4/4] align bucket with comment --- pkg/chunkenc/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index 72bbf037ef94..b6a62e2ab055 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -29,7 +29,7 @@ var ( } // BytesBufferPool is a bytes buffer used for lines decompressed. // Buckets [0.5KB,1KB,2KB,4KB,8KB] - BytesBufferPool = pool.New(1<<10, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) ) // GzipPool is a gun zip compression pool