From 9b63fd5340c9da45fedf03d3fc3e1e635d010329 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Thu, 5 Aug 2021 15:18:18 -0400
Subject: [PATCH 1/2] stream includes tenant id

---
 pkg/ingester/instance.go      | 4 ++--
 pkg/ingester/instance_test.go | 4 ++--
 pkg/ingester/stream.go        | 6 ++++--
 pkg/ingester/stream_test.go   | 6 +++++-
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 2b952778948f..df8681e1a971 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
 	if !ok {
 		sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
-		stream = newStream(i.cfg, fp, sortedLabels, i.metrics)
+		stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.metrics)
 		i.streamsByFP[fp] = stream
 		i.streams[stream.labelsString] = stream
 		i.streamsCreatedTotal.Inc()
@@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
 	fp := i.getHashForLabels(labels)

 	sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
-	stream = newStream(i.cfg, fp, sortedLabels, i.metrics)
+	stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.metrics)
 	i.streams[pushReqStream.Labels] = stream
 	i.streamsByFP[fp] = stream
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index f3a495723067..9738f4f1c1e4 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -177,7 +177,7 @@ func Test_SeriesQuery(t *testing.T) {
 	for _, testStream := range testStreams {
 		stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
 		require.NoError(t, err)
-		chunk := newStream(cfg, 0, nil, NilMetrics).NewChunk()
+		chunk := newStream(cfg, "fake", 0, nil, NilMetrics).NewChunk()
 		for _, entry := range testStream.Entries {
 			err = chunk.Append(&entry)
 			require.NoError(t, err)
@@ -333,7 +333,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 	lbs := makeRandomLabels()
 	b.Run("addTailersToNewStream", func(b *testing.B) {
 		for n := 0; n < b.N; n++ {
-			inst.addTailersToNewStream(newStream(nil, 0, lbs, NilMetrics))
+			inst.addTailersToNewStream(newStream(nil, "fake", 0, lbs, NilMetrics))
 		}
 	})
 }
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index cb61f4ccf670..0d571372433b 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -63,7 +63,8 @@ type line struct {
 }

 type stream struct {
-	cfg *Config
+	cfg    *Config
+	tenant string
 	// Newest chunk at chunks[n-1].
 	// Not thread-safe; assume accesses to this are locked by caller.
 	chunks []chunkDesc
@@ -109,7 +110,7 @@ type entryWithError struct {
 	e error
 }

-func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics *ingesterMetrics) *stream {
+func newStream(cfg *Config, tenant string, fp model.Fingerprint, labels labels.Labels, metrics *ingesterMetrics) *stream {
 	return &stream{
 		cfg:          cfg,
 		fp:           fp,
@@ -117,6 +118,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics
 		labelsString: labels.String(),
 		tailers:      map[uint32]*tailer{},
 		metrics:      metrics,
+		tenant:       tenant,
 	}
 }
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index 090be380d3f0..82703557ee17 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -38,6 +38,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 			cfg.MaxReturnedErrors = tc.limit
 			s := newStream(
 				cfg,
+				"fake",
 				model.Fingerprint(0),
 				labels.Labels{
 					{Name: "foo", Value: "bar"},
@@ -76,6 +77,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 func TestPushDeduplication(t *testing.T) {
 	s := newStream(
 		defaultConfig(),
+		"fake",
 		model.Fingerprint(0),
 		labels.Labels{
 			{Name: "foo", Value: "bar"},
@@ -98,6 +100,7 @@ func TestPushDeduplication(t *testing.T) {
 func TestPushRejectOldCounter(t *testing.T) {
 	s := newStream(
 		defaultConfig(),
+		"fake",
 		model.Fingerprint(0),
 		labels.Labels{
 			{Name: "foo", Value: "bar"},
@@ -185,6 +188,7 @@ func TestUnorderedPush(t *testing.T) {
 	cfg.MaxChunkAge = 10 * time.Second
 	s := newStream(
 		&cfg,
+		"fake",
 		model.Fingerprint(0),
 		labels.Labels{
 			{Name: "foo", Value: "bar"},
@@ -260,7 +264,7 @@ func Benchmark_PushStream(b *testing.B) {
 		labels.Label{Name: "job", Value: "loki-dev/ingester"},
 		labels.Label{Name: "container", Value: "ingester"},
 	}
-	s := newStream(&Config{}, model.Fingerprint(0), ls, NilMetrics)
+	s := newStream(&Config{}, "fake", model.Fingerprint(0), ls, NilMetrics)

 	t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
 	require.NoError(b, err)

From a1aa354daf9009687a5821a77fc66b5e4ca790a6 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Thu, 5 Aug 2021 15:59:20 -0400
Subject: [PATCH 2/2] expose out_of_order discarded metrics

---
 pkg/ingester/stream.go     | 15 +++++++++++++++
 pkg/validation/validate.go |  1 +
 2 files changed, 16 insertions(+)

diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 0d571372433b..3c740553939c 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -21,6 +21,7 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql/log"
 	"github.com/grafana/loki/pkg/logqlmodel/stats"
+	"github.com/grafana/loki/pkg/validation"
 )

 var (
@@ -195,6 +196,14 @@ func (s *stream) Push(
 	var storedEntries []logproto.Entry
 	failedEntriesWithError := []entryWithError{}

+	var outOfOrderSamples, outOfOrderBytes int
+	defer func() {
+		if outOfOrderSamples > 0 {
+			validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
+			validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
+		}
+	}()
+
 	// Don't fail on the first append error - if samples are sent out of order,
 	// we still want to append the later ones.
 	for i := range entries {
@@ -234,8 +243,14 @@ func (s *stream) Push(
 			// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
 			if s.cfg.UnorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
 				failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
+				outOfOrderSamples++
+				outOfOrderBytes += len(entries[i].Line)
 			} else if err := chunk.chunk.Append(&entries[i]); err != nil {
 				failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
+				if err == chunkenc.ErrOutOfOrder {
+					outOfOrderSamples++
+					outOfOrderBytes += len(entries[i].Line)
+				}
 			} else {
 				storedEntries = append(storedEntries, entries[i])
 				s.lastLine.ts = entries[i].Timestamp
diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go
index c8f2551807f1..c6236db35913 100644
--- a/pkg/validation/validate.go
+++ b/pkg/validation/validate.go
@@ -23,6 +23,7 @@ const (
 	// because the limit of active streams has been reached.
 	StreamLimit         = "stream_limit"
 	StreamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
+	OutOfOrder          = "out_of_order"
 	// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
 	GreaterThanMaxSampleAge         = "greater_than_max_sample_age"
 	GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
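
Note for reviewers (not part of the patches above): the second commit counts an entry as out of order either when the unordered-writes validity window rejects it or when chunk.Append returns chunkenc.ErrOutOfOrder. The following standalone Go sketch, with hypothetical helper names that do not exist in this PR, illustrates that validity-window rule: an entry is discarded, and now recorded against the stream's tenant under the "out_of_order" reason, when its timestamp falls more than half of MaxChunkAge behind the highest timestamp the stream has already accepted.

package main

import (
	"fmt"
	"time"
)

// validityCutoff mirrors s.highestTs.Add(-s.cfg.MaxChunkAge / 2) from the diff:
// entries with timestamps before this cutoff are rejected as out of order.
func validityCutoff(highestTs time.Time, maxChunkAge time.Duration) time.Time {
	return highestTs.Add(-maxChunkAge / 2)
}

func main() {
	highest := time.Date(2021, 8, 5, 15, 0, 0, 0, time.UTC)
	maxChunkAge := 10 * time.Second // the value TestUnorderedPush configures

	cutoff := validityCutoff(highest, maxChunkAge)

	// An entry 6s behind the newest accepted timestamp misses the 5s window,
	// so Push would increment outOfOrderSamples and add len(line) to outOfOrderBytes.
	entryTs := highest.Add(-6 * time.Second)
	fmt.Println(cutoff.After(entryTs)) // true -> rejected with chunkenc.ErrOutOfOrder
}

Whichever branch rejects the entry, the deferred function in Push flushes the accumulated counts into validation.DiscardedSamples and validation.DiscardedBytes, labelled with validation.OutOfOrder and the tenant ID that the first commit threads through newStream.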