From ec725db9d0aee23d737106ad67b0ebb2f751e7d8 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 25 Nov 2020 02:31:31 -0500 Subject: [PATCH] Add ProcessString to Pipeline. (#2972) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add ProcessString to Pipeline. Most of the time, you have a buffer that you want to test again a log pipeline, when it passed you usually copy via string(buff). In some cases like headchunk and tailer you already have an allocated immutable string, since pipeline never mutate the line passed as parameter, we can create ProcessString pipeline method that will avoid re-allocating the line. benchmp: ``` ❯ benchcmp before.txt after.txt benchmark old ns/op new ns/op delta BenchmarkHeadBlockIterator/Size_100000-16 20264242 16112591 -20.49% BenchmarkHeadBlockIterator/Size_50000-16 10186969 7905259 -22.40% BenchmarkHeadBlockIterator/Size_15000-16 3229052 2202770 -31.78% BenchmarkHeadBlockIterator/Size_10000-16 1916537 1392355 -27.35% BenchmarkHeadBlockSampleIterator/Size_100000-16 18364773 16106425 -12.30% BenchmarkHeadBlockSampleIterator/Size_50000-16 8988422 7730226 -14.00% BenchmarkHeadBlockSampleIterator/Size_15000-16 2788746 2306161 -17.30% BenchmarkHeadBlockSampleIterator/Size_10000-16 1773766 1488861 -16.06% benchmark old allocs new allocs delta BenchmarkHeadBlockIterator/Size_100000-16 200039 39 -99.98% BenchmarkHeadBlockIterator/Size_50000-16 100036 36 -99.96% BenchmarkHeadBlockIterator/Size_15000-16 30031 31 -99.90% BenchmarkHeadBlockIterator/Size_10000-16 20029 29 -99.86% BenchmarkHeadBlockSampleIterator/Size_100000-16 100040 40 -99.96% BenchmarkHeadBlockSampleIterator/Size_50000-16 50036 36 -99.93% BenchmarkHeadBlockSampleIterator/Size_15000-16 15031 31 -99.79% BenchmarkHeadBlockSampleIterator/Size_10000-16 10029 29 -99.71% benchmark old bytes new bytes delta BenchmarkHeadBlockIterator/Size_100000-16 27604042 21203941 -23.19% BenchmarkHeadBlockIterator/Size_50000-16 13860654 10660652 -23.09% BenchmarkHeadBlockIterator/Size_15000-16 4231360 3271363 -22.69% BenchmarkHeadBlockIterator/Size_10000-16 2633436 1993420 -24.30% BenchmarkHeadBlockSampleIterator/Size_100000-16 17799137 14598973 -17.98% BenchmarkHeadBlockSampleIterator/Size_50000-16 7433260 5833137 -21.53% BenchmarkHeadBlockSampleIterator/Size_15000-16 2258099 1778096 -21.26% BenchmarkHeadBlockSampleIterator/Size_10000-16 1393600 1073605 -22.96% ``` Signed-off-by: Cyril Tovena * add some precision about why Sum64 is not a concern. Signed-off-by: Cyril Tovena * Update pkg/chunkenc/memchunk.go Co-authored-by: Owen Diehl Co-authored-by: Owen Diehl --- pkg/chunkenc/memchunk.go | 14 ++++---- pkg/chunkenc/memchunk_test.go | 26 +++++++++++++++ pkg/ingester/tailer.go | 4 +-- pkg/logentry/stages/match.go | 4 +-- pkg/logql/log/metrics_extraction.go | 12 +++++++ pkg/logql/log/metrics_extraction_test.go | 10 ++++++ pkg/logql/log/pipeline.go | 27 ++++++++++++++++ pkg/logql/log/pipeline_test.go | 41 ++++++++++++++++++++++++ 8 files changed, 128 insertions(+), 10 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 6ca2e1da1ddb..504893d20d50 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -623,8 +623,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, streams := map[uint64]*logproto.Stream{} for _, e := range hb.entries { chunkStats.HeadChunkBytes += int64(len(e.s)) - line := []byte(e.s) - newLine, parsedLbs, ok := pipeline.Process(line) + newLine, parsedLbs, ok := pipeline.ProcessString(e.s) if !ok { continue } @@ -638,7 +637,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, e.t), - Line: string(newLine), + Line: newLine, }) } @@ -662,8 +661,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra series := map[uint64]*logproto.Series{} for _, e := range hb.entries { chunkStats.HeadChunkBytes += int64(len(e.s)) - line := []byte(e.s) - value, parsedLabels, ok := extractor.Process(line) + value, parsedLabels, ok := extractor.ProcessString(e.s) if !ok { continue } @@ -676,10 +674,14 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra } series[lhash] = s } + + // []byte here doesn't create allocation because Sum64 has go:noescape directive + // It specifies that the function does not allow any of the pointers passed as arguments to escape into the heap or into the values returned from the function. + h := xxhash.Sum64([]byte(e.s)) s.Samples = append(s.Samples, logproto.Sample{ Timestamp: e.t, Value: value, - Hash: xxhash.Sum64([]byte(e.s)), + Hash: h, }) } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index f04b9b79a4c5..b87113974e6b 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -717,6 +717,32 @@ func BenchmarkHeadBlockIterator(b *testing.B) { } } +func BenchmarkHeadBlockSampleIterator(b *testing.B) { + + for _, j := range []int{100000, 50000, 15000, 10000} { + b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { + + h := headBlock{} + + for i := 0; i < j; i++ { + if err := h.append(int64(i), "this is the append string"); err != nil { + b.Fatal(err) + } + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + iter := h.sampleIterator(context.Background(), 0, math.MaxInt64, countExtractor) + + for iter.Next() { + _ = iter.Sample() + } + } + }) + } +} + func TestMemChunk_IteratorBounds(t *testing.T) { var createChunk = func() *MemChunk { diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 35c2cb579d60..8497c744abe4 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := sp.Process([]byte(e.Line)) + newLine, parsedLbs, ok := sp.ProcessString(e.Line) if !ok { continue } @@ -164,7 +164,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: e.Timestamp, - Line: string(newLine), + Line: newLine, }) } streamsResult := make([]*logproto.Stream, 0, len(streams)) diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index b06f7b280091..c6e94fad43dc 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -133,13 +133,13 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac } sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs))) - if newLine, newLabels, ok := sp.Process([]byte(*entry)); ok { + if newLine, newLabels, ok := sp.ProcessString(*entry); ok { switch m.action { case MatchActionDrop: // Adds the drop label to not be sent by the api.EntryHandler lbs[dropLabel] = model.LabelValue(m.dropReason) case MatchActionKeep: - *entry = string(newLine) + *entry = newLine for k := range lbs { delete(lbs, k) } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index b6b2cce3366a..58cacf503816 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -31,8 +31,10 @@ type SampleExtractor interface { } // StreamSampleExtractor extracts sample for a log line. +// A StreamSampleExtractor never mutate the received line. type StreamSampleExtractor interface { Process(line []byte) (float64, LabelsResult, bool) + ProcessString(line string) (float64, LabelsResult, bool) } type lineSampleExtractor struct { @@ -88,6 +90,11 @@ func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, return l.LineExtractor(line), l.builder.GroupedLabels(), true } +func (l *streamLineSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { + // unsafe get bytes since we have the guarantee that the line won't be mutated. + return l.Process(unsafeGetBytes(line)) +} + type convertionFn func(value string) (float64, error) type labelSampleExtractor struct { @@ -180,6 +187,11 @@ func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult return v, l.builder.GroupedLabels(), true } +func (l *streamLabelSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) { + // unsafe get bytes since we have the guarantee that the line won't be mutated. + return l.Process(unsafeGetBytes(line)) +} + func convertFloat(v string) (float64, error) { return strconv.ParseFloat(v, 64) } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index 68e22177d6a2..1318c157b321 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -117,6 +117,11 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) require.Equal(t, tt.wantLbs, outlbs.Labels()) + + outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString("") + require.Equal(t, tt.wantOk, ok) + require.Equal(t, tt.want, outval) + require.Equal(t, tt.wantLbs, outlbs.Labels()) }) } } @@ -143,6 +148,11 @@ func TestNewLineSampleExtractor(t *testing.T) { require.Equal(t, 1., f) assertLabelResult(t, lbs, l) + f, l, ok = sse.ProcessString(`foo`) + require.True(t, ok) + require.Equal(t, 1., f) + assertLabelResult(t, lbs, l) + filter, err := NewFilter("foo", labels.MatchEqual) require.NoError(t, err) diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index d503a2bec35f..3dafcd77f9a8 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -1,6 +1,8 @@ package log import ( + "unsafe" + "github.com/prometheus/prometheus/pkg/labels" ) @@ -15,11 +17,15 @@ type Pipeline interface { } // StreamPipeline transform and filter log lines and labels. +// A StreamPipeline never mutate the received line. type StreamPipeline interface { Process(line []byte) ([]byte, LabelsResult, bool) + ProcessString(line string) (string, LabelsResult, bool) } // Stage is a single step of a Pipeline. +// A Stage implementation should never mutate the line passed, but instead either +// return the line unchanged or allocate a new line. type Stage interface { Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) } @@ -49,6 +55,10 @@ func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { return line, n.LabelsResult, true } +func (n noopStreamPipeline) ProcessString(line string) (string, LabelsResult, bool) { + return line, n.LabelsResult, true +} + func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { h := labels.Hash() if cached, ok := n.cache[h]; ok { @@ -123,6 +133,15 @@ func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { return line, p.builder.LabelsResult(), true } +func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) { + // Stages only read from the line. + lb := unsafeGetBytes(line) + lb, lr, ok := p.Process(lb) + // either the line is unchanged and we can just send back the same string. + // or we created a new buffer for it in which case it is still safe to avoid the string(byte) copy. + return unsafeGetString(lb), lr, ok +} + // ReduceStages reduces multiple stages into one. func ReduceStages(stages []Stage) Stage { if len(stages) == 0 { @@ -139,3 +158,11 @@ func ReduceStages(stages []Stage) Stage { return line, true }) } + +func unsafeGetBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer(&s)) +} + +func unsafeGetString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +} diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 2038d717486b..96f5ff837a2a 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -5,8 +5,49 @@ import ( "time" "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" ) +func TestNoopPipeline(t *testing.T) { + lbs := labels.Labels{{Name: "foo", Value: "bar"}} + l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process([]byte("")) + require.Equal(t, []byte(""), l) + require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) + require.Equal(t, true, ok) + + ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString("") + require.Equal(t, "", ls) + require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) + require.Equal(t, true, ok) +} + +func TestPipeline(t *testing.T) { + lbs := labels.Labels{{Name: "foo", Value: "bar"}} + p := NewPipeline([]Stage{ + NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), + newMustLineFormatter("lbs {{.foo}}"), + }) + l, lbr, ok := p.ForStream(lbs).Process([]byte("line")) + require.Equal(t, []byte("lbs bar"), l) + require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) + require.Equal(t, true, ok) + + ls, lbr, ok := p.ForStream(lbs).ProcessString("line") + require.Equal(t, "lbs bar", ls) + require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr) + require.Equal(t, true, ok) + + l, lbr, ok = p.ForStream(labels.Labels{}).Process([]byte("line")) + require.Equal(t, []byte(nil), l) + require.Equal(t, nil, lbr) + require.Equal(t, false, ok) + + ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString("line") + require.Equal(t, "", ls) + require.Equal(t, nil, lbr) + require.Equal(t, false, ok) +} + var ( resOK bool resLine []byte