diff --git a/CHANGELOG.md b/CHANGELOG.md index efbaac9ec7208..ecefc3cbfda8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## master / unreleased + +### Features + +* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut. + ## 1.3.0 (2019-01-16) ### What's New?? ### @@ -156,7 +162,6 @@ Once again we can't thank our community and contributors enough for the signific #### New Members! * [1415](https://github.com/grafana/loki/pull/1415) **cyriltovena**: Add Joe as member of the team. - # 1.2.0 (2019-12-09) One week has passed since the last Loki release, and it's time for a new one! diff --git a/docs/configuration/README.md b/docs/configuration/README.md index eed419496c6f2..0e38d114b4528 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -300,6 +300,10 @@ The `ingester_config` block configures Ingesters. # The maximum number of errors a stream will report to the user # when a push fails. 0 to make unlimited. [max_returned_stream_errors: | default = 10] + +# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created. +[max_chunk_age: | default = 1h] + ``` ### lifecycler_config diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 7ab60d410a2e2..16f75f22366c6 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -146,7 +146,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo } lastChunk := stream.chunks[len(stream.chunks)-1] - if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate { + if len(stream.chunks) == 1 && !immediate && !i.shouldFlushChunk(&lastChunk) { return } @@ -246,7 +246,10 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool { } if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { - chunk.closed = true + return true + } + + if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge { return true } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index c54c10e9b3054..82ab2a603697d 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -102,6 +102,59 @@ func TestFlushingCollidingLabels(t *testing.T) { } } +func TestFlushMaxAge(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushCheckPeriod = time.Millisecond * 100 + cfg.MaxChunkAge = time.Minute + cfg.MaxChunkIdle = time.Hour + + store, ing := newTestStore(t, cfg) + defer store.Stop() + + now := time.Unix(0, 0) + + firstEntries := []logproto.Entry{ + {Timestamp: now.Add(time.Nanosecond), Line: "1"}, + {Timestamp: now.Add(time.Minute), Line: "2"}, + } + + secondEntries := []logproto.Entry{ + {Timestamp: now.Add(time.Second * 61), Line: "3"}, + } + + req := &logproto.PushRequest{Streams: []*logproto.Stream{ + {Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries}, + }} + + const userID = "testUser" + ctx := user.InjectOrgID(context.Background(), userID) + + _, err := ing.Push(ctx, req) + require.NoError(t, err) + + time.Sleep(2 * cfg.FlushCheckPeriod) + + // ensure chunk is not flushed after flush period elapses + store.checkData(t, map[string][]*logproto.Stream{}) + + req2 := &logproto.PushRequest{Streams: []*logproto.Stream{ + {Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries}, + }} + + _, err = ing.Push(ctx, req2) + require.NoError(t, err) + + time.Sleep(2 * cfg.FlushCheckPeriod) + + // assert stream is now both batches + store.checkData(t, map[string][]*logproto.Stream{ + userID: []*logproto.Stream{ + {Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)}, + }, + }) + +} + type testStore struct { mtx sync.Mutex // Chunks keyed by userID. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8c6c60fc75744..eae0be9d51fb5 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -51,6 +51,7 @@ type Config struct { BlockSize int `yaml:"chunk_block_size"` TargetChunkSize int `yaml:"chunk_target_size"` ChunkEncoding string `yaml:"chunk_encoding"` + MaxChunkAge time.Duration `yaml:"max_chunk_age"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` @@ -78,6 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.") f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.") f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.") + f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.") } // Ingester builds chunks for incoming log streams.