Skip to content

Commit

Permalink
Don't fail writes due to full WAL disk (#3136)
Browse files Browse the repository at this point in the history
* dont fail writes on full wal disk

* wal full failure will cause flush on shutdown

* logs the first full WAL failure
  • Loading branch information
owen-d authored Jan 7, 2021
1 parent fd94b8d commit ed649ee
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 42 deletions.
24 changes: 21 additions & 3 deletions docs/sources/operations/storage/wal.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,29 @@ title: Write Ahead Log

# Write Ahead Log (WAL)

Ingesters temporarily store data in memory. In the event of a crash, there could be data loss. The WAL helps fill this gap in reliability.

Ingesters store all their data in memory. If there is a crash, there can be data loss. The WAL helps fill this gap in reliability.
This section will use Kubernetes as a reference.
The WAL in Loki records incoming data and stores it on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. Upon restart, Loki will "replay" all of the data in the log before registering itself as ready for subsequent writes. This allows Loki to maintain the performance & cost benefits of buffering data in memory _and_ durability benefits (it won't lose data once a write has been acknowledged).

To use the WAL, there are some changes that needs to be made.
This section will use Kubernetes as a reference deployment paradigm in the examples.

## Disclaimer & WAL nuances

The Write Ahead Log in Loki takes a few particular tradeoffs compared to other WALs you may be familiar with. The WAL aims to add additional durability guarantees, but _not at the expense of availability_. Particularly, there are two scenarios where the WAL sacrifices these guarantees.

1) Corruption/Deletion of the WAL prior to replaying it

In the event the WAL is corrupted/partially deleted, Loki will not be able to recover all of it's data. In this case, Loki will attempt to recover any data it can, but will not prevent Loki from starting.

Note: the Prometheus metric `loki_ingester_wal_corruptions_total` can be used to track and alert when this happens.

1) No space left on disk

In the event the underlying WAL disk is full, Loki will not fail incoming writes, but neither will it log them to the WAL. In this case, the persistence guarantees across process restarts will not hold.

Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens.

### Metrics

## Changes to deployment

Expand Down
29 changes: 15 additions & 14 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time,
require.Len(t, result.resps[0].Streams[1].Entries, ln)
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Expand All @@ -52,6 +47,18 @@ func TestIngesterWAL(t *testing.T) {
Recover: true,
CheckpointDuration: time.Second,
}

return ingesterConfig
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

Expand Down Expand Up @@ -134,14 +141,8 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

Expand Down
39 changes: 34 additions & 5 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package ingester

import (
"fmt"
"io/ioutil"
"os"
"sort"
"sync"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -44,7 +47,7 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
testData := pushTestSamples(t, ing)

Expand All @@ -54,7 +57,25 @@ func TestChunkFlushingIdle(t *testing.T) {
}

func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
}

type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }

func TestWALFullFlush(t *testing.T) {
// technically replaced with a fake wal, but the ingester New() function creates a regular wal first,
// so we enable creation/cleanup even though it remains unused.
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
Expand All @@ -66,7 +87,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

const userID = "testUser"
Expand Down Expand Up @@ -112,7 +133,7 @@ func TestFlushMaxAge(t *testing.T) {
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

now := time.Unix(0, 0)
Expand Down Expand Up @@ -166,7 +187,10 @@ type testStore struct {
chunks map[string][]chunk.Chunk
}

func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
// Note: the ingester New() function creates it's own WAL first which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the wal.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
store := &testStore{
chunks: map[string][]chunk.Chunk{},
}
Expand All @@ -178,6 +202,11 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

if walOverride != nil {
_ = ing.wal.Stop()
ing.wal = walOverride
}

return store, ing
}

Expand Down
29 changes: 19 additions & 10 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ type Ingester struct {

limiter *Limiter

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics

wal WAL
Expand All @@ -169,15 +173,16 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
metrics := newIngesterMetrics(registerer)

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
}

if cfg.WAL.Enabled {
Expand Down Expand Up @@ -319,6 +324,10 @@ func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs errUtil.MultiError
errs.Add(i.wal.Stop())

if i.flushOnShutdownSwitch.Get() {
i.lifecycler.SetFlushOnShutdown(true)
}
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
Expand Down Expand Up @@ -384,7 +393,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst
Expand Down
57 changes: 54 additions & 3 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package ingester
import (
"context"
"net/http"
"os"
"sync"
"syscall"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -15,6 +18,7 @@ import (

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
"github.com/cortexproject/cortex/pkg/util"
cutil "github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/helpers"
Expand Down Expand Up @@ -77,6 +81,10 @@ type instance struct {

wal WAL

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics
}

Expand All @@ -86,6 +94,7 @@ func newInstance(
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
) *instance {
i := &instance{
cfg: cfg,
Expand All @@ -101,8 +110,9 @@ func newInstance(
tailers: map[uint32]*tailer{},
limiter: limiter,

wal: wal,
metrics: metrics,
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
Expand Down Expand Up @@ -161,8 +171,19 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {

if !record.IsEmpty() {
if err := i.wal.Log(record); err != nil {
return err
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
i.metrics.walDiskFullFailures.Inc()
i.flushOnShutdownSwitch.TriggerAnd(func() {
level.Error(util.Logger).Log(
"msg",
"Error writing to WAL, disk full, no further messages will be logged for this error",
)
})
} else {
return err
}
}

}

return appendErr
Expand Down Expand Up @@ -578,3 +599,33 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
}
return false
}

// OnceSwitch is a write optimized switch that can only ever be switched "on".
// It uses a RWMutex underneath the hood to quickly and effectively (in a concurrent environment)
// check if the switch has already been triggered, only actually acquiring the mutex for writing if not.
type OnceSwitch struct {
sync.RWMutex
toggle bool
}

func (o *OnceSwitch) Get() bool {
o.RLock()
defer o.RUnlock()
return o.toggle
}

// TriggerAnd will ensure the switch is on and run the provided function if
// the switch was not already toggled on.
func (o *OnceSwitch) TriggerAnd(fn func()) {
o.RLock()
if o.toggle {
o.RUnlock()
return
}

o.RUnlock()
o.Lock()
o.toggle = true
o.Unlock()
fn()
}
12 changes: 6 additions & 6 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{})

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
Expand All @@ -62,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

const (
concurrent = 10
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
lbls := makeRandomLabels()

tt := time.Now()
Expand Down Expand Up @@ -160,7 +160,7 @@ func Test_SeriesQuery(t *testing.T) {
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20

instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics)
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

currentTime := time.Now()

Expand Down Expand Up @@ -271,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
ctx := context.Background()

for n := 0; n < b.N; n++ {
Expand Down Expand Up @@ -313,7 +313,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {

ctx := context.Background()

inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
Expand Down
Loading

0 comments on commit ed649ee

Please sign in to comment.