Skip to content

Commit

Permalink
Use a line-buffered logger to deamplify write syscalls (grafana#6954)
Browse files Browse the repository at this point in the history
We initialise a global logger in `pkg/util/log/log.go` and use it
extensively throughout the Loki codebase. Every time we write a log
message, a `write` syscall is invoked. Syscalls are problematic because
they transition the process from userspace to kernelspace, which means:

- a context-switch is incurred, which is inherently expensive ([1-2
microseconds](https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/))
- the goroutine executing the code is **blocked**
- the underlying OS thread (_M_ in the go scheduler model) is **also
blocked**
- the goroutine has to be rescheduled once the syscall exits
- the go scheduler may need to spawn additional OS threads if all are
blocked in syscalls - which can also be expensive

This change introduces the use of a line-buffered logger. It has a
buffer of
[256](https://gist.github.com/dannykopping/0704db32c0b08751d1d2494efaa734c2)
entries, and once that buffer is full it is flushed to the underlying
writer (stderr). However, the buffer may remain only partially filled
for a period of time, so there is also a periodic flush mechanism,
configured to flush every 100ms. In addition, a preallocated byte slice
of 10MB is reused, to avoid excessive slice resizing & garbage
collection.

This does mean that we could lose up to 256 log messages in case of an
ungraceful termination of the process, but only messages buffered since
the last flush (at most 100ms earlier) are at risk - in other words, the
likelihood is low, and generally we shouldn't `kill -9` any Loki
process.
  • Loading branch information
Danny Kopping authored and lxwzy committed Nov 7, 2022
1 parent 8bad3b5 commit 9133327
Show file tree
Hide file tree
Showing 19 changed files with 243 additions and 31 deletions.
2 changes: 1 addition & 1 deletion clients/cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {
fmt.Println("Invalid log level")
os.Exit(1)
}
util_log.InitLogger(&config.Config.ServerConfig.Config, prometheus.DefaultRegisterer)
util_log.InitLogger(&config.Config.ServerConfig.Config, prometheus.DefaultRegisterer, true, false)

// Use Stderr instead of files for the klog.
klog.SetOutput(os.Stderr)
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Test_dropStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labelallow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_addLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/labeldrop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_dropLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions clients/pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_multilineStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -52,7 +52,7 @@ func Test_multilineStage_MultiStreams(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

maxWait := 2 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/pack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Test_packStage_Run(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
Debug = true

tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (t *PushTarget) run() error {

// The logger registers a metric which will cause a duplicate registry panic unless we provide an empty registry
// The metric created is for counting log lines and isn't likely to be missed.
util_log.InitLogger(&t.config.Server, prometheus.NewRegistry())
util_log.InitLogger(&t.config.Server, prometheus.NewRegistry(), true, false)

srv, err := server.New(t.config.Server)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/windows/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
// Enable debug logging
cfg := &server.Config{}
_ = cfg.LogLevel.Set("debug")
util_log.InitLogger(cfg, nil)
util_log.InitLogger(cfg, nil, true, false)
}

// Test that you can use to generate event logs locally.
Expand Down
2 changes: 1 addition & 1 deletion cmd/logql-analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func main() {
cfg := getConfig()
util_log.InitLogger(&server.Config{
LogLevel: cfg.LogLevel,
}, prometheus.DefaultRegisterer)
}, prometheus.DefaultRegisterer, true, false)
s, err := createServer(cfg)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error while creating the server", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
level.Error(util_log.Logger).Log("msg", "invalid log level")
os.Exit(1)
}
util_log.InitLogger(&config.Server, prometheus.DefaultRegisterer)
util_log.InitLogger(&config.Server, prometheus.DefaultRegisterer, config.UseBufferedLogger, config.UseSyncLogger)

// Validate the config once both the config file has been loaded
// and CLI flags parsed.
Expand Down
2 changes: 1 addition & 1 deletion cmd/querytee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {

util_log.InitLogger(&server.Config{
LogLevel: cfg.LogLevel,
}, prometheus.DefaultRegisterer)
}, prometheus.DefaultRegisterer, true, false)

// Run the instrumentation server.
registry := prometheus.NewRegistry()
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,7 @@ replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.
// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet.
replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe

// Fork containing a line-buffered logger which should improve logging performance.
// TODO: submit PR to upstream and remove this
replace github.com/go-kit/log => github.com/dannykopping/go-kit-log v0.2.2-0.20221002180827-5591c1641b6b
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ github.com/cznic/ql v1.2.0/go.mod h1:FbpzhyZrqr0PVlK6ury+PoW3T0ODUV22OeWIxcaOrSE
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ=
github.com/cznic/strutil v0.0.0-20171016134553-529a34b1c186/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc=
github.com/cznic/zappy v0.0.0-20160723133515-2533cb5b45cc/go.mod h1:Y1SNZ4dRUOKXshKUbwUapqNncRrho4mkjQebgEHZLj8=
github.com/dannykopping/go-kit-log v0.2.2-0.20221002180827-5591c1641b6b h1:G8g9mAKEj9O3RsU6Hd/ow6lIcHarlcUl5omV6sFKEOU=
github.com/dannykopping/go-kit-log v0.2.2-0.20221002180827-5591c1641b6b/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -509,10 +511,6 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
github.com/go-kit/kit v0.12.0 h1:e4o3o3IsBfAKQh5Qbbiqyfu97Ku7jrO/JbohvztANh4=
github.com/go-kit/kit v0.12.0/go.mod h1:lHd+EkCZPIwYItmGDDRdhinkzX2A1sj+M9biaEaizzs=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
Expand Down
7 changes: 7 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type Config struct {
HTTPPrefix string `yaml:"http_prefix"`
BallastBytes int `yaml:"ballast_bytes"`

// TODO(dannyk): Remove these config options before next release; they don't need to be configurable.
// These are only here to allow us to test the new functionality.
UseBufferedLogger bool `yaml:"use_buffered_logger"`
UseSyncLogger bool `yaml:"use_sync_logger"`

Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Expand Down Expand Up @@ -103,6 +108,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
f.IntVar(&c.BallastBytes, "config.ballast-bytes", 0, "The amount of virtual memory to reserve as a ballast in order to optimise "+
"garbage collection. Larger ballasts result in fewer garbage collection passes, reducing compute overhead at the cost of memory usage.")
f.BoolVar(&c.UseBufferedLogger, "log.use-buffered", true, "Uses a line-buffered logger to improve performance.")
f.BoolVar(&c.UseSyncLogger, "log.use-sync", true, "Forces all lines logged to hold a mutex to serialize writes.")

c.registerServerFlagsWithChangedDefaultValues(f)
c.Common.RegisterFlags(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func newTestStore(t testing.TB) *testStore {
t.Helper()
servercfg := &ww.Config{}
require.Nil(t, servercfg.LogLevel.Set("debug"))
util_log.InitLogger(servercfg, nil)
util_log.InitLogger(servercfg, nil, true, false)
workdir := t.TempDir()
filepath.Join(workdir, "index")
indexDir := filepath.Join(workdir, "index")
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/index/compactor/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func newTestStore(t testing.TB, clientMetrics storage.ClientMetrics) *testStore
t.Helper()
servercfg := &ww.Config{}
require.Nil(t, servercfg.LogLevel.Set("debug"))
util_log.InitLogger(servercfg, nil)
util_log.InitLogger(servercfg, nil, true, false)
workdir := t.TempDir()
filepath.Join(workdir, "index")
indexDir := filepath.Join(workdir, "index")
Expand Down
65 changes: 54 additions & 11 deletions pkg/util/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package log

import (
"fmt"
"io"
"math"
"os"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -21,8 +24,8 @@ var (

// InitLogger initialises the global gokit logger (util_log.Logger) and overrides the
// default logger for the server.
func InitLogger(cfg *server.Config, reg prometheus.Registerer) {
l := newPrometheusLogger(cfg.LogLevel, cfg.LogFormat, reg)
func InitLogger(cfg *server.Config, reg prometheus.Registerer, buffered bool, sync bool) {
l := newPrometheusLogger(cfg.LogLevel, cfg.LogFormat, reg, buffered, sync)

// when use util_log.Logger, skip 3 stack frames.
Logger = log.With(l, "caller", log.Caller(3))
Expand All @@ -38,24 +41,64 @@ func InitLogger(cfg *server.Config, reg prometheus.Registerer) {
// prometheusLogger bundles a go-kit logger with the Prometheus metrics that
// newPrometheusLogger registers alongside it.
type prometheusLogger struct {
// logger is the level-filtered go-kit logger built by newPrometheusLogger.
logger log.Logger
// logMessages counts log messages, partitioned by the "level" label.
logMessages *prometheus.CounterVec
// logFlushes observes the entry count reported to the line-buffered
// logger's flush callback.
logFlushes prometheus.Histogram

// NOTE(review): neither flag below is set in the struct literal that
// newPrometheusLogger builds in the visible code - confirm they are used
// elsewhere, or remove them.
useBufferedLogger bool
useSyncLogger bool
}

// newPrometheusLogger creates a new instance of PrometheusLogger which exposes
// Prometheus counters for various log levels.
func newPrometheusLogger(l logging.Level, format logging.Format, reg prometheus.Registerer) log.Logger {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
func newPrometheusLogger(l logging.Level, format logging.Format, reg prometheus.Registerer, buffered bool, sync bool) log.Logger {

// buffered logger settings
var (
logEntries uint32 = 256 // buffer up to 256 log lines in memory before flushing to a write(2) syscall
logBufferSize uint32 = 10e6 // 10MB
flushTimeout = 100 * time.Millisecond // flush the buffer after 100ms regardless of how full it is, to prevent losing many logs in case of ungraceful termination
)

logMessages := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "log_messages_total",
Help: "Total number of log messages.",
}, []string{"level"})
logFlushes := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "log_flushes",
Help: "Histogram of log flushes using the line-buffered logger.",
Buckets: prometheus.ExponentialBuckets(1, 2, int(math.Log2(float64(logEntries)))+1),
})

var writer io.Writer
if buffered {
// TODO: it's technically possible here to lose logs between the 100ms flush and the process being killed
// => call buf.Flush() in a signal handler if this is a concern, but this is unlikely to be a problem
writer = log.NewLineBufferedLogger(os.Stderr, logEntries,
log.WithFlushPeriod(flushTimeout),
log.WithPrellocatedBuffer(logBufferSize),
log.WithFlushCallback(func(entries uint32) {
logFlushes.Observe(float64(entries))
}),
)
} else {
writer = os.Stderr
}

if sync {
writer = log.NewSyncWriter(writer)
}

logger := log.NewLogfmtLogger(writer)
if format.String() == "json" {
logger = log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
logger = log.NewJSONLogger(writer)
}
logger = level.NewFilter(logger, levelFilter(l.String()))

plogger := &prometheusLogger{
logger: logger,
logMessages: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "log_messages_total",
Help: "Total number of log messages.",
}, []string{"level"}),
logger: logger,
logMessages: logMessages,
logFlushes: logFlushes,
}
// Initialise counters for all supported levels:
supportedLevels := []level.Value{
Expand Down
Loading

0 comments on commit 9133327

Please sign in to comment.