Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leasing (S3) #617

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 129 additions & 3 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ var (
Version = "(development build)"
)

// errStop is a terminal error for indicating program should quit.
var errStop = errors.New("stop")
var (
// errStop is a terminal error for indicating program should quit.
errStop = errors.New("stop")

// errLeaseExpired is a terminal error indicatingthe program has exited due to lease expiration.
errLeaseExpired = errors.New("lease expired")
)

func main() {
m := NewMain()
if err := m.Run(context.Background(), os.Args[1:]); err == flag.ErrHelp || err == errStop {
os.Exit(1)
} else if err == errLeaseExpired {
os.Exit(2)
} else if err != nil {
slog.Error("failed to run", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -89,8 +96,11 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
return err
}

// Wait for signal to stop program.
// Wait for lease expiration or for a signal to stop program.
select {
case <-c.leaseExpireCh:
return errLeaseExpired

case err = <-c.execCh:
slog.Info("subprocess exited, litestream shutting down")
case sig := <-signalCh:
Expand Down Expand Up @@ -162,6 +172,9 @@ type Config struct {
// List of databases to manage.
DBs []*DBConfig `yaml:"dbs"`

// Optional. Distributed lease configuration.
Lease *LeaseConfig `yaml:"lease"`

// Subcommand to execute during replication.
// Litestream will shutdown when subcommand exits.
Exec string `yaml:"exec"`
Expand Down Expand Up @@ -281,6 +294,119 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
return config, nil
}

// LeaseConfig represents the configuration for a distributed lease.
type LeaseConfig struct {
Type string `yaml:"type"` // "s3"
Path string `yaml:"path"`
URL string `yaml:"url"`
Timeout *time.Duration `yaml:"timeout"`
Owner string `yaml:"owner"`

// S3 settings
AccessKeyID string `yaml:"access-key-id"`
SecretAccessKey string `yaml:"secret-access-key"`
Region string `yaml:"region"`
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
ForcePathStyle *bool `yaml:"force-path-style"`
SkipVerify bool `yaml:"skip-verify"`
}

// NewLeaserFromConfig instantiates a lease client.
func NewLeaserFromConfig(c *LeaseConfig) (_ litestream.Leaser, err error) {
// Ensure user did not specify URL in path.
if isURL(c.Path) {
return nil, fmt.Errorf("leaser path cannot be a url, please use the 'url' field instead: %s", c.Path)
}

switch c.Type {
case "s3":
return newS3LeaserFromConfig(c)
default:
return nil, fmt.Errorf("unknown leaser type in config: %q", c.Type)
}
}

// newS3LeaserFromConfig returns a new instance of s3.Leaser built from config.
func newS3LeaserFromConfig(c *LeaseConfig) (_ *s3.Leaser, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for s3 leaser")
} else if c.URL != "" && c.Bucket != "" {
return nil, fmt.Errorf("cannot specify url & bucket for s3 leaser")
}

bucket, path := c.Bucket, c.Path
region, endpoint, skipVerify := c.Region, c.Endpoint, c.SkipVerify

// Use path style if an endpoint is explicitly set. This works because the
// only service to not use path style is AWS which does not use an endpoint.
forcePathStyle := (endpoint != "")
if v := c.ForcePathStyle; v != nil {
forcePathStyle = *v
}

// Apply settings from URL, if specified.
if c.URL != "" {
_, host, upath, err := ParseReplicaURL(c.URL)
if err != nil {
return nil, err
}
ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)

// Only apply URL parts to field that have not been overridden.
if path == "" {
path = upath
}
if bucket == "" {
bucket = ubucket
}
if region == "" {
region = uregion
}
if endpoint == "" {
endpoint = uendpoint
}
if !forcePathStyle {
forcePathStyle = uforcePathStyle
}
}

// Ensure required settings are set.
if bucket == "" {
return nil, fmt.Errorf("bucket required for s3 leaser")
}

// Build leaser.
leaser := s3.NewLeaser()
leaser.AccessKeyID = c.AccessKeyID
leaser.AccessKeyID = c.AccessKeyID
leaser.SecretAccessKey = c.SecretAccessKey
leaser.Bucket = bucket
leaser.Path = path
leaser.Region = region
leaser.Endpoint = endpoint
leaser.ForcePathStyle = forcePathStyle
leaser.SkipVerify = skipVerify

owner := c.Owner
if owner == "" {
owner, _ = os.Hostname()
}
leaser.Owner = owner

if v := c.Timeout; v != nil {
leaser.LeaseTimeout = *v
}

// Initialize leaser to build client.
if err := leaser.Open(); err != nil {
return nil, err
}

return leaser, nil
}

// DBConfig represents the configuration for a single database.
type DBConfig struct {
Path string `yaml:"path"`
Expand Down
151 changes: 147 additions & 4 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
Expand All @@ -10,6 +11,9 @@ import (
_ "net/http/pprof"
"os"
"os/exec"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/abs"
Expand All @@ -23,19 +27,39 @@ import (

// ReplicateCommand represents a command that continuously replicates SQLite databases.
type ReplicateCommand struct {
cmd *exec.Cmd // subcommand
execCh chan error // subcommand error channel
cmd *exec.Cmd // subcommand
wg sync.WaitGroup
execCh chan error // subcommand error channel
leaseExpireCh chan struct{} // lease expiration error channel
leaserCtx context.Context
leaserCancel context.CancelCauseFunc

// Holds the current lease, if any.
lease atomic.Value // *litestream.Lease

Config Config

// Lease client for managing distributed lease.
// May be nil if no lease config specified.
Leaser litestream.Leaser

// List of managed databases specified in the config.
DBs []*litestream.DB
}

func NewReplicateCommand() *ReplicateCommand {
return &ReplicateCommand{
execCh: make(chan error),
c := &ReplicateCommand{
execCh: make(chan error),
leaseExpireCh: make(chan struct{}),
}
c.leaserCtx, c.leaserCancel = context.WithCancelCause(context.Background())

c.lease.Store((*litestream.Lease)(nil))
return c
}

func (c *ReplicateCommand) Lease() *litestream.Lease {
return c.lease.Load().(*litestream.Lease)
}

// ParseFlags parses the CLI flags and loads the configuration file.
Expand Down Expand Up @@ -87,6 +111,18 @@ func (c *ReplicateCommand) Run() (err error) {
// Display version information.
slog.Info("litestream", "version", Version)

// Acquire lease if config specified.
if c.Config.Lease != nil {
c.Leaser, err = NewLeaserFromConfig(c.Config.Lease)
if err != nil {
return fmt.Errorf("initialize leaser: %w", err)
}

if err := c.acquireLease(context.Background()); err != nil {
return fmt.Errorf("acquire initial lease: %w", err)
}
}

// Setup databases.
if len(c.Config.DBs) == 0 {
slog.Error("no databases specified in configuration")
Expand Down Expand Up @@ -175,9 +211,116 @@ func (c *ReplicateCommand) Close() (err error) {
}
}
}

// Stop lease monitoring.
c.leaserCancel(errors.New("litestream shutting down"))
c.wg.Wait()

// Release the most recent lease.
if lease := c.Lease(); lease != nil {
slog.Info("releasing lease", slog.Int64("epoch", lease.Epoch))

if e := c.Leaser.ReleaseLease(context.Background(), lease.Epoch); e != nil {
slog.Error("failed to release lease",
slog.Int64("epoch", lease.Epoch),
slog.Any("error", e))
}
}

return err
}

// acquireLease initializes a lease client based on the config, acquires the initial
// lease, and then continuously monitors & renews the lease in the background.
func (c *ReplicateCommand) acquireLease(ctx context.Context) (err error) {
timer := time.NewTimer(1)
defer timer.Stop()

// Continually try to acquire lease if there is an existing lease.
OUTER:
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-timer.C:
var leaseExistsError *litestream.LeaseExistsError
lease, err := c.Leaser.AcquireLease(ctx)
if errors.As(err, &leaseExistsError) {
timer.Reset(litestream.LeaseRetryInterval)
slog.Info("lease already exists, waiting to retry",
slog.Int64("epoch", leaseExistsError.Lease.Epoch),
slog.String("owner", leaseExistsError.Lease.Owner),
slog.Time("expires", leaseExistsError.Lease.Deadline()))
continue
} else if err != nil {
return fmt.Errorf("acquire lease: %w", err)
}
c.lease.Store(lease)
break OUTER
}
}

lease := c.Lease()
slog.Info("lease acquired",
slog.Int64("epoch", lease.Epoch),
slog.Duration("timeout", lease.Timeout),
slog.String("owner", lease.Owner))

// Continuously monitor and renew lease in a separate goroutine.
c.wg.Add(1)
go func() { defer c.wg.Done(); c.monitorLease(c.leaserCtx) }()

return nil
}

func (c *ReplicateCommand) monitorLease(ctx context.Context) {
timer := time.NewTimer(c.Lease().Timeout / 2)
defer timer.Stop()

for {
select {
case <-ctx.Done():
slog.Error("stopping lease monitor")
return

case <-timer.C:
var leaseExistsError *litestream.LeaseExistsError

lease := c.Lease()
slog.Debug("attempting to renew lease", slog.Int64("epoch", lease.Epoch))

// Attempt to renew our currently held lease.
newLease, err := c.Leaser.RenewLease(ctx, lease)
if errors.As(err, &leaseExistsError) {
slog.Error("cannot renew lease, another lease exists, exiting",
slog.Int64("epoch", leaseExistsError.Lease.Epoch),
slog.String("owner", leaseExistsError.Lease.Owner))
c.leaseExpireCh <- struct{}{}
return
}

// If our lease has expired then give up and exit.
if lease.Expired() {
slog.Error("lease expired, exiting")
c.leaseExpireCh <- struct{}{}
return
}

// If we hit a temporary error then aggressively retry.
if err != nil {
slog.Warn("temporarily unable to renew lease, retrying", slog.Any("error", err))
timer.Reset(1 * time.Second)
continue
}

// Replace lease and try to renew after halfway through the timeout.
slog.Debug("lease renewed", slog.Int64("epoch", newLease.Epoch))
c.lease.Store(newLease)
timer.Reset(lease.Timeout / 2)
}
}
}

// Usage prints the help screen to STDOUT.
func (c *ReplicateCommand) Usage() {
fmt.Printf(`
Expand Down
Loading
Loading