From e6bfdff647100efdbd04161a5ab1df108ae314d9 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 18 Jun 2024 17:25:37 +0300 Subject: [PATCH] Update the flush loop to check whether the ingester has lost ownership of a stream and, if so, flush that stream. Signed-off-by: Vladyslav Diachenko --- pkg/ingester/flush.go | 17 ++++-- pkg/ingester/flush_test.go | 50 ++++++++++++++++ pkg/ingester/instance.go | 11 ++-- pkg/ingester/owned_streams.go | 51 +++++++++------- pkg/ingester/owned_streams_test.go | 58 +++++++++++-------- .../recalculate_owned_streams_test.go | 4 +- 6 files changed, 131 insertions(+), 60 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 81407abcb2e2..fbc571e6d14c 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -33,11 +33,12 @@ const ( nameLabel = "__name__" logsValue = "logs" - flushReasonIdle = "idle" - flushReasonMaxAge = "max_age" - flushReasonForced = "forced" - flushReasonFull = "full" - flushReasonSynced = "synced" + flushReasonIdle = "idle" + flushReasonMaxAge = "max_age" + flushReasonForced = "forced" + flushReasonNotOwned = "not_owned" + flushReasonFull = "full" + flushReasonSynced = "synced" ) // Note: this is called both during the WAL replay (zero or more times) @@ -124,7 +125,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo lastChunk := stream.chunks[len(stream.chunks)-1] shouldFlush, _ := i.shouldFlushChunk(&lastChunk) - if len(stream.chunks) == 1 && !immediate && !shouldFlush { + if len(stream.chunks) == 1 && !immediate && !shouldFlush && !instance.ownedStreamsSvc.isStreamNotOwned(stream.fp) { return } @@ -217,10 +218,14 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint stream.chunkMtx.Lock() defer stream.chunkMtx.Unlock() + notOwnedStream := instance.ownedStreamsSvc.isStreamNotOwned(fp) var result []*chunkDesc for j := range stream.chunks { shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j]) + if 
!shouldFlush && notOwnedStream { + shouldFlush, reason = true, flushReasonNotOwned + } if immediate || shouldFlush { // Ensure no more writes happen to this chunk. if !stream.chunks[j].closed { diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 460a50ffc8fa..1287be3d4bfd 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -253,6 +253,56 @@ func TestFlushingCollidingLabels(t *testing.T) { } } +func Test_flush_not_owned_stream(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushCheckPeriod = time.Millisecond * 100 + cfg.MaxChunkAge = time.Minute + cfg.MaxChunkIdle = time.Hour + + store, ing := newTestStore(t, cfg, nil) + defer store.Stop() + + now := time.Unix(0, 0) + + entries := []logproto.Entry{ + {Timestamp: now.Add(time.Nanosecond), Line: "1"}, + {Timestamp: now.Add(time.Minute), Line: "2"}, + } + + labelSet := model.LabelSet{"app": "l"} + req := &logproto.PushRequest{Streams: []logproto.Stream{ + {Labels: labelSet.String(), Entries: entries}, + }} + + const userID = "testUser" + ctx := user.InjectOrgID(context.Background(), userID) + + _, err := ing.Push(ctx, req) + require.NoError(t, err) + + time.Sleep(2 * cfg.FlushCheckPeriod) + + // ensure the chunk is not flushed after the flush check period elapses while the stream is still owned + store.checkData(t, map[string][]logproto.Stream{}) + + instance, found := ing.getInstanceByID(userID) + require.True(t, found) + fingerprint := instance.getHashForLabels(labels.FromStrings("app", "l")) + require.Equal(t, model.Fingerprint(16794418009594958), fingerprint) + instance.ownedStreamsSvc.trackStreamOwnership(fingerprint, false) + + time.Sleep(2 * cfg.FlushCheckPeriod) + + // assert that both entries of the stream have now been flushed to the store + store.checkData(t, map[string][]logproto.Stream{ + userID: { + {Labels: labelSet.String(), Entries: entries}, + }, + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing)) +} + func TestFlushMaxAge(t *testing.T) { cfg := defaultIngesterTestConfig(t) 
cfg.FlushCheckPeriod = time.Millisecond * 100 diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 65389a3cb04a..1d30e7e23ece 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -357,7 +357,8 @@ func (i *instance) onStreamCreated(s *stream) { i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) streamsCountStats.Add(1) - i.ownedStreamsSvc.incOwnedStreamCount() + // we count newly created stream as owned + i.ownedStreamsSvc.trackStreamOwnership(s.fp, true) if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( "msg", "successfully created stream", @@ -421,7 +422,7 @@ func (i *instance) removeStream(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Dec() memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String()))) streamsCountStats.Add(-1) - i.ownedStreamsSvc.decOwnedStreamCount() + i.ownedStreamsSvc.trackRemovedStream(s.fp) } } @@ -1181,11 +1182,7 @@ func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error { i.streams.WithLock(func() { i.ownedStreamsSvc.resetStreamCounts() err = i.streams.ForEach(func(s *stream) (bool, error) { - if ownedTokenRange.IncludesKey(uint32(s.fp)) { - i.ownedStreamsSvc.incOwnedStreamCount() - } else { - i.ownedStreamsSvc.incNotOwnedStreamCount() - } + i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp))) return true, nil }) }) diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index db9d0e94715f..55f7eb480482 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "go.uber.org/atomic" "github.com/grafana/loki/v3/pkg/util/constants" @@ -17,19 +18,20 @@ var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ }, []string{"tenant"}) type ownedStreamService struct { - tenantID string - 
limiter *Limiter - fixedLimit *atomic.Int32 - ownedStreamCount int - notOwnedStreamCount int - lock sync.RWMutex + tenantID string + limiter *Limiter + fixedLimit *atomic.Int32 + ownedStreamCount int + lock sync.RWMutex + notOwnedStreams map[model.Fingerprint]any } func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { svc := &ownedStreamService{ - tenantID: tenantID, - limiter: limiter, - fixedLimit: atomic.NewInt32(0), + tenantID: tenantID, + limiter: limiter, + fixedLimit: atomic.NewInt32(0), + notOwnedStreams: make(map[model.Fingerprint]any), } svc.updateFixedLimit() @@ -51,25 +53,24 @@ func (s *ownedStreamService) getFixedLimit() int { return int(s.fixedLimit.Load()) } -func (s *ownedStreamService) incOwnedStreamCount() { - s.lock.Lock() - defer s.lock.Unlock() - s.ownedStreamCount++ -} - -func (s *ownedStreamService) incNotOwnedStreamCount() { +func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) { s.lock.Lock() defer s.lock.Unlock() + if owned { + s.ownedStreamCount++ + return + } notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc() - s.notOwnedStreamCount++ + s.notOwnedStreams[fp] = nil } -func (s *ownedStreamService) decOwnedStreamCount() { +func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) { s.lock.Lock() defer s.lock.Unlock() - if s.notOwnedStreamCount > 0 { + + if _, notOwned := s.notOwnedStreams[fp]; notOwned { notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec() - s.notOwnedStreamCount-- + delete(s.notOwnedStreams, fp) return } s.ownedStreamCount-- @@ -79,6 +80,14 @@ func (s *ownedStreamService) resetStreamCounts() { s.lock.Lock() defer s.lock.Unlock() s.ownedStreamCount = 0 - s.notOwnedStreamCount = 0 notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0) + s.notOwnedStreams = make(map[model.Fingerprint]any) +} + +func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool { + s.lock.RLock() + defer s.lock.RUnlock() + + _, notOwned := 
s.notOwnedStreams[fp] + return notOwned } diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go index 876954b8579a..7f114922fa44 100644 --- a/pkg/ingester/owned_streams_test.go +++ b/pkg/ingester/owned_streams_test.go @@ -4,6 +4,7 @@ import ( "sync" "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/validation" @@ -28,51 +29,60 @@ func Test_OwnedStreamService(t *testing.T) { service.updateFixedLimit() require.Equal(t, 100, service.getFixedLimit()) - service.incOwnedStreamCount() - service.incOwnedStreamCount() - service.incOwnedStreamCount() + service.trackStreamOwnership(model.Fingerprint(1), true) + service.trackStreamOwnership(model.Fingerprint(2), true) + service.trackStreamOwnership(model.Fingerprint(3), true) require.Equal(t, 3, service.getOwnedStreamCount()) + require.Len(t, service.notOwnedStreams, 0) - service.incOwnedStreamCount() - service.decOwnedStreamCount() - service.notOwnedStreamCount = 1 - service.ownedStreamCount = 2 - require.Equal(t, 2, service.getOwnedStreamCount()) - require.Equal(t, 1, service.notOwnedStreamCount) + service.resetStreamCounts() + service.trackStreamOwnership(model.Fingerprint(3), true) + service.trackStreamOwnership(model.Fingerprint(3), false) + require.Equal(t, 1, service.getOwnedStreamCount(), + "owned streams count must not be changed because not owned stream can be reported only by recalculate_owned_streams job that resets the counters before checking all the streams") + require.Len(t, service.notOwnedStreams, 1) + require.True(t, service.isStreamNotOwned(model.Fingerprint(3))) + + service.resetStreamCounts() + service.trackStreamOwnership(model.Fingerprint(1), true) + service.trackStreamOwnership(model.Fingerprint(2), true) + service.trackStreamOwnership(model.Fingerprint(3), false) - service.decOwnedStreamCount() - require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when 
notOwnedStreamCount is set to 0") - require.Equal(t, 0, service.notOwnedStreamCount) + service.trackRemovedStream(model.Fingerprint(3)) + require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStream does not contain this fingerprint") + require.Len(t, service.notOwnedStreams, 0) - service.decOwnedStreamCount() + service.trackRemovedStream(model.Fingerprint(2)) require.Equal(t, 1, service.getOwnedStreamCount()) - require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0") + require.Len(t, service.notOwnedStreams, 0) group := sync.WaitGroup{} - group.Add(200) + group.Add(100) for i := 0; i < 100; i++ { - go func() { + go func(i int) { defer group.Done() - service.incOwnedStreamCount() - }() + service.trackStreamOwnership(model.Fingerprint(i+1000), true) + }(i) } + group.Wait() + group.Add(100) for i := 0; i < 100; i++ { - go func() { + go func(i int) { defer group.Done() - service.decOwnedStreamCount() - }() + service.trackRemovedStream(model.Fingerprint(i + 1000)) + }(i) } group.Wait() require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed") // simulate the effect from the recalculation job - service.notOwnedStreamCount = 1 - service.ownedStreamCount = 2 + service.trackStreamOwnership(model.Fingerprint(44), false) + service.trackStreamOwnership(model.Fingerprint(45), true) service.resetStreamCounts() require.Equal(t, 0, service.getOwnedStreamCount()) - require.Equal(t, 0, service.notOwnedStreamCount) + require.Len(t, service.notOwnedStreams, 0) } diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index d2752fbd7649..dda027ca2e44 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -96,7 +96,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { createStream(t, tenant, 250) require.Equal(t, 7, 
tenant.ownedStreamsSvc.ownedStreamCount) - require.Equal(t, 0, tenant.ownedStreamsSvc.notOwnedStreamCount) + require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} @@ -110,7 +110,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) { require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") } require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) - require.Equal(t, testData.expectedNotOwnedStreamCount, tenant.ownedStreamsSvc.notOwnedStreamCount) + require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) }) }