feat: flush not owned streams #13254

Merged 2 commits on Jun 19, 2024
17 changes: 11 additions & 6 deletions pkg/ingester/flush.go
@@ -33,11 +33,12 @@ const (
	nameLabel = "__name__"
	logsValue = "logs"

-	flushReasonIdle   = "idle"
-	flushReasonMaxAge = "max_age"
-	flushReasonForced = "forced"
-	flushReasonFull   = "full"
-	flushReasonSynced = "synced"
+	flushReasonIdle     = "idle"
+	flushReasonMaxAge   = "max_age"
+	flushReasonForced   = "forced"
+	flushReasonNotOwned = "not_owned"
+	flushReasonFull     = "full"
+	flushReasonSynced   = "synced"
)

// Note: this is called both during the WAL replay (zero or more times)
@@ -124,7 +125,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool

	lastChunk := stream.chunks[len(stream.chunks)-1]
	shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
-	if len(stream.chunks) == 1 && !immediate && !shouldFlush {
+	if len(stream.chunks) == 1 && !immediate && !shouldFlush && !instance.ownedStreamsSvc.isStreamNotOwned(stream.fp) {
		return
	}

@@ -217,10 +218,14 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint

	stream.chunkMtx.Lock()
	defer stream.chunkMtx.Unlock()
+	notOwnedStream := instance.ownedStreamsSvc.isStreamNotOwned(fp)

	var result []*chunkDesc
	for j := range stream.chunks {
		shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
+		if !shouldFlush && notOwnedStream {
+			shouldFlush, reason = true, flushReasonNotOwned
+		}
		if immediate || shouldFlush {
			// Ensure no more writes happen to this chunk.
			if !stream.chunks[j].closed {
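Taken together, the flush.go changes make stream ownership a flush trigger in its own right: sweepStream no longer skips a stream whose single open chunk is not yet flushable when the ingester does not own that stream, and collectChunksToFlush promotes such chunks to a flush with the new not_owned reason. A minimal sketch of the per-chunk decision; decideFlush is an invented name for illustration, while shouldFlush, reason, and flushReasonNotOwned correspond to the diff above:

// decideFlush distills the per-chunk logic in collectChunksToFlush after
// this change: a chunk keeps its own flush verdict (full, max_age, idle,
// synced, ...), and only a chunk that would otherwise stay in memory is
// overridden because this ingester no longer owns the stream.
func decideFlush(shouldFlush bool, reason string, notOwnedStream bool) (bool, string) {
	if !shouldFlush && notOwnedStream {
		return true, flushReasonNotOwned
	}
	return shouldFlush, reason
}

Because the override applies only when shouldFlush is false, chunks that were already flushable keep their original reason, so the not_owned label on flush metrics counts only the flushes this change introduces.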
50 changes: 50 additions & 0 deletions pkg/ingester/flush_test.go
@@ -253,6 +253,56 @@ func TestFlushingCollidingLabels(t *testing.T) {
}
}

+func Test_flush_not_owned_stream(t *testing.T) {
+	cfg := defaultIngesterTestConfig(t)
+	cfg.FlushCheckPeriod = time.Millisecond * 100
+	cfg.MaxChunkAge = time.Minute
+	cfg.MaxChunkIdle = time.Hour
+
+	store, ing := newTestStore(t, cfg, nil)
+	defer store.Stop()
+
+	now := time.Unix(0, 0)
+
+	entries := []logproto.Entry{
+		{Timestamp: now.Add(time.Nanosecond), Line: "1"},
+		{Timestamp: now.Add(time.Minute), Line: "2"},
+	}
+
+	labelSet := model.LabelSet{"app": "l"}
+	req := &logproto.PushRequest{Streams: []logproto.Stream{
+		{Labels: labelSet.String(), Entries: entries},
+	}}
+
+	const userID = "testUser"
+	ctx := user.InjectOrgID(context.Background(), userID)
+
+	_, err := ing.Push(ctx, req)
+	require.NoError(t, err)
+
+	time.Sleep(2 * cfg.FlushCheckPeriod)
+
+	// ensure the chunk is not flushed after the flush period elapses
+	store.checkData(t, map[string][]logproto.Stream{})
+
+	instance, found := ing.getInstanceByID(userID)
+	require.True(t, found)
+	fingerprint := instance.getHashForLabels(labels.FromStrings("app", "l"))
+	require.Equal(t, model.Fingerprint(16794418009594958), fingerprint)
+	instance.ownedStreamsSvc.trackStreamOwnership(fingerprint, false)
+
+	time.Sleep(2 * cfg.FlushCheckPeriod)
+
+	// assert that the stream, with both entries, has now been flushed
+	store.checkData(t, map[string][]logproto.Stream{
+		userID: {
+			{Labels: labelSet.String(), Entries: entries},
+		},
+	})
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
+}
+
func TestFlushMaxAge(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
11 changes: 4 additions & 7 deletions pkg/ingester/instance.go
@@ -357,7 +357,8 @@ func (i *instance) onStreamCreated(s *stream) {
	i.streamsCreatedTotal.Inc()
	i.addTailersToNewStream(s)
	streamsCountStats.Add(1)
-	i.ownedStreamsSvc.incOwnedStreamCount()
+	// we count a newly created stream as owned
+	i.ownedStreamsSvc.trackStreamOwnership(s.fp, true)
	if i.configs.LogStreamCreation(i.instanceID) {
		level.Debug(util_log.Logger).Log(
			"msg", "successfully created stream",
@@ -421,7 +422,7 @@ func (i *instance) removeStream(s *stream) {
		memoryStreams.WithLabelValues(i.instanceID).Dec()
		memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
		streamsCountStats.Add(-1)
-		i.ownedStreamsSvc.decOwnedStreamCount()
+		i.ownedStreamsSvc.trackRemovedStream(s.fp)
	}
}

@@ -1181,11 +1182,7 @@ func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error {
	i.streams.WithLock(func() {
		i.ownedStreamsSvc.resetStreamCounts()
		err = i.streams.ForEach(func(s *stream) (bool, error) {
-			if ownedTokenRange.IncludesKey(uint32(s.fp)) {
-				i.ownedStreamsSvc.incOwnedStreamCount()
-			} else {
-				i.ownedStreamsSvc.incNotOwnedStreamCount()
-			}
+			i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp)))
			return true, nil
		})
	})
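In instance.go, all three bookkeeping sites now go through the fingerprint-aware API: a newly created stream is recorded as owned, a removed stream is untracked, and the ownership recalculation classifies every in-memory stream against the ring's token ranges. A compressed sketch of that last loop; recalcOwnership is an invented helper standing in for the body of updateOwnedStreams above (the real code iterates with i.streams.ForEach inside i.streams.WithLock, and ring.TokenRanges.IncludesKey is the same method the diff uses):

// recalcOwnership resets the per-tenant counts, then re-classifies each
// stream in one pass: owned fingerprints bump the owned counter, the rest
// land in the not-owned set that the flush loop consults.
func recalcOwnership(svc *ownedStreamService, streams []*stream, ownedTokenRange ring.TokenRanges) {
	svc.resetStreamCounts()
	for _, s := range streams {
		svc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp)))
	}
}

Resetting before the pass is what makes trackStreamOwnership safe to call unconditionally: every stream is treated as newly observed, so stale not-owned entries from a previous ring state cannot linger.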
51 changes: 30 additions & 21 deletions pkg/ingester/owned_streams.go
@@ -5,6 +5,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/util/constants"
@@ -17,19 +18,20 @@ var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{
}, []string{"tenant"})

type ownedStreamService struct {
-	tenantID            string
-	limiter             *Limiter
-	fixedLimit          *atomic.Int32
-	ownedStreamCount    int
-	notOwnedStreamCount int
-	lock                sync.RWMutex
+	tenantID         string
+	limiter          *Limiter
+	fixedLimit       *atomic.Int32
+	ownedStreamCount int
+	lock             sync.RWMutex
+	notOwnedStreams  map[model.Fingerprint]any
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
	svc := &ownedStreamService{
-		tenantID:   tenantID,
-		limiter:    limiter,
-		fixedLimit: atomic.NewInt32(0),
+		tenantID:        tenantID,
+		limiter:         limiter,
+		fixedLimit:      atomic.NewInt32(0),
+		notOwnedStreams: make(map[model.Fingerprint]any),
	}

	svc.updateFixedLimit()
@@ -51,25 +53,24 @@ func (s *ownedStreamService) getFixedLimit() int {
	return int(s.fixedLimit.Load())
}

-func (s *ownedStreamService) incOwnedStreamCount() {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	s.ownedStreamCount++
-}
-
-func (s *ownedStreamService) incNotOwnedStreamCount() {
+func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
+	if owned {
+		s.ownedStreamCount++
+		return
+	}
	notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc()
-	s.notOwnedStreamCount++
+	s.notOwnedStreams[fp] = nil
}

-func (s *ownedStreamService) decOwnedStreamCount() {
+func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
	s.lock.Lock()
	defer s.lock.Unlock()
-	if s.notOwnedStreamCount > 0 {
+
+	if _, notOwned := s.notOwnedStreams[fp]; notOwned {
		notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec()
-		s.notOwnedStreamCount--
+		delete(s.notOwnedStreams, fp)
		return
	}
	s.ownedStreamCount--
@@ -79,6 +80,14 @@ func (s *ownedStreamService) resetStreamCounts() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.ownedStreamCount = 0
-	s.notOwnedStreamCount = 0
+	notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0)
+	s.notOwnedStreams = make(map[model.Fingerprint]any)
}
+
+func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	_, notOwned := s.notOwnedStreams[fp]
+	return notOwned
+}
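The owned_streams.go rewrite trades the notOwnedStreamCount integer for a set of not-owned fingerprints, which is what allows the flush path to ask about one specific stream in O(1). A short usage sketch of the resulting API; it assumes svc is an already-constructed *ownedStreamService (the real constructor also takes a tenant ID and a *Limiter) and uses an arbitrary fingerprint:

// exampleOwnership walks the lifecycle that the flush path relies on.
func exampleOwnership(svc *ownedStreamService) {
	fp := model.Fingerprint(42)

	svc.trackStreamOwnership(fp, true)  // ownedStreamCount++
	svc.trackStreamOwnership(fp, false) // fp enters notOwnedStreams, gauge +1
	// The owned counter is not decremented by the second call; only the
	// recalculation job reconciles it, via resetStreamCounts, which the
	// unit test below asserts explicitly.

	if svc.isStreamNotOwned(fp) {
		// collectChunksToFlush now treats this stream's chunks as
		// flushable with reason "not_owned".
	}

	svc.trackRemovedStream(fp) // fp leaves notOwnedStreams, gauge -1
}

As a design note, map[model.Fingerprint]any with nil values is a plain set; map[model.Fingerprint]struct{} would express the same intent with slightly less boxing, but the behavior is identical.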
58 changes: 34 additions & 24 deletions pkg/ingester/owned_streams_test.go
@@ -4,6 +4,7 @@ import (
"sync"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/validation"
@@ -28,51 +29,60 @@ func Test_OwnedStreamService(t *testing.T) {
	service.updateFixedLimit()
	require.Equal(t, 100, service.getFixedLimit())

-	service.incOwnedStreamCount()
-	service.incOwnedStreamCount()
-	service.incOwnedStreamCount()
+	service.trackStreamOwnership(model.Fingerprint(1), true)
+	service.trackStreamOwnership(model.Fingerprint(2), true)
+	service.trackStreamOwnership(model.Fingerprint(3), true)
	require.Equal(t, 3, service.getOwnedStreamCount())
+	require.Len(t, service.notOwnedStreams, 0)

-	service.incOwnedStreamCount()
-	service.decOwnedStreamCount()
-	service.notOwnedStreamCount = 1
-	service.ownedStreamCount = 2
-	require.Equal(t, 2, service.getOwnedStreamCount())
-	require.Equal(t, 1, service.notOwnedStreamCount)
+	service.resetStreamCounts()
+	service.trackStreamOwnership(model.Fingerprint(3), true)
+	service.trackStreamOwnership(model.Fingerprint(3), false)
+	require.Equal(t, 1, service.getOwnedStreamCount(),
+		"owned streams count must not be changed because not owned stream can be reported only by recalculate_owned_streams job that resets the counters before checking all the streams")
+	require.Len(t, service.notOwnedStreams, 1)
+	require.True(t, service.isStreamNotOwned(model.Fingerprint(3)))

+	service.resetStreamCounts()
+	service.trackStreamOwnership(model.Fingerprint(1), true)
+	service.trackStreamOwnership(model.Fingerprint(2), true)
+	service.trackStreamOwnership(model.Fingerprint(3), false)

-	service.decOwnedStreamCount()
-	require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreamCount is set to 0")
-	require.Equal(t, 0, service.notOwnedStreamCount)
+	service.trackRemovedStream(model.Fingerprint(3))
+	require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreams does not contain this fingerprint")
+	require.Len(t, service.notOwnedStreams, 0)

-	service.decOwnedStreamCount()
+	service.trackRemovedStream(model.Fingerprint(2))
	require.Equal(t, 1, service.getOwnedStreamCount())
-	require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0")
+	require.Len(t, service.notOwnedStreams, 0)

	group := sync.WaitGroup{}
-	group.Add(200)
+	group.Add(100)
	for i := 0; i < 100; i++ {
-		go func() {
+		go func(i int) {
			defer group.Done()
-			service.incOwnedStreamCount()
-		}()
+			service.trackStreamOwnership(model.Fingerprint(i+1000), true)
+		}(i)
	}
+	group.Wait()

+	group.Add(100)
	for i := 0; i < 100; i++ {
-		go func() {
+		go func(i int) {
			defer group.Done()
-			service.decOwnedStreamCount()
-		}()
+			service.trackRemovedStream(model.Fingerprint(i + 1000))
+		}(i)
	}
	group.Wait()

	require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed")

	// simulate the effect from the recalculation job
-	service.notOwnedStreamCount = 1
-	service.ownedStreamCount = 2
+	service.trackStreamOwnership(model.Fingerprint(44), false)
+	service.trackStreamOwnership(model.Fingerprint(45), true)

	service.resetStreamCounts()

	require.Equal(t, 0, service.getOwnedStreamCount())
-	require.Equal(t, 0, service.notOwnedStreamCount)
+	require.Len(t, service.notOwnedStreams, 0)
}
4 changes: 2 additions & 2 deletions pkg/ingester/recalculate_owned_streams_test.go
@@ -96,7 +96,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
			createStream(t, tenant, 250)

			require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
-			require.Equal(t, 0, tenant.ownedStreamsSvc.notOwnedStreamCount)
+			require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)

			mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}

@@ -110,7 +110,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
				require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
			}
			require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
-			require.Equal(t, testData.expectedNotOwnedStreamCount, tenant.ownedStreamsSvc.notOwnedStreamCount)
+			require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
		})
	}
