Skip to content

Commit

Permalink
feat: flush not owned streams (#13254)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
  • Loading branch information
vlad-diachenko authored Jun 19, 2024
1 parent f5a9905 commit 2ca1ac6
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 60 deletions.
17 changes: 11 additions & 6 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ const (
nameLabel = "__name__"
logsValue = "logs"

flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonFull = "full"
flushReasonSynced = "synced"
flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonNotOwned = "not_owned"
flushReasonFull = "full"
flushReasonSynced = "synced"
)

// Note: this is called both during the WAL replay (zero or more times)
Expand Down Expand Up @@ -124,7 +125,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo

lastChunk := stream.chunks[len(stream.chunks)-1]
shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
if len(stream.chunks) == 1 && !immediate && !shouldFlush {
if len(stream.chunks) == 1 && !immediate && !shouldFlush && !instance.ownedStreamsSvc.isStreamNotOwned(stream.fp) {
return
}

Expand Down Expand Up @@ -217,10 +218,14 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint

stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
notOwnedStream := instance.ownedStreamsSvc.isStreamNotOwned(fp)

var result []*chunkDesc
for j := range stream.chunks {
shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
if !shouldFlush && notOwnedStream {
shouldFlush, reason = true, flushReasonNotOwned
}
if immediate || shouldFlush {
// Ensure no more writes happen to this chunk.
if !stream.chunks[j].closed {
Expand Down
50 changes: 50 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,56 @@ func TestFlushingCollidingLabels(t *testing.T) {
}
}

func Test_flush_not_owned_stream(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour

store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

now := time.Unix(0, 0)

entries := []logproto.Entry{
{Timestamp: now.Add(time.Nanosecond), Line: "1"},
{Timestamp: now.Add(time.Minute), Line: "2"},
}

labelSet := model.LabelSet{"app": "l"}
req := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: labelSet.String(), Entries: entries},
}}

const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)

_, err := ing.Push(ctx, req)
require.NoError(t, err)

time.Sleep(2 * cfg.FlushCheckPeriod)

// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]logproto.Stream{})

instance, found := ing.getInstanceByID(userID)
require.True(t, found)
fingerprint := instance.getHashForLabels(labels.FromStrings("app", "l"))
require.Equal(t, model.Fingerprint(16794418009594958), fingerprint)
instance.ownedStreamsSvc.trackStreamOwnership(fingerprint, false)

time.Sleep(2 * cfg.FlushCheckPeriod)

// assert stream is now both batches
store.checkData(t, map[string][]logproto.Stream{
userID: {
{Labels: labelSet.String(), Entries: entries},
},
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

func TestFlushMaxAge(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
Expand Down
11 changes: 4 additions & 7 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ func (i *instance) onStreamCreated(s *stream) {
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)
i.ownedStreamsSvc.incOwnedStreamCount()
// we count newly created stream as owned
i.ownedStreamsSvc.trackStreamOwnership(s.fp, true)
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
Expand Down Expand Up @@ -421,7 +422,7 @@ func (i *instance) removeStream(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Dec()
memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
streamsCountStats.Add(-1)
i.ownedStreamsSvc.decOwnedStreamCount()
i.ownedStreamsSvc.trackRemovedStream(s.fp)
}
}

Expand Down Expand Up @@ -1181,11 +1182,7 @@ func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error {
i.streams.WithLock(func() {
i.ownedStreamsSvc.resetStreamCounts()
err = i.streams.ForEach(func(s *stream) (bool, error) {
if ownedTokenRange.IncludesKey(uint32(s.fp)) {
i.ownedStreamsSvc.incOwnedStreamCount()
} else {
i.ownedStreamsSvc.incNotOwnedStreamCount()
}
i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp)))
return true, nil
})
})
Expand Down
51 changes: 30 additions & 21 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/util/constants"
Expand All @@ -17,19 +18,20 @@ var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{
}, []string{"tenant"})

type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
notOwnedStreamCount int
lock sync.RWMutex
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
lock sync.RWMutex
notOwnedStreams map[model.Fingerprint]any
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
notOwnedStreams: make(map[model.Fingerprint]any),
}

svc.updateFixedLimit()
Expand All @@ -51,25 +53,24 @@ func (s *ownedStreamService) getFixedLimit() int {
return int(s.fixedLimit.Load())
}

func (s *ownedStreamService) incOwnedStreamCount() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount++
}

func (s *ownedStreamService) incNotOwnedStreamCount() {
func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
s.lock.Lock()
defer s.lock.Unlock()
if owned {
s.ownedStreamCount++
return
}
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc()
s.notOwnedStreamCount++
s.notOwnedStreams[fp] = nil
}

func (s *ownedStreamService) decOwnedStreamCount() {
func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
s.lock.Lock()
defer s.lock.Unlock()
if s.notOwnedStreamCount > 0 {

if _, notOwned := s.notOwnedStreams[fp]; notOwned {
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec()
s.notOwnedStreamCount--
delete(s.notOwnedStreams, fp)
return
}
s.ownedStreamCount--
Expand All @@ -79,6 +80,14 @@ func (s *ownedStreamService) resetStreamCounts() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount = 0
s.notOwnedStreamCount = 0
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0)
s.notOwnedStreams = make(map[model.Fingerprint]any)
}

func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool {
s.lock.RLock()
defer s.lock.RUnlock()

_, notOwned := s.notOwnedStreams[fp]
return notOwned
}
58 changes: 34 additions & 24 deletions pkg/ingester/owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/validation"
Expand All @@ -28,51 +29,60 @@ func Test_OwnedStreamService(t *testing.T) {
service.updateFixedLimit()
require.Equal(t, 100, service.getFixedLimit())

service.incOwnedStreamCount()
service.incOwnedStreamCount()
service.incOwnedStreamCount()
service.trackStreamOwnership(model.Fingerprint(1), true)
service.trackStreamOwnership(model.Fingerprint(2), true)
service.trackStreamOwnership(model.Fingerprint(3), true)
require.Equal(t, 3, service.getOwnedStreamCount())
require.Len(t, service.notOwnedStreams, 0)

service.incOwnedStreamCount()
service.decOwnedStreamCount()
service.notOwnedStreamCount = 1
service.ownedStreamCount = 2
require.Equal(t, 2, service.getOwnedStreamCount())
require.Equal(t, 1, service.notOwnedStreamCount)
service.resetStreamCounts()
service.trackStreamOwnership(model.Fingerprint(3), true)
service.trackStreamOwnership(model.Fingerprint(3), false)
require.Equal(t, 1, service.getOwnedStreamCount(),
"owned streams count must not be changed because not owned stream can be reported only by recalculate_owned_streams job that resets the counters before checking all the streams")
require.Len(t, service.notOwnedStreams, 1)
require.True(t, service.isStreamNotOwned(model.Fingerprint(3)))

service.resetStreamCounts()
service.trackStreamOwnership(model.Fingerprint(1), true)
service.trackStreamOwnership(model.Fingerprint(2), true)
service.trackStreamOwnership(model.Fingerprint(3), false)

service.decOwnedStreamCount()
require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreamCount is set to 0")
require.Equal(t, 0, service.notOwnedStreamCount)
service.trackRemovedStream(model.Fingerprint(3))
require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStream does not contain this fingerprint")
require.Len(t, service.notOwnedStreams, 0)

service.decOwnedStreamCount()
service.trackRemovedStream(model.Fingerprint(2))
require.Equal(t, 1, service.getOwnedStreamCount())
require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0")
require.Len(t, service.notOwnedStreams, 0)

group := sync.WaitGroup{}
group.Add(200)
group.Add(100)
for i := 0; i < 100; i++ {
go func() {
go func(i int) {
defer group.Done()
service.incOwnedStreamCount()
}()
service.trackStreamOwnership(model.Fingerprint(i+1000), true)
}(i)
}
group.Wait()

group.Add(100)
for i := 0; i < 100; i++ {
go func() {
go func(i int) {
defer group.Done()
service.decOwnedStreamCount()
}()
service.trackRemovedStream(model.Fingerprint(i + 1000))
}(i)
}
group.Wait()

require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed")

// simulate the effect from the recalculation job
service.notOwnedStreamCount = 1
service.ownedStreamCount = 2
service.trackStreamOwnership(model.Fingerprint(44), false)
service.trackStreamOwnership(model.Fingerprint(45), true)

service.resetStreamCounts()

require.Equal(t, 0, service.getOwnedStreamCount())
require.Equal(t, 0, service.notOwnedStreamCount)
require.Len(t, service.notOwnedStreams, 0)
}
4 changes: 2 additions & 2 deletions pkg/ingester/recalculate_owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
createStream(t, tenant, 250)

require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, 0, tenant.ownedStreamsSvc.notOwnedStreamCount)
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)

mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}

Expand All @@ -110,7 +110,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
}
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, testData.expectedNotOwnedStreamCount, tenant.ownedStreamsSvc.notOwnedStreamCount)
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
})
}

Expand Down

0 comments on commit 2ca1ac6

Please sign in to comment.