Add ProcessString to Pipeline. (#2972)
* Add ProcessString to Pipeline.

Most of the time you have a byte buffer that you want to run through a log pipeline, and when the line passes you usually copy the result via string(buff).

In some cases, such as the head chunk and the tailer, you already have an allocated immutable string. Since a pipeline never mutates the line passed as a parameter, we can add a ProcessString pipeline method that avoids re-allocating the line, as sketched below.
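A minimal sketch of the win, with simplified method signatures (the real StreamPipeline also returns a LabelsResult, and the noop implementation here is purely illustrative):

```go
package main

import "fmt"

// StreamPipeline is a simplified stand-in for the real interface;
// the actual methods also return a LabelsResult.
type StreamPipeline interface {
	Process(line []byte) ([]byte, bool)
	ProcessString(line string) (string, bool)
}

// noop passes every line through unchanged.
type noop struct{}

func (noop) Process(line []byte) ([]byte, bool)       { return line, true }
func (noop) ProcessString(line string) (string, bool) { return line, true }

func main() {
	var sp StreamPipeline = noop{}
	entry := "an immutable log line"

	// Before: copy the string into a []byte to call Process, then copy back.
	b, _ := sp.Process([]byte(entry)) // allocates a copy of entry
	before := string(b)               // allocates again

	// After: hand the string straight to the pipeline, no copies needed.
	after, _ := sp.ProcessString(entry)

	fmt.Println(before == after) // true
}
```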

benchcmp output:

```
❯ benchcmp before.txt after.txt
benchmark                                           old ns/op     new ns/op     delta
BenchmarkHeadBlockIterator/Size_100000-16           20264242      16112591      -20.49%
BenchmarkHeadBlockIterator/Size_50000-16            10186969      7905259       -22.40%
BenchmarkHeadBlockIterator/Size_15000-16            3229052       2202770       -31.78%
BenchmarkHeadBlockIterator/Size_10000-16            1916537       1392355       -27.35%
BenchmarkHeadBlockSampleIterator/Size_100000-16     18364773      16106425      -12.30%
BenchmarkHeadBlockSampleIterator/Size_50000-16      8988422       7730226       -14.00%
BenchmarkHeadBlockSampleIterator/Size_15000-16      2788746       2306161       -17.30%
BenchmarkHeadBlockSampleIterator/Size_10000-16      1773766       1488861       -16.06%

benchmark                                           old allocs     new allocs     delta
BenchmarkHeadBlockIterator/Size_100000-16           200039         39             -99.98%
BenchmarkHeadBlockIterator/Size_50000-16            100036         36             -99.96%
BenchmarkHeadBlockIterator/Size_15000-16            30031          31             -99.90%
BenchmarkHeadBlockIterator/Size_10000-16            20029          29             -99.86%
BenchmarkHeadBlockSampleIterator/Size_100000-16     100040         40             -99.96%
BenchmarkHeadBlockSampleIterator/Size_50000-16      50036          36             -99.93%
BenchmarkHeadBlockSampleIterator/Size_15000-16      15031          31             -99.79%
BenchmarkHeadBlockSampleIterator/Size_10000-16      10029          29             -99.71%

benchmark                                           old bytes     new bytes     delta
BenchmarkHeadBlockIterator/Size_100000-16           27604042      21203941      -23.19%
BenchmarkHeadBlockIterator/Size_50000-16            13860654      10660652      -23.09%
BenchmarkHeadBlockIterator/Size_15000-16            4231360       3271363       -22.69%
BenchmarkHeadBlockIterator/Size_10000-16            2633436       1993420       -24.30%
BenchmarkHeadBlockSampleIterator/Size_100000-16     17799137      14598973      -17.98%
BenchmarkHeadBlockSampleIterator/Size_50000-16      7433260       5833137       -21.53%
BenchmarkHeadBlockSampleIterator/Size_15000-16      2258099       1778096       -21.26%
BenchmarkHeadBlockSampleIterator/Size_10000-16      1393600       1073605       -22.96%
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* add a clarification about why Sum64 is not an allocation concern.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/chunkenc/memchunk.go

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
cyriltovena and owen-d authored Nov 25, 2020
1 parent 06a89ea commit ec725db
Showing 8 changed files with 128 additions and 10 deletions.
14 changes: 8 additions & 6 deletions pkg/chunkenc/memchunk.go
@@ -623,8 +623,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
 	streams := map[uint64]*logproto.Stream{}
 	for _, e := range hb.entries {
 		chunkStats.HeadChunkBytes += int64(len(e.s))
-		line := []byte(e.s)
-		newLine, parsedLbs, ok := pipeline.Process(line)
+		newLine, parsedLbs, ok := pipeline.ProcessString(e.s)
 		if !ok {
 			continue
 		}
@@ -638,7 +637,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
 		}
 		stream.Entries = append(stream.Entries, logproto.Entry{
 			Timestamp: time.Unix(0, e.t),
-			Line:      string(newLine),
+			Line:      newLine,
 		})

 	}
@@ -662,8 +661,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
 	series := map[uint64]*logproto.Series{}
 	for _, e := range hb.entries {
 		chunkStats.HeadChunkBytes += int64(len(e.s))
-		line := []byte(e.s)
-		value, parsedLabels, ok := extractor.Process(line)
+		value, parsedLabels, ok := extractor.ProcessString(e.s)
 		if !ok {
 			continue
 		}
@@ -676,10 +674,14 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
 			}
 			series[lhash] = s
 		}
+
+		// The []byte conversion here doesn't allocate because Sum64 carries the go:noescape directive,
+		// which guarantees that none of the pointers passed as arguments escape to the heap or into the returned values.
+		h := xxhash.Sum64([]byte(e.s))
 		s.Samples = append(s.Samples, logproto.Sample{
 			Timestamp: e.t,
 			Value:     value,
-			Hash:      xxhash.Sum64([]byte(e.s)),
+			Hash:      h,
 		})
 	}

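The go:noescape claim in the hunk above is easy to check empirically. A hypothetical benchmark sketch (the name BenchmarkSum64String is mine; it assumes github.com/cespare/xxhash, the package this file appears to use):

```go
package main

import (
	"testing"

	"github.com/cespare/xxhash"
)

// Sum64 is declared with the //go:noescape directive, so the compiler
// knows the slice passed to it cannot escape. For a short string like
// this one, the []byte(s) conversion can therefore live in a stack
// temporary, and `go test -bench=Sum64String -benchmem` should report
// 0 allocs/op.
func BenchmarkSum64String(b *testing.B) {
	s := "this is the append string"
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = xxhash.Sum64([]byte(s))
	}
}
```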
26 changes: 26 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -717,6 +717,32 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
 	}
 }

+func BenchmarkHeadBlockSampleIterator(b *testing.B) {
+
+	for _, j := range []int{100000, 50000, 15000, 10000} {
+		b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) {
+
+			h := headBlock{}
+
+			for i := 0; i < j; i++ {
+				if err := h.append(int64(i), "this is the append string"); err != nil {
+					b.Fatal(err)
+				}
+			}
+
+			b.ResetTimer()
+
+			for n := 0; n < b.N; n++ {
+				iter := h.sampleIterator(context.Background(), 0, math.MaxInt64, countExtractor)
+
+				for iter.Next() {
+					_ = iter.Sample()
+				}
+			}
+		})
+	}
+}
+
 func TestMemChunk_IteratorBounds(t *testing.T) {

 	var createChunk = func() *MemChunk {
4 changes: 2 additions & 2 deletions pkg/ingester/tailer.go
@@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

 	sp := t.pipeline.ForStream(lbs)
 	for _, e := range stream.Entries {
-		newLine, parsedLbs, ok := sp.Process([]byte(e.Line))
+		newLine, parsedLbs, ok := sp.ProcessString(e.Line)
 		if !ok {
 			continue
 		}
@@ -164,7 +164,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
 		}
 		stream.Entries = append(stream.Entries, logproto.Entry{
 			Timestamp: e.Timestamp,
-			Line:      string(newLine),
+			Line:      newLine,
 		})
 	}
 	streamsResult := make([]*logproto.Stream, 0, len(streams))
4 changes: 2 additions & 2 deletions pkg/logentry/stages/match.go
@@ -133,13 +133,13 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac
 	}

 	sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs)))
-	if newLine, newLabels, ok := sp.Process([]byte(*entry)); ok {
+	if newLine, newLabels, ok := sp.ProcessString(*entry); ok {
 		switch m.action {
 		case MatchActionDrop:
 			// Adds the drop label to not be sent by the api.EntryHandler
 			lbs[dropLabel] = model.LabelValue(m.dropReason)
 		case MatchActionKeep:
-			*entry = string(newLine)
+			*entry = newLine
 			for k := range lbs {
 				delete(lbs, k)
 			}
12 changes: 12 additions & 0 deletions pkg/logql/log/metrics_extraction.go
@@ -31,8 +31,10 @@ type SampleExtractor interface {
 }

 // StreamSampleExtractor extracts sample for a log line.
+// A StreamSampleExtractor never mutates the received line.
 type StreamSampleExtractor interface {
 	Process(line []byte) (float64, LabelsResult, bool)
+	ProcessString(line string) (float64, LabelsResult, bool)
 }

 type lineSampleExtractor struct {
@@ -88,6 +90,11 @@ func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult,
 	return l.LineExtractor(line), l.builder.GroupedLabels(), true
 }

+func (l *streamLineSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) {
+	// The unsafe byte conversion is fine since we have the guarantee that the line won't be mutated.
+	return l.Process(unsafeGetBytes(line))
+}
+
 type convertionFn func(value string) (float64, error)

 type labelSampleExtractor struct {
@@ -180,6 +187,11 @@ func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult
 	return v, l.builder.GroupedLabels(), true
 }

+func (l *streamLabelSampleExtractor) ProcessString(line string) (float64, LabelsResult, bool) {
+	// The unsafe byte conversion is fine since we have the guarantee that the line won't be mutated.
+	return l.Process(unsafeGetBytes(line))
+}
+
 func convertFloat(v string) (float64, error) {
 	return strconv.ParseFloat(v, 64)
 }
10 changes: 10 additions & 0 deletions pkg/logql/log/metrics_extraction_test.go
@@ -117,6 +117,11 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
 			require.Equal(t, tt.wantOk, ok)
 			require.Equal(t, tt.want, outval)
 			require.Equal(t, tt.wantLbs, outlbs.Labels())
+
+			outval, outlbs, ok = tt.ex.ForStream(tt.in).ProcessString("")
+			require.Equal(t, tt.wantOk, ok)
+			require.Equal(t, tt.want, outval)
+			require.Equal(t, tt.wantLbs, outlbs.Labels())
 		})
 	}
 }
@@ -143,6 +148,11 @@ func TestNewLineSampleExtractor(t *testing.T) {
 	require.Equal(t, 1., f)
 	assertLabelResult(t, lbs, l)

+	f, l, ok = sse.ProcessString(`foo`)
+	require.True(t, ok)
+	require.Equal(t, 1., f)
+	assertLabelResult(t, lbs, l)
+
 	filter, err := NewFilter("foo", labels.MatchEqual)
 	require.NoError(t, err)

27 changes: 27 additions & 0 deletions pkg/logql/log/pipeline.go
@@ -1,6 +1,8 @@
 package log

 import (
+	"unsafe"
+
 	"github.com/prometheus/prometheus/pkg/labels"
 )

@@ -15,11 +17,15 @@ type Pipeline interface {
 }

 // StreamPipeline transform and filter log lines and labels.
+// A StreamPipeline never mutates the received line.
 type StreamPipeline interface {
 	Process(line []byte) ([]byte, LabelsResult, bool)
+	ProcessString(line string) (string, LabelsResult, bool)
 }

 // Stage is a single step of a Pipeline.
+// A Stage implementation should never mutate the line passed, but instead either
+// return the line unchanged or allocate a new line.
 type Stage interface {
 	Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
 }
@@ -49,6 +55,10 @@ func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) {
 	return line, n.LabelsResult, true
 }

+func (n noopStreamPipeline) ProcessString(line string) (string, LabelsResult, bool) {
+	return line, n.LabelsResult, true
+}
+
 func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline {
 	h := labels.Hash()
 	if cached, ok := n.cache[h]; ok {
@@ -123,6 +133,15 @@ func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) {
 	return line, p.builder.LabelsResult(), true
 }

+func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) {
+	// Stages only read from the line.
+	lb := unsafeGetBytes(line)
+	lb, lr, ok := p.Process(lb)
+	// Either the line is unchanged and we can send back the same string,
+	// or a stage allocated a new buffer, in which case it is still safe to skip the string([]byte) copy.
+	return unsafeGetString(lb), lr, ok
+}
+
 // ReduceStages reduces multiple stages into one.
 func ReduceStages(stages []Stage) Stage {
 	if len(stages) == 0 {
Expand All @@ -139,3 +158,11 @@ func ReduceStages(stages []Stage) Stage {
return line, true
})
}

func unsafeGetBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}

func unsafeGetString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
41 changes: 41 additions & 0 deletions pkg/logql/log/pipeline_test.go
@@ -5,8 +5,49 @@ import (
 	"time"

 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/stretchr/testify/require"
 )

+func TestNoopPipeline(t *testing.T) {
+	lbs := labels.Labels{{Name: "foo", Value: "bar"}}
+	l, lbr, ok := NewNoopPipeline().ForStream(lbs).Process([]byte(""))
+	require.Equal(t, []byte(""), l)
+	require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
+	require.Equal(t, true, ok)
+
+	ls, lbr, ok := NewNoopPipeline().ForStream(lbs).ProcessString("")
+	require.Equal(t, "", ls)
+	require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
+	require.Equal(t, true, ok)
+}
+
+func TestPipeline(t *testing.T) {
+	lbs := labels.Labels{{Name: "foo", Value: "bar"}}
+	p := NewPipeline([]Stage{
+		NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
+		newMustLineFormatter("lbs {{.foo}}"),
+	})
+	l, lbr, ok := p.ForStream(lbs).Process([]byte("line"))
+	require.Equal(t, []byte("lbs bar"), l)
+	require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
+	require.Equal(t, true, ok)
+
+	ls, lbr, ok := p.ForStream(lbs).ProcessString("line")
+	require.Equal(t, "lbs bar", ls)
+	require.Equal(t, NewLabelsResult(lbs, lbs.Hash()), lbr)
+	require.Equal(t, true, ok)
+
+	l, lbr, ok = p.ForStream(labels.Labels{}).Process([]byte("line"))
+	require.Equal(t, []byte(nil), l)
+	require.Equal(t, nil, lbr)
+	require.Equal(t, false, ok)
+
+	ls, lbr, ok = p.ForStream(labels.Labels{}).ProcessString("line")
+	require.Equal(t, "", ls)
+	require.Equal(t, nil, lbr)
+	require.Equal(t, false, ok)
+}
+
 var (
 	resOK   bool
 	resLine []byte
