From b1abd6bd993b2b1ee2e9c3a7271a3f523b9be97f Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 17 Oct 2023 00:05:22 +0300 Subject: [PATCH] Use structured logging with slog (#475) --- .github/workflows/commit.yml | 2 +- .github/workflows/release.linux.yml | 2 +- Dockerfile | 2 +- cmd/litestream/generations.go | 5 +- cmd/litestream/main.go | 46 +++++++++++++++-- cmd/litestream/main_notwindows.go | 2 +- cmd/litestream/main_windows.go | 9 ++-- cmd/litestream/replicate.go | 41 +++++---------- cmd/litestream/restore.go | 8 --- cmd/litestream/snapshots.go | 4 +- cmd/litestream/wal.go | 5 +- db.go | 38 ++++++-------- go.mod | 2 +- go.sum | 4 ++ internal/internal_unix.go | 1 + internal/internal_windows.go | 1 + litestream.go | 3 -- replica.go | 80 ++++++++++++++--------------- 18 files changed, 132 insertions(+), 123 deletions(-) diff --git a/.github/workflows/commit.yml b/.github/workflows/commit.yml index f3b9e18d..4179b570 100644 --- a/.github/workflows/commit.yml +++ b/.github/workflows/commit.yml @@ -8,7 +8,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: '1.20' + go-version: '1.21' - uses: actions/cache@v2 with: path: ~/go/pkg/mod diff --git a/.github/workflows/release.linux.yml b/.github/workflows/release.linux.yml index b06c61e6..73f06e13 100644 --- a/.github/workflows/release.linux.yml +++ b/.github/workflows/release.linux.yml @@ -31,7 +31,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: '1.20' + go-version: '1.21' - id: release uses: bruceadams/get-release@v1.2.2 diff --git a/Dockerfile b/Dockerfile index 631510e5..d4ca441c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.1 as builder +FROM golang:1.21.3 as builder WORKDIR /src/litestream COPY . . diff --git a/cmd/litestream/generations.go b/cmd/litestream/generations.go index fefa40c6..56754e28 100644 --- a/cmd/litestream/generations.go +++ b/cmd/litestream/generations.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "log" "os" "text/tabwriter" "time" @@ -87,7 +86,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) for _, r := range replicas { generations, err := r.Client.Generations(ctx) if err != nil { - log.Printf("%s: cannot list generations: %s", r.Name(), err) + r.Logger().Error("cannot list generations", "error", err) continue } @@ -95,7 +94,7 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error) for _, generation := range generations { createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation) if err != nil { - log.Printf("%s: cannot determine generation time bounds: %s", r.Name(), err) + r.Logger().Error("cannot determine generation time bounds", "error", err) continue } diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 3bb20c1a..ce75f06a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -6,7 +6,7 @@ import ( "flag" "fmt" "io/ioutil" - "log" + "log/slog" "net/url" "os" "os/user" @@ -37,13 +37,11 @@ var ( var errStop = errors.New("stop") func main() { - log.SetFlags(0) - m := NewMain() if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop { os.Exit(1) } else if err != nil { - log.Println(err) + slog.Error("failed to run", "error", err) os.Exit(1) } } @@ -172,6 +170,16 @@ type Config struct { // Global S3 settings AccessKeyID string `yaml:"access-key-id"` SecretAccessKey string `yaml:"secret-access-key"` + + // Logging + Logging LoggingConfig `yaml:"logging"` +} + +// LoggingConfig configures logging. +type LoggingConfig struct { + Level string `yaml:"level"` + Type string `yaml:"type"` + Stderr bool `yaml:"stderr"` } // propagateGlobalSettings copies global S3 settings to replica configs. @@ -241,6 +249,36 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) { // Propage settings from global config to replica configs. config.propagateGlobalSettings() + // Configure logging. + logOutput := os.Stdout + if config.Logging.Stderr { + logOutput = os.Stderr + } + + logOptions := slog.HandlerOptions{ + Level: slog.LevelInfo, + } + + switch strings.ToUpper(config.Logging.Level) { + case "DEBUG": + logOptions.Level = slog.LevelDebug + case "WARN", "WARNING": + logOptions.Level = slog.LevelWarn + case "ERROR": + logOptions.Level = slog.LevelError + } + + var logHandler slog.Handler + switch config.Logging.Type { + case "json": + logHandler = slog.NewJSONHandler(logOutput, &logOptions) + case "text", "": + logHandler = slog.NewTextHandler(logOutput, &logOptions) + } + + // Set global default logger. + slog.SetDefault(slog.New(logHandler)) + return config, nil } diff --git a/cmd/litestream/main_notwindows.go b/cmd/litestream/main_notwindows.go index aaf87a10..77fef904 100644 --- a/cmd/litestream/main_notwindows.go +++ b/cmd/litestream/main_notwindows.go @@ -1,4 +1,4 @@ -// +build !windows +//go:build !windows package main diff --git a/cmd/litestream/main_windows.go b/cmd/litestream/main_windows.go index a762d322..0f6c1c2b 100644 --- a/cmd/litestream/main_windows.go +++ b/cmd/litestream/main_windows.go @@ -1,11 +1,10 @@ -// +build windows +//go:build windows package main import ( "context" "io" - "log" "os" "os/signal" @@ -63,13 +62,13 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat // Instantiate replication command and load configuration. c := NewReplicateCommand() if c.Config, err = ReadConfigFile(DefaultConfigPath(), true); err != nil { - log.Printf("cannot load configuration: %s", err) + slog.Error("cannot load configuration", "error", err) return true, 1 } // Execute replication command. if err := c.Run(s.ctx); err != nil { - log.Printf("cannot replicate: %s", err) + slog.Error("cannot replicate", "error", err) statusCh <- svc.Status{State: svc.StopPending} return true, 2 } @@ -88,7 +87,7 @@ func (s *windowsService) Execute(args []string, r <-chan svc.ChangeRequest, stat case svc.Interrogate: statusCh <- req.CurrentStatus default: - log.Printf("Litestream service received unexpected change request cmd: %d", req.Cmd) + slog.Error("Litestream service received unexpected change request", "cmd", req.Cmd) } } } diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index cd95d961..898e79de 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -4,7 +4,7 @@ import ( "context" "flag" "fmt" - "log" + "log/slog" "net" "net/http" _ "net/http/pprof" @@ -42,7 +42,6 @@ func NewReplicateCommand() *ReplicateCommand { func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) execFlag := fs.String("exec", "", "execute subcommand") - tracePath := fs.String("trace", "", "trace path") configPath, noExpandEnv := registerConfigFlag(fs) fs.Usage = c.Usage if err := fs.Parse(args); err != nil { @@ -80,27 +79,17 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e c.Config.Exec = *execFlag } - // Enable trace logging. - if *tracePath != "" { - f, err := os.Create(*tracePath) - if err != nil { - return err - } - defer f.Close() - litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf - } - return nil } // Run loads all databases specified in the configuration. func (c *ReplicateCommand) Run() (err error) { // Display version information. - log.Printf("litestream %s", Version) + slog.Info("litestream", "version", Version) // Setup databases. if len(c.Config.DBs) == 0 { - log.Println("no databases specified in configuration") + slog.Error("no databases specified in configuration") } for _, dbConfig := range c.Config.DBs { @@ -118,21 +107,22 @@ func (c *ReplicateCommand) Run() (err error) { // Notify user that initialization is done. for _, db := range c.DBs { - log.Printf("initialized db: %s", db.Path()) + slog.Info("initialized db", "path", db.Path()) for _, r := range db.Replicas { + slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval) switch client := r.Client.(type) { case *file.ReplicaClient: - log.Printf("replicating to: name=%q type=%q path=%q", r.Name(), client.Type(), client.Path()) + slog.Info("replicating to", "path", client.Path()) case *s3.ReplicaClient: - log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval) + slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint) case *gcs.ReplicaClient: - log.Printf("replicating to: name=%q type=%q bucket=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, r.SyncInterval) + slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path) case *abs.ReplicaClient: - log.Printf("replicating to: name=%q type=%q bucket=%q path=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Endpoint, r.SyncInterval) + slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint) case *sftp.ReplicaClient: - log.Printf("replicating to: name=%q type=%q host=%q user=%q path=%q sync-interval=%s", r.Name(), client.Type(), client.Host, client.User, client.Path, r.SyncInterval) + slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path) default: - log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type()) + slog.Info("replicating to") } } } @@ -146,11 +136,11 @@ func (c *ReplicateCommand) Run() (err error) { hostport = net.JoinHostPort("localhost", port) } - log.Printf("serving metrics on http://%s/metrics", hostport) + slog.Info("serving metrics on", "url", fmt.Sprintf("http://%s/metrics", hostport)) go func() { http.Handle("/metrics", promhttp.Handler()) if err := http.ListenAndServe(c.Config.Addr, nil); err != nil { - log.Printf("cannot start metrics server: %s", err) + slog.Error("cannot start metrics server", "error", err) } }() } @@ -179,7 +169,7 @@ func (c *ReplicateCommand) Run() (err error) { func (c *ReplicateCommand) Close() (err error) { for _, db := range c.DBs { if e := db.Close(); e != nil { - log.Printf("error closing db: path=%s err=%s", db.Path(), e) + db.Logger.Error("error closing db", "error", e) if err == nil { err = e } @@ -215,8 +205,5 @@ Arguments: -no-expand-env Disables environment variable expansion in configuration file. - -trace PATH - Write verbose trace logging to PATH. - `[1:], DefaultConfigPath()) } diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 28c20fc1..3e4c86aa 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -5,7 +5,6 @@ import ( "errors" "flag" "fmt" - "log" "os" "strconv" "time" @@ -19,7 +18,6 @@ type RestoreCommand struct{} // Run executes the command. func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { opt := litestream.NewRestoreOptions() - opt.Verbose = true fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError) configPath, noExpandEnv := registerConfigFlag(fs) @@ -31,7 +29,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { ifDBNotExists := fs.Bool("if-db-not-exists", false, "") ifReplicaExists := fs.Bool("if-replica-exists", false, "") timestampStr := fs.String("timestamp", "", "timestamp") - verbose := fs.Bool("v", false, "verbose output") fs.Usage = c.Usage if err := fs.Parse(args); err != nil { return err @@ -48,11 +45,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { } } - // Instantiate logger if verbose output is enabled. - if *verbose { - opt.Logger = log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds) - } - // Determine replica & generation to restore from. var r *litestream.Replica if isURL(fs.Arg(0)) { diff --git a/cmd/litestream/snapshots.go b/cmd/litestream/snapshots.go index 72e67a5a..25d83e37 100644 --- a/cmd/litestream/snapshots.go +++ b/cmd/litestream/snapshots.go @@ -4,7 +4,7 @@ import ( "context" "flag" "fmt" - "log" + "log/slog" "os" "text/tabwriter" "time" @@ -82,7 +82,7 @@ func (c *SnapshotsCommand) Run(ctx context.Context, args []string) (err error) { for _, r := range replicas { infos, err := r.Snapshots(ctx) if err != nil { - log.Printf("cannot determine snapshots: %s", err) + slog.Error("cannot determine snapshots", "error", err) continue } for _, info := range infos { diff --git a/cmd/litestream/wal.go b/cmd/litestream/wal.go index 9b7b9efc..8fe7381b 100644 --- a/cmd/litestream/wal.go +++ b/cmd/litestream/wal.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "log" "os" "text/tabwriter" "time" @@ -86,7 +85,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { generations = []string{*generation} } else { if generations, err = r.Client.Generations(ctx); err != nil { - log.Printf("%s: cannot determine generations: %s", r.Name(), err) + r.Logger().Error("cannot determine generations", "error", err) continue } } @@ -113,7 +112,7 @@ func (c *WALCommand) Run(ctx context.Context, args []string) (err error) { } return itr.Close() }(); err != nil { - log.Printf("%s: cannot fetch wal segments: %s", r.Name(), err) + r.Logger().Error("cannot fetch wal segments", "error", err) continue } } diff --git a/db.go b/db.go index c5c4de75..6aef6127 100644 --- a/db.go +++ b/db.go @@ -11,7 +11,7 @@ import ( "hash/crc64" "io" "io/ioutil" - "log" + "log/slog" "math" "math/rand" "os" @@ -97,8 +97,8 @@ type DB struct { // Must be set before calling Open(). Replicas []*Replica - // Where to send log messages, defaults to log.Default() - Logger *log.Logger + // Where to send log messages, defaults to global slog with databas epath. + Logger *slog.Logger } // NewDB returns a new instance of DB for a given path. @@ -114,8 +114,7 @@ func NewDB(path string) *DB { MaxCheckpointPageN: DefaultMaxCheckpointPageN, CheckpointInterval: DefaultCheckpointInterval, MonitorInterval: DefaultMonitorInterval, - - Logger: log.Default(), + Logger: slog.With("db", path), } db.dbSizeGauge = dbSizeGaugeVec.WithLabelValues(db.path) @@ -461,7 +460,7 @@ func (db *DB) init() (err error) { // If we have an existing shadow WAL, ensure the headers match. if err := db.verifyHeadersMatch(); err != nil { - db.Logger.Printf("%s: init: cannot determine last wal position, clearing generation; %s", db.path, err) + db.Logger.Warn("init: cannot determine last wal position, clearing generation", "error", err) if err := os.Remove(db.GenerationNamePath()); err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove generation name: %w", err) } @@ -703,7 +702,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { if err := db.init(); err != nil { return err } else if db.db == nil { - Tracef("%s: sync: no database found", db.path) + db.Logger.Debug("sync: no database found") return nil } @@ -729,7 +728,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("cannot verify wal state: %w", err) } - Tracef("%s: sync: info=%#v", db.path, info) + db.Logger.Debug("sync", "info", &info) // Track if anything in the shadow WAL changes and then notify at the end. changed := info.walSize != info.shadowWALSize || info.restart || info.reason != "" @@ -740,7 +739,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { if info.generation, err = db.createGeneration(); err != nil { return fmt.Errorf("create generation: %w", err) } - db.Logger.Printf("%s: sync: new generation %q, %s", db.path, info.generation, info.reason) + db.Logger.Info("sync: new generation", "generation", info.generation, "reason", info.reason) // Clear shadow wal info. info.shadowWALPath = db.ShadowWALPath(info.generation, 0) @@ -794,7 +793,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { db.notify = make(chan struct{}) } - Tracef("%s: sync: ok", db.path) + db.Logger.Debug("sync: ok") return nil } @@ -985,7 +984,8 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) { } func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { - Tracef("%s: copy-shadow: %s", db.path, filename) + logger := db.Logger.With("filename", filename) + logger.Debug("copy-shadow") r, err := os.Open(db.WALPath()) if err != nil { @@ -1049,7 +1049,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { for { // Read next page from WAL file. if _, err := io.ReadFull(r, frame); err == io.EOF || err == io.ErrUnexpectedEOF { - Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err) + logger.Debug("copy-shadow: break", "offset", offset, "error", err) break // end of file or partial page } else if err != nil { return 0, fmt.Errorf("read wal: %w", err) @@ -1059,7 +1059,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { salt0 := binary.BigEndian.Uint32(frame[8:]) salt1 := binary.BigEndian.Uint32(frame[12:]) if salt0 != hsalt0 || salt1 != hsalt1 { - Tracef("%s: copy-shadow: break: salt mismatch", db.path) + logger.Debug("copy-shadow: break: salt mismatch") break } @@ -1069,7 +1069,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[:8]) // frame header chksum0, chksum1 = Checksum(bo, chksum0, chksum1, frame[24:]) // frame data if chksum0 != fchksum0 || chksum1 != fchksum1 { - Tracef("%s: copy shadow: checksum mismatch, skipping: offset=%d (%x,%x) != (%x,%x)", db.path, offset, chksum0, chksum1, fchksum0, fchksum1) + logger.Debug("copy shadow: checksum mismatch, skipping", "offset", offset, "check", fmt.Sprintf("(%x,%x) != (%x,%x)", chksum0, chksum1, fchksum0, fchksum1)) break } @@ -1078,7 +1078,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { return 0, fmt.Errorf("write temp shadow wal: %w", err) } - Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1) + logger.Debug("copy-shadow: ok", "offset", offset, "salt", fmt.Sprintf("%x %x", salt0, salt1)) offset += int64(len(frame)) // Update new size if written frame was a commit record. @@ -1391,7 +1391,7 @@ func (db *DB) execCheckpoint(mode string) (err error) { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { return err } - Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) + db.Logger.Debug("checkpoint", "mode", mode, "result", fmt.Sprintf("%d,%d,%d", row[0], row[1], row[2])) // Reacquire the read lock immediately after the checkpoint. if err := db.acquireReadLock(); err != nil { @@ -1416,7 +1416,7 @@ func (db *DB) monitor() { // Sync the database to the shadow WAL. if err := db.Sync(db.ctx); err != nil && !errors.Is(err, context.Canceled) { - db.Logger.Printf("%s: sync error: %s", db.path, err) + db.Logger.Error("sync error", "error", err) } } } @@ -1560,10 +1560,6 @@ type RestoreOptions struct { // Specifies how many WAL files are downloaded in parallel during restore. Parallelism int - - // Logging settings. - Logger *log.Logger - Verbose bool } // NewRestoreOptions returns a new instance of RestoreOptions with defaults. diff --git a/go.mod b/go.mod index 9c49a438..35ef41db 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/benbjohnson/litestream -go 1.19 +go 1.21 require ( cloud.google.com/go/storage v1.31.0 diff --git a/go.sum b/go.sum index 79a3b898..c8747b0f 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= +github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -106,6 +107,7 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -137,6 +139,7 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -212,6 +215,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/internal/internal_unix.go b/internal/internal_unix.go index cedc947e..e30a68aa 100644 --- a/internal/internal_unix.go +++ b/internal/internal_unix.go @@ -1,3 +1,4 @@ +//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris // +build aix darwin dragonfly freebsd linux netbsd openbsd solaris package internal diff --git a/internal/internal_windows.go b/internal/internal_windows.go index 18531642..b285de03 100644 --- a/internal/internal_windows.go +++ b/internal/internal_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package internal diff --git a/litestream.go b/litestream.go index 3fa34742..95e97d50 100644 --- a/litestream.go +++ b/litestream.go @@ -540,9 +540,6 @@ func isHexChar(ch rune) bool { return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') } -// Tracef is used for low-level tracing. -var Tracef = func(format string, a ...interface{}) {} - func assert(condition bool, message string) { if !condition { panic("assertion failed: " + message) diff --git a/replica.go b/replica.go index b633123f..f08b69b4 100644 --- a/replica.go +++ b/replica.go @@ -7,7 +7,7 @@ import ( "hash/crc64" "io" "io/ioutil" - "log" + "log/slog" "math" "os" "path/filepath" @@ -72,9 +72,6 @@ type Replica struct { // Encryption identities and recipients AgeIdentities []age.Identity AgeRecipients []age.Recipient - - // The logger to send logging messages to. Defaults to log.Default() - Logger *log.Logger } func NewReplica(db *DB, name string) *Replica { @@ -87,7 +84,6 @@ func NewReplica(db *DB, name string) *Replica { Retention: DefaultRetention, RetentionCheckInterval: DefaultRetentionCheckInterval, MonitorEnabled: true, - Logger: log.Default(), } return r @@ -101,6 +97,11 @@ func (r *Replica) Name() string { return r.name } +// Logger returns the DB sub-logger for this replica. +func (r *Replica) Logger() *slog.Logger { + return r.db.Logger.With("replica", r.Name()) +} + // DB returns a reference to the database the replica is attached to, if any. func (r *Replica) DB() *DB { return r.db } @@ -166,7 +167,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) { } generation := dpos.Generation - Tracef("%s(%s): replica sync: db.pos=%s", r.db.Path(), r.Name(), dpos) + r.Logger().Debug("replica sync", "position", dpos.String()) // Create a new snapshot and update the current replica position if // the generation on the database has changed. @@ -188,7 +189,7 @@ func (r *Replica) Sync(ctx context.Context) (err error) { return fmt.Errorf("cannot determine replica position: %s", err) } - Tracef("%s(%s): replica sync: calc new pos: %s", r.db.Path(), r.Name(), pos) + r.Logger().Debug("replica sync: calc new pos", "position", pos.String()) r.mu.Lock() r.pos = pos r.mu.Unlock() @@ -222,6 +223,12 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // Obtain initial position from shadow reader. // It may have moved to the next index if previous position was at the end. pos := rd.Pos() + initialPos := pos + startTime := time.Now() + var bytesWritten int + + logger := r.Logger() + logger.Info("write wal segment", "position", initialPos.String()) // Copy through pipe into client from the starting position. var g errgroup.Group @@ -263,6 +270,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err } walBytesCounter.Add(float64(n)) + bytesWritten += n } // Copy frames. @@ -289,6 +297,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err } walBytesCounter.Add(float64(n)) + bytesWritten += n } // Flush LZ4 writer, encryption writer and close pipe. @@ -314,6 +323,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index)) replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset)) + logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten) return nil } @@ -535,6 +545,10 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { return wc.Close() }) + logger := r.Logger() + logger.Info("write snapshot", "position", pos.String()) + + startTime := time.Now() // Delegate write to client & wait for writer goroutine to finish. if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil { return info, err @@ -542,8 +556,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { return info, err } - r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index) - + logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size) return info, nil } @@ -610,7 +623,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil { return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err) } - r.Logger.Printf("%s(%s): snapshot deleted %s/%08x", r.db.Path(), r.Name(), generation, index) + r.Logger().Info("snapshot deleted", "generation", generation, "index", index) } return itr.Close() @@ -642,8 +655,8 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s if err := r.Client.DeleteWALSegments(ctx, a); err != nil { return fmt.Errorf("delete wal segments: %w", err) } - r.Logger.Printf("%s(%s): wal segmented deleted before %s/%08x: n=%d", r.db.Path(), r.Name(), generation, index, len(a)) + r.Logger().Info("wal segmented deleted before", "generation", generation, "index", index, "n", len(a)) return nil } @@ -679,7 +692,7 @@ func (r *Replica) monitor(ctx context.Context) { // Synchronize the shadow wal into the replication directory. if err := r.Sync(ctx); err != nil { - r.Logger.Printf("%s(%s): monitor error: %s", r.db.Path(), r.Name(), err) + r.Logger().Error("monitor error", "error", err) continue } } @@ -707,7 +720,7 @@ func (r *Replica) retainer(ctx context.Context) { return case <-ticker.C: if err := r.EnforceRetention(ctx); err != nil { - r.Logger.Printf("%s(%s): retainer error: %s", r.db.Path(), r.Name(), err) + r.Logger().Error("retainer error", "error", err) continue } } @@ -729,7 +742,7 @@ func (r *Replica) snapshotter(ctx context.Context) { return case <-ticker.C: if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { - r.Logger.Printf("%s(%s): snapshotter error: %s", r.db.Path(), r.Name(), err) + r.Logger().Error("snapshotter error", "error", err) continue } } @@ -757,7 +770,7 @@ func (r *Replica) validator(ctx context.Context) { return case <-ticker.C: if err := r.Validate(ctx); err != nil { - r.Logger.Printf("%s(%s): validation error: %s", r.db.Path(), r.Name(), err) + r.Logger().Error("validation error", "error", err) continue } } @@ -794,7 +807,6 @@ func (r *Replica) Validate(ctx context.Context) error { ReplicaName: r.Name(), Generation: pos.Generation, Index: pos.Index - 1, - Logger: r.Logger, }); err != nil { return fmt.Errorf("cannot restore: %w", err) } @@ -819,7 +831,7 @@ func (r *Replica) Validate(ctx context.Context) error { if mismatch { status = "mismatch" } - r.Logger.Printf("%s(%s): validator: status=%s db=%016x replica=%016x pos=%s", db.Path(), r.Name(), status, chksum0, chksum1, pos) + r.Logger().Info("validator", "status", status, "db", fmt.Sprintf("%016x", chksum0), "replica", fmt.Sprintf("%016x", chksum1), "position", pos.String()) // Validate checksums match. if mismatch { @@ -837,8 +849,6 @@ func (r *Replica) Validate(ctx context.Context) error { // waitForReplica blocks until replica reaches at least the given position. func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error { - db := r.DB() - ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() @@ -861,7 +871,7 @@ func (r *Replica) waitForReplica(ctx context.Context, pos Pos) error { // Obtain current position of replica, check if past target position. curr := r.Pos() if curr.IsZero() { - r.Logger.Printf("%s(%s): validator: no replica position available", db.Path(), r.Name()) + r.Logger().Info("validator: no replica position available") continue } @@ -1013,17 +1023,6 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { return fmt.Errorf("cannot specify index & timestamp to restore") } - // Ensure logger exists. - logger := opt.Logger - if logger == nil { - logger = r.Logger - } - - logPrefix := r.Name() - if db := r.DB(); db != nil { - logPrefix = fmt.Sprintf("%s(%s)", db.Path(), r.Name()) - } - // Ensure output path does not already exist. if _, err := os.Stat(opt.OutputPath); err == nil { return fmt.Errorf("cannot restore, output path already exists: %s", opt.OutputPath) @@ -1070,19 +1069,19 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { tmpPath := opt.OutputPath + ".tmp" // Copy snapshot to output path. - logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath) + r.Logger().Info("restoring snapshot", "generation", opt.Generation, "index", minWALIndex, "path", tmpPath) if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) } // If no WAL files available, move snapshot to final path & exit early. if snapshotOnly { - logger.Printf("%s: snapshot only, finalizing database", logPrefix) + r.Logger().Info("snapshot only, finalizing database") return os.Rename(tmpPath, opt.OutputPath) } // Begin processing WAL files. - logger.Printf("%s: restoring wal files: generation=%s index=[%08x,%08x]", logPrefix, opt.Generation, minWALIndex, maxWALIndex) + r.Logger().Info("restoring wal files", "generation", opt.Generation, "index_min", minWALIndex, "index_max", maxWALIndex) // Fill input channel with all WAL indexes to be loaded in order. // Verify every index has at least one offset. @@ -1138,9 +1137,9 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { return err } - logger.Printf("%s: downloaded wal %s/%08x elapsed=%s", - logPrefix, opt.Generation, index, - time.Since(startTime).String(), + r.Logger().Info("downloaded wal", + "generation", opt.Generation, "index", index, + "elapsed", time.Since(startTime).String(), ) } } @@ -1167,10 +1166,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { if err = applyWAL(ctx, index, tmpPath); err != nil { return fmt.Errorf("cannot apply wal: %w", err) } - logger.Printf("%s: applied wal %s/%08x elapsed=%s", - logPrefix, opt.Generation, index, - time.Since(startTime).String(), - ) + r.Logger().Info("applied wal", "generation", opt.Generation, "index", index, "elapsed", time.Since(startTime).String()) } // Ensure all goroutines finish. All errors should have been handled during @@ -1180,7 +1176,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { } // Copy file to final location. - logger.Printf("%s: renaming database from temporary location", logPrefix) + r.Logger().Info("renaming database from temporary location") if err := os.Rename(tmpPath, opt.OutputPath); err != nil { return err }