Skip to content

Commit

Permalink
Implement S3 leasing
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Nov 8, 2024
1 parent 2f22a4b commit d1b40b0
Show file tree
Hide file tree
Showing 9 changed files with 1,004 additions and 123 deletions.
132 changes: 129 additions & 3 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ var (
Version = "(development build)"
)

// errStop is a terminal error for indicating program should quit.
var errStop = errors.New("stop")
var (
// errStop is a terminal error for indicating program should quit.
errStop = errors.New("stop")

// errLeaseExpired is a terminal error indicatingthe program has exited due to lease expiration.
errLeaseExpired = errors.New("lease expired")
)

func main() {
m := NewMain()
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
os.Exit(1)
} else if err == errLeaseExpired {
os.Exit(2)
} else if err != nil {
slog.Error("failed to run", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -89,8 +96,11 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return err
}

// Wait for signal to stop program.
// Wait for lease expiration or for a signal to stop program.
select {
case <-c.leaseExpireCh:
return errLeaseExpired

case err = <-c.execCh:
slog.Info("subprocess exited, litestream shutting down")
case sig := <-signalCh:
Expand Down Expand Up @@ -162,6 +172,9 @@ type Config struct {
// List of databases to manage.
DBs []*DBConfig `yaml:"dbs"`

// Optional. Distributed lease configuration.
Lease *LeaseConfig `yaml:"lease"`

// Subcommand to execute during replication.
// Litestream will shutdown when subcommand exits.
Exec string `yaml:"exec"`
Expand Down Expand Up @@ -281,6 +294,119 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
return config, nil
}

// LeaseConfig represents the configuration for a distributed lease.
type LeaseConfig struct {
Type string `yaml:"type"` // "s3"
Path string `yaml:"path"`
URL string `yaml:"url"`
Timeout *time.Duration `yaml:"timeout"`
Owner string `yaml:"owner"`

// S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
ForcePathStyle *bool `yaml:"force-path-style"`
SkipVerify bool `yaml:"skip-verify"`
}

// NewLeaserFromConfig instantiates a lease client.
func NewLeaserFromConfig(c *LeaseConfig) (_ litestream.Leaser, err error) {
// Ensure user did not specify URL in path.
if isURL(c.Path) {
return nil, fmt.Errorf("leaser path cannot be a url, please use the 'url' field instead: %s", c.Path)
}

switch c.Type {
case "s3":
return newS3LeaserFromConfig(c)
default:
return nil, fmt.Errorf("unknown leaser type in config: %q", c.Type)
}
}

// newS3LeaserFromConfig returns a new instance of s3.Leaser built from config.
func newS3LeaserFromConfig(c *LeaseConfig) (_ *s3.Leaser, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for s3 leaser")
} else if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for s3 leaser")
}

bucket, path := c.Bucket, c.Path
region, endpoint, skipVerify := c.Region, c.Endpoint, c.SkipVerify

// Use path style if an endpoint is explicitly set. This works because the
// only service to not use path style is AWS which does not use an endpoint.
forcePathStyle := (endpoint != "")
if v := c.ForcePathStyle; v != nil {
forcePathStyle = *v
}

// Apply settings from URL, if specified.
if c.URL != "" {
_, host, upath, err := ParseReplicaURL(c.URL)
if err != nil {
return nil, err
}
ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)

// Only apply URL parts to field that have not been overridden.
if path == "" {
path = upath
}
if bucket == "" {
bucket = ubucket
}
if region == "" {
region = uregion
}
if endpoint == "" {
endpoint = uendpoint
}
if !forcePathStyle {
forcePathStyle = uforcePathStyle
}
}

// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("bucket required for s3 leaser")
}

// Build leaser.
leaser := s3.NewLeaser()
leaser.AccessKeyID = c.AccessKeyID
leaser.AccessKeyID = c.AccessKeyID
leaser.SecretAccessKey = c.SecretAccessKey
leaser.Bucket = bucket
leaser.Path = path
leaser.Region = region
leaser.Endpoint = endpoint
leaser.ForcePathStyle = forcePathStyle
leaser.SkipVerify = skipVerify

owner := c.Owner
if owner == "" {
owner, _ = os.Hostname()
}
leaser.Owner = owner

if v := c.Timeout; v != nil {
leaser.LeaseTimeout = *v
}

// Initialize leaser to build client.
if err := leaser.Open(); err != nil {
return nil, err
}

return leaser, nil
}

// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
Expand Down
151 changes: 147 additions & 4 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
Expand All @@ -10,6 +11,9 @@ import (
_ "net/http/pprof"
"os"
"os/exec"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
Expand All @@ -23,19 +27,39 @@ import (

// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
cmd *exec.Cmd // subcommand
execCh chan error // subcommand error channel
cmd *exec.Cmd // subcommand
wg sync.WaitGroup
execCh chan error // subcommand error channel
leaseExpireCh chan struct{} // lease expiration error channel
leaserCtx context.Context
leaserCancel context.CancelCauseFunc

// Holds the current lease, if any.
lease atomic.Value // *litestream.Lease

Config Config

// Lease client for managing distributed lease.
// May be nil if no lease config specified.
Leaser litestream.Leaser

// List of managed databases specified in the config.
DBs []*litestream.DB
}

func NewReplicateCommand() *ReplicateCommand {
return &ReplicateCommand{
execCh: make(chan error),
c := &ReplicateCommand{
execCh: make(chan error),
leaseExpireCh: make(chan struct{}),
}
c.leaserCtx, c.leaserCancel = context.WithCancelCause(context.Background())

c.lease.Store((*litestream.Lease)(nil))
return c
}

func (c *ReplicateCommand) Lease() *litestream.Lease {
return c.lease.Load().(*litestream.Lease)
}

// ParseFlags parses the CLI flags and loads the configuration file.
Expand Down Expand Up @@ -87,6 +111,18 @@ func (c *ReplicateCommand) Run() (err error) {
// Display version information.
slog.Info("litestream", "version", Version)

// Acquire lease if config specified.
if c.Config.Lease != nil {
c.Leaser, err = NewLeaserFromConfig(c.Config.Lease)
if err != nil {
return fmt.Errorf("initialize leaser: %w", err)
}

if err := c.acquireLease(context.Background()); err != nil {
return fmt.Errorf("acquire initial lease: %w", err)
}
}

// Setup databases.
if len(c.Config.DBs) == 0 {
slog.Error("no databases specified in configuration")
Expand Down Expand Up @@ -175,9 +211,116 @@ func (c *ReplicateCommand) Close() (err error) {
}
}
}

// Stop lease monitoring.
c.leaserCancel(errors.New("litestream shutting down"))
c.wg.Wait()

// Release the most recent lease.
if lease := c.Lease(); lease != nil {
slog.Info("releasing lease", slog.Int64("epoch", lease.Epoch))

if e := c.Leaser.ReleaseLease(context.Background(), lease.Epoch); e != nil {
slog.Error("failed to release lease",
slog.Int64("epoch", lease.Epoch),
slog.Any("error", e))
}
}

return err
}

// acquireLease initializes a lease client based on the config, acquires the initial
// lease, and then continuously monitors & renews the lease in the background.
func (c *ReplicateCommand) acquireLease(ctx context.Context) (err error) {
timer := time.NewTimer(1)
defer timer.Stop()

// Continually try to acquire lease if there is an existing lease.
OUTER:
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-timer.C:
var leaseExistsError *litestream.LeaseExistsError
lease, err := c.Leaser.AcquireLease(ctx)
if errors.As(err, &leaseExistsError) {
timer.Reset(litestream.LeaseRetryInterval)
slog.Info("lease already exists, waiting to retry",
slog.Int64("epoch", leaseExistsError.Lease.Epoch),
slog.String("owner", leaseExistsError.Lease.Owner),
slog.Time("expires", leaseExistsError.Lease.Deadline()))
continue
} else if err != nil {
return fmt.Errorf("acquire lease: %w", err)
}
c.lease.Store(lease)
break OUTER
}
}

lease := c.Lease()
slog.Info("lease acquired",
slog.Int64("epoch", lease.Epoch),
slog.Duration("timeout", lease.Timeout),
slog.String("owner", lease.Owner))

// Continuously monitor and renew lease in a separate goroutine.
c.wg.Add(1)
go func() { defer c.wg.Done(); c.monitorLease(c.leaserCtx) }()

return nil
}

func (c *ReplicateCommand) monitorLease(ctx context.Context) {
timer := time.NewTimer(c.Lease().Timeout / 2)
defer timer.Stop()

for {
select {
case <-ctx.Done():
slog.Error("stopping lease monitor")
return

case <-timer.C:
var leaseExistsError *litestream.LeaseExistsError

lease := c.Lease()
slog.Debug("attempting to renew lease", slog.Int64("epoch", lease.Epoch))

// Attempt to renew our currently held lease.
newLease, err := c.Leaser.RenewLease(ctx, lease)
if errors.As(err, &leaseExistsError) {
slog.Error("cannot renew lease, another lease exists, exiting",
slog.Int64("epoch", leaseExistsError.Lease.Epoch),
slog.String("owner", leaseExistsError.Lease.Owner))
c.leaseExpireCh <- struct{}{}
return
}

// If our lease has expired then give up and exit.
if lease.Expired() {
slog.Error("lease expired, exiting")
c.leaseExpireCh <- struct{}{}
return
}

// If we hit a temporary error then aggressively retry.
if err != nil {
slog.Warn("temporarily unable to renew lease, retrying", slog.Any("error", err))
timer.Reset(1 * time.Second)
continue
}

// Replace lease and try to renew after halfway through the timeout.
slog.Debug("lease renewed", slog.Int64("epoch", newLease.Epoch))
c.lease.Store(newLease)
timer.Reset(lease.Timeout / 2)
}
}
}

// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
Expand Down
Loading

0 comments on commit d1b40b0

Please sign in to comment.