Cleanup/unordered writes ingester config #4192

Merged (2 commits) on Aug 19, 2021
14 changes: 14 additions & 0 deletions pkg/chunkenc/memchunk.go
@@ -724,6 +724,20 @@ func (c *MemChunk) reorder() error {
 	return nil
 }
 
+func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error {
+
+	if c.head != nil && c.head.Format() != desired {
+		newH, err := c.head.Convert(desired)
+		if err != nil {
+			return err
+		}
+
+		c.head = newH
+	}
+	c.headFmt = desired
+	return nil
+}
+
 // cut a new block and add it to finished blocks.
 func (c *MemChunk) cut() error {
 	if c.head.IsEmpty() {

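The new ConvertHead is what lets a chunk whose head block was replayed in one format be flipped to another before further writes. Below is a minimal, self-contained sketch of the same pattern (simplified stand-in types, not Loki's actual chunkenc API):

package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for chunkenc's head blocks: an unordered head
// accepts entries in any order; converting to the ordered format
// re-sorts them by timestamp.
type entry struct {
	ts   int64
	line string
}

type headFmt int

const (
	orderedFmt headFmt = iota
	unorderedFmt
)

type head struct {
	format  headFmt
	entries []entry
}

func (h *head) convert(desired headFmt) (*head, error) {
	if h.format == desired {
		return h, nil
	}
	out := &head{format: desired, entries: append([]entry(nil), h.entries...)}
	if desired == orderedFmt {
		sort.Slice(out.entries, func(i, j int) bool { return out.entries[i].ts < out.entries[j].ts })
	}
	return out, nil
}

// convertHead mirrors MemChunk.ConvertHead above: convert only when the
// formats differ, then record the desired format for future head blocks.
type memChunk struct {
	head    *head
	headFmt headFmt
}

func (c *memChunk) convertHead(desired headFmt) error {
	if c.head != nil && c.head.format != desired {
		newH, err := c.head.convert(desired)
		if err != nil {
			return err
		}
		c.head = newH
	}
	c.headFmt = desired
	return nil
}

func main() {
	c := &memChunk{head: &head{format: unorderedFmt, entries: []entry{{3, "c"}, {1, "a"}, {2, "b"}}}}
	if err := c.convertHead(orderedFmt); err != nil {
		panic(err)
	}
	fmt.Println(c.head.entries) // [{1 a} {2 b} {3 c}]
}
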
8 changes: 4 additions & 4 deletions pkg/ingester/checkpoint.go
@@ -100,10 +100,10 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) {
 			lastUpdated: c.LastUpdated,
 		}
 
-		hbType := chunkenc.OrderedHeadBlockFmt
-		if conf.UnorderedWrites {
-			hbType = chunkenc.UnorderedHeadBlockFmt
-		}
+		// Always use Unordered headblocks during replay
+		// to ensure Loki can effectively replay an unordered-friendly
+		// WAL into a new configuration that disables unordered writes.
+		hbType := chunkenc.UnorderedHeadBlockFmt
 		mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize)
 		if err != nil {
 			return nil, err

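Why hard-code UnorderedHeadBlockFmt here: the WAL on disk may contain out-of-order entries accepted under the previous configuration, so replay must tolerate any ordering; the head block is only converted to the currently configured format once replay completes. A compressed sketch of that two-phase sequencing, reusing the simplified types from the memchunk.go sketch above (replayThenReconcile is a hypothetical helper, not Loki's recoverer API):

// replayThenReconcile sketches the two-phase approach: phase 1 (done at
// checkpoint load) rebuilds every head block as unordered so no
// previously ack'd entry is rejected; phase 2 converts each replayed
// head to whatever format the tenant's current config asks for.
func replayThenReconcile(chunks []*memChunk, unorderedWrites bool) error {
	desired := unorderedFmt
	if !unorderedWrites {
		desired = orderedFmt
	}
	for _, c := range chunks {
		if err := c.convertHead(desired); err != nil {
			return err
		}
	}
	return nil
}
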
108 changes: 106 additions & 2 deletions pkg/ingester/checkpoint_test.go
@@ -450,7 +450,7 @@ func Test_SeriesIterator(t *testing.T) {
 		IngestionBurstSizeMB: 1e4,
 	}, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	for i := 0; i < 3; i++ {
 		inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
@@ -500,7 +500,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
 		IngestionBurstSizeMB: 1e4,
 	}, nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	for i := range instances {
 		inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil)
@@ -575,3 +575,107 @@ func buildChunks(t testing.TB, size int) []Chunk {
 	}
 	return chks
 }
+
+func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
+	for _, waitForCheckpoint := range []bool{false, true} {
+		t.Run(fmt.Sprintf("checkpoint-%v", waitForCheckpoint), func(t *testing.T) {
+			walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
+			require.Nil(t, err)
+			defer os.RemoveAll(walDir)
+
+			ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
+
+			// First launch the ingester with unordered writes enabled
+			dft := defaultLimitsTestConfig()
+			dft.UnorderedWrites = true
+			limits, err := validation.NewOverrides(dft, nil)
+			require.NoError(t, err)
+
+			newStore := func() *mockStore {
+				return &mockStore{
+					chunks: map[string][]chunk.Chunk{},
+				}
+			}
+
+			i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+			require.NoError(t, err)
+			require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
+			defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
+
+			req := logproto.PushRequest{
+				Streams: []logproto.Stream{
+					{
+						Labels: `{foo="bar",bar="baz1"}`,
+					},
+					{
+						Labels: `{foo="bar",bar="baz2"}`,
+					},
+				},
+			}
+
+			start := time.Now()
+			steps := 10
+			end := start.Add(time.Second * time.Duration(steps))
+
+			// Write data out of order
+			for i := steps - 1; i >= 0; i-- {
+				req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
+					Timestamp: start.Add(time.Duration(i) * time.Second),
+					Line:      fmt.Sprintf("line %d", i),
+				})
+				req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
+					Timestamp: start.Add(time.Duration(i) * time.Second),
+					Line:      fmt.Sprintf("line %d", i),
+				})
+			}
+
+			ctx := user.InjectOrgID(context.Background(), "test")
+			_, err = i.Push(ctx, &req)
+			require.NoError(t, err)
+
+			if waitForCheckpoint {
+				// Ensure we have checkpointed now
+				expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*2) // give a bit of buffer
+
+				// Add some more data after the checkpoint
+				tmp := end
+				end = end.Add(time.Second * time.Duration(steps))
+				req.Streams[0].Entries = nil
+				req.Streams[1].Entries = nil
+				// Write data out of order again
+				for i := steps - 1; i >= 0; i-- {
+					req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
+						Timestamp: tmp.Add(time.Duration(i) * time.Second),
+						Line:      fmt.Sprintf("line %d", steps+i),
+					})
+					req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
+						Timestamp: tmp.Add(time.Duration(i) * time.Second),
+						Line:      fmt.Sprintf("line %d", steps+i),
+					})
+				}
+
+				_, err = i.Push(ctx, &req)
+				require.NoError(t, err)
+			}
+
+			ensureIngesterData(ctx, t, start, end, i)
+
+			require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
+
+			// Now disable unordered writes
+			limitCfg := defaultLimitsTestConfig()
+			limitCfg.UnorderedWrites = false
+			limits, err = validation.NewOverrides(limitCfg, nil)
+			require.NoError(t, err)
+
+			// restart the ingester
+			i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+			require.NoError(t, err)
+			defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
+			require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
+
+			// ensure we've recovered data from wal segments
+			ensureIngesterData(ctx, t, start, end, i)
+		})
+	}
+}

1 change: 0 additions & 1 deletion pkg/ingester/flush_test.go
@@ -291,7 +291,6 @@ func defaultIngesterTestConfig(t testing.TB) Config {
 	cfg.LifecyclerConfig.MinReadyDuration = 0
 	cfg.BlockSize = 256 * 1024
 	cfg.TargetChunkSize = 1500 * 1024
-	cfg.UnorderedWrites = true
 	return cfg
 }

13 changes: 6 additions & 7 deletions pkg/ingester/ingester.go
@@ -82,8 +82,6 @@ type Config struct {
 
 	ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
 
-	UnorderedWrites bool `yaml:"unordered_writes_enabled"`
-
 	IndexShards int `yaml:"index_shards"`
 }
@@ -107,7 +105,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
 	f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
 	f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
-	f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.")
 	f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
@@ -231,7 +228,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
 
 	// Now that the lifecycler has been created, we can create the limiter
 	// which depends on it.
-	i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
+	i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
 
 	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
@@ -328,9 +325,9 @@ func (i *Ingester) starting(ctx context.Context) error {
 		i.cfg.RetainPeriod = old
 	}()
 
-	// Disable the in process stream limit checks while replaying the WAL
-	i.limiter.Disable()
-	defer i.limiter.Enable()
+	// Disable the in process stream limit checks while replaying the WAL.
+	// It is re-enabled in the recoverer's Close() method.
+	i.limiter.DisableForWALReplay()
 
 	recoverer := newIngesterRecoverer(i)
 	defer recoverer.Close()
@@ -381,6 +378,8 @@ func (i *Ingester) starting(ctx context.Context) error {
 		"errors", segmentRecoveryErr != nil,
 	)
 
+	level.Info(util_log.Logger).Log("msg", "closing recoverer")
+	recoverer.Close()
 	elapsed := time.Since(start)
 	i.metrics.walReplayDuration.Set(elapsed.Seconds())
 	level.Info(util_log.Logger).Log("msg", "recovery finished", "time", elapsed.String())

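Note that starting() now calls recoverer.Close() explicitly while the deferred Close() a few lines up stays in place, so Close must be safe to run twice. A generic sketch of how such idempotent cleanup is commonly built with sync.Once (an illustration of the pattern, not the actual ingesterRecoverer code):

package main

import (
	"fmt"
	"sync"
)

// recoverer models cleanup that must run exactly once even when both
// an explicit Close and a deferred Close fire: sync.Once guarantees
// the body executes a single time.
type recoverer struct {
	once   sync.Once
	enable func() // e.g. re-enable the limiter
}

func (r *recoverer) Close() {
	r.once.Do(func() {
		r.enable()
		fmt.Println("replay resources released")
	})
}

func main() {
	r := &recoverer{enable: func() { fmt.Println("limiter re-enabled") }}
	defer r.Close() // safety net if we return early
	r.Close()       // explicit call once replay finishes; body runs once
}
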
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
@@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
 	if !ok {
 
 		sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
-		stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics)
+		stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
 		i.streamsByFP[fp] = stream
 		i.streams[stream.labelsString] = stream
 		i.streamsCreatedTotal.Inc()
@@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
 	fp := i.getHashForLabels(labels)
 
 	sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
-	stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics)
+	stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
 	i.streams[pushReqStream.Labels] = stream
 	i.streamsByFP[fp] = stream

16 changes: 8 additions & 8 deletions pkg/ingester/instance_test.go
@@ -39,7 +39,7 @@ var NilMetrics = newIngesterMetrics(nil)
 func TestLabelsCollisions(t *testing.T) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil)
 
@@ -66,7 +66,7 @@ func TestLabelsCollisions(t *testing.T) {
 func TestConcurrentPushes(t *testing.T) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
 
@@ -117,7 +117,7 @@ func TestConcurrentPushes(t *testing.T) {
 func TestSyncPeriod(t *testing.T) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	const (
 		syncPeriod = 1 * time.Minute
@@ -159,7 +159,7 @@ func TestSyncPeriod(t *testing.T) {
 func Test_SeriesQuery(t *testing.T) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	// just some random values
 	cfg := defaultConfig()
@@ -274,7 +274,7 @@ func makeRandomLabels() labels.Labels {
 func Benchmark_PushInstance(b *testing.B) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
 	ctx := context.Background()
@@ -314,7 +314,7 @@ func Benchmark_PushInstance(b *testing.B) {
 func Benchmark_instance_addNewTailer(b *testing.B) {
 	limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	ctx := context.Background()
 
@@ -368,7 +368,7 @@ func Test_Iterator(t *testing.T) {
 	defaultLimits := defaultLimitsTestConfig()
 	overrides, err := validation.NewOverrides(defaultLimits, nil)
 	require.NoError(t, err)
-	instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
+	instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
 	ctx := context.TODO()
 	direction := logproto.BACKWARD
 	limit := uint32(2)
@@ -450,7 +450,7 @@ func Test_ChunkFilter(t *testing.T) {
 	overrides, err := validation.NewOverrides(defaultLimits, nil)
 	require.NoError(t, err)
 	instance := newInstance(
-		&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
+		&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
 	ctx := context.TODO()
 	direction := logproto.BACKWARD
 	limit := uint32(2)

17 changes: 15 additions & 2 deletions pkg/ingester/limiter.go
@@ -24,32 +24,45 @@ type Limiter struct {
 	limits            *validation.Overrides
 	ring              RingCount
 	replicationFactor int
+	metrics           *ingesterMetrics
 
 	mtx      sync.RWMutex
 	disabled bool
 }
 
-func (l *Limiter) Disable() {
+func (l *Limiter) DisableForWALReplay() {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 	l.disabled = true
+	l.metrics.limiterEnabled.Set(0)
 }
 
 func (l *Limiter) Enable() {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 	l.disabled = false
+	l.metrics.limiterEnabled.Set(1)
 }
 
 // NewLimiter makes a new limiter
-func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter {
+func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
 	return &Limiter{
 		limits:            limits,
 		ring:              ring,
 		replicationFactor: replicationFactor,
+		metrics:           metrics,
 	}
 }
 
+func (l *Limiter) UnorderedWrites(userID string) bool {
+	// WAL replay should not discard previously ack'd writes,
+	// so allow out of order writes while the limiter is disabled.
+	if l.disabled {
+		return true
+	}
+	return l.limits.UnorderedWrites(userID)
+}
+
 // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
 // number of streams in input and returns an error if so.
 func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {

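The new Limiter.UnorderedWrites wrapper is what instance.go now calls instead of reaching through limiter.limits directly: while the limiter is disabled for replay, every tenant is treated as unordered-capable regardless of its configured limit. A condensed, runnable model of that behavior (stand-in types, not the real validation.Overrides):

package main

import (
	"fmt"
	"sync"
)

// limiter models the relevant slice of ingester.Limiter: a disabled
// flag flipped around WAL replay, and a per-tenant unordered setting.
type limiter struct {
	mtx             sync.RWMutex
	disabled        bool
	tenantUnordered map[string]bool
}

func (l *limiter) DisableForWALReplay() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = true
}

func (l *limiter) Enable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = false
}

func (l *limiter) UnorderedWrites(userID string) bool {
	// During replay, never reject previously ack'd out-of-order writes.
	if l.disabled {
		return true
	}
	return l.tenantUnordered[userID]
}

func main() {
	l := &limiter{tenantUnordered: map[string]bool{"tenant-a": false}}
	l.DisableForWALReplay()
	fmt.Println(l.UnorderedWrites("tenant-a")) // true: replay in progress
	l.Enable()
	fmt.Println(l.UnorderedWrites("tenant-a")) // false: config applies again
}
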
4 changes: 2 additions & 2 deletions pkg/ingester/limiter_test.go
@@ -108,7 +108,7 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
 			}, nil)
 			require.NoError(t, err)
 
-			limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
+			limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
 			actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
 
 			assert.Equal(t, testData.expected, actual)
@@ -155,7 +155,7 @@ func TestLimiter_minNonZero(t *testing.T) {
 		testData := testData
 
 		t.Run(testName, func(t *testing.T) {
-			limiter := NewLimiter(nil, nil, 0)
+			limiter := NewLimiter(nil, NilMetrics, nil, 0)
 			assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
 		})
 	}

6 changes: 6 additions & 0 deletions pkg/ingester/metrics.go
@@ -27,6 +27,8 @@ type ingesterMetrics struct {
 	recoveryBytesInUse prometheus.Gauge
 	recoveryIsFlushing prometheus.Gauge
 
+	limiterEnabled prometheus.Gauge
+
 	autoForgetUnhealthyIngestersTotal prometheus.Counter
 }
@@ -119,6 +121,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
 			Name: "loki_ingester_wal_replay_flushing",
 			Help: "Whether the wal replay is in a flushing phase due to backpressure",
 		}),
+		limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name: "loki_ingester_limiter_enabled",
+			Help: "Whether the ingester's limiter is enabled",
+		}),
 		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Name: "loki_ingester_autoforget_unhealthy_ingesters_total",
 			Help: "Total number of ingesters automatically forgotten",

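The loki_ingester_limiter_enabled gauge makes the limiter's state observable during replay. A quick way to assert a gauge like this in a test is client_golang's testutil.ToFloat64 (a hedged sketch; in the real ingester the gauge is set inside DisableForWALReplay and Enable):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	limiterEnabled := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "loki_ingester_limiter_enabled",
		Help: "Whether the ingester's limiter is enabled",
	})
	limiterEnabled.Set(0) // what DisableForWALReplay does
	fmt.Println(testutil.ToFloat64(limiterEnabled)) // 0
	limiterEnabled.Set(1) // what Enable does
	fmt.Println(testutil.ToFloat64(limiterEnabled)) // 1
}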