Skip to content

Commit

Permalink
internal/ctlog: ensure that immutable uploads are really so
Browse files Browse the repository at this point in the history
We might want to take advantage of backends overwrite protection in the
future.
  • Loading branch information
FiloSottile committed Aug 21, 2024
1 parent 9aee27e commit 4df346d
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 87 deletions.
4 changes: 2 additions & 2 deletions internal/ctlog/ctlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ type UploadOptions struct {

var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsStaging = &UploadOptions{Compress: true, Immutable: true}
var optsStaging = &UploadOptions{Compress: true}
var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true}
var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

Expand Down Expand Up @@ -826,7 +826,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
// exercise the same code path as LoadLog.
if err := applyStagedUploads(ctx, l.c, stagedUploads); err != nil {
// This is also fatal, since we can't continue leaving behind missing
// tiles. The next run of sequence will not upload them again, while
// tiles. The next run of sequence would not upload them again, while
// LoadLog will retry uploading them from the staging bundle.
return fmtErrorf("%w: couldn't upload a tile: %w", errFatal, err)
}
Expand Down
154 changes: 75 additions & 79 deletions internal/ctlog/ctlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ import (
"crypto/sha256"
"crypto/x509"
"encoding/base64"
"errors"
"flag"
mathrand "math/rand"
"reflect"
"slices"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
Expand All @@ -29,11 +27,9 @@ import (

var globalTime = time.Now().UnixMilli()

func init() {
ctlog.SetTimeNowUnixMilli(func() int64 {
return atomic.AddInt64(&globalTime, 1)
})
}
func monotonicTime() int64 { return atomic.AddInt64(&globalTime, 1) }

func init() { ctlog.SetTimeNowUnixMilli(monotonicTime) }

var longFlag = flag.Bool("long", false, "run especially slow tests")

Expand Down Expand Up @@ -428,24 +424,78 @@ func TestReloadWrongKey(t *testing.T) {
}
}

func TestFatalError(t *testing.T) {
func TestStagingCollision(t *testing.T) {
tl := NewEmptyTestLog(t)
addCertificate(t, tl)
addCertificate(t, tl)
fatalIfErr(t, tl.Log.Sequence())

lockErr := errors.New("lock replace error")
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return false, lockErr
time := monotonicTime()
ctlog.SetTimeNowUnixMilli(func() int64 { return time })
t.Cleanup(func() { ctlog.SetTimeNowUnixMilli(monotonicTime) })

addCertificateExpectFailureWithSeed(t, tl, 'A')
addCertificateExpectFailureWithSeed(t, tl, 'B')

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = failLockAndNotPersist
sequenceExpectFailure(t, tl)
tl.CheckLog(1)

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil
tl = ReloadLog(t, tl)
tl.CheckLog(1)

// First, cause the exact same staging bundle to be uploaded.

addCertificateWithSeed(t, tl, 'A')
addCertificateWithSeed(t, tl, 'B')
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog(3)

// Again, but now due to a staging bundle upload error.

time++

addCertificateExpectFailureWithSeed(t, tl, 'C')
addCertificateExpectFailureWithSeed(t, tl, 'D')

tl.Config.Backend.(*MemoryBackend).UploadCallback = failStagingButPersist
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog(3)

tl.Config.Backend.(*MemoryBackend).UploadCallback = nil

addCertificateWithSeed(t, tl, 'C')
addCertificateWithSeed(t, tl, 'D')
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog(5)

// We wanted to also test reaching the same point through two different
// sequencing paths, as suggested in
// https://github.com/FiloSottile/sunlight/pull/18#discussion_r1704174301
// but that doesn't seem possible since time is required to move forward at
// each sequencing.
}

func sequenceExpectFailure(t *testing.T, tl *TestLog) {
t.Helper()
if err := tl.Log.Sequence(); err == nil {
t.Error("expected error, got nil")
}
}

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
func TestFatalError(t *testing.T) {
tl := NewEmptyTestLog(t)
addCertificate(t, tl)
addCertificate(t, tl)
fatalIfErr(t, tl.Log.Sequence())

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = failLockAndNotPersist
addCertificateExpectFailure(t, tl)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := tl.Log.RunSequencer(ctx, 1*time.Millisecond); !errors.Is(err, lockErr) {
t.Errorf("expected fatal error, got %v", err)
if err := tl.Log.RunSequencer(ctx, 1*time.Millisecond); err == nil {
t.Errorf("expected fatal error, got nil")
}
tl.CheckLog(2)

Expand Down Expand Up @@ -474,10 +524,7 @@ func TestSequenceErrors(t *testing.T) {
// retried, and the same tiles are generated and uploaded again.
name: "LockUpload",
breakSeq: func(tl *TestLog) {
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return false, errors.New("lock replace error")
}
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = failLockAndNotPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil
Expand All @@ -490,10 +537,7 @@ func TestSequenceErrors(t *testing.T) {
// persisted anyway, such as a response timeout.
name: "LockUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return true, errors.New("lock response timeout")
}
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = failLockButPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil
Expand All @@ -504,13 +548,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "CheckpointUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if key == "checkpoint" {
return false, errors.New("checkpoint upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failCheckpointAndNotPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -521,13 +559,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "CheckpointUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if key == "checkpoint" {
return true, errors.New("checkpoint upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failCheckpointButPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -538,13 +570,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "StagingUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "staging/") {
return false, errors.New("staging upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failStagingAndNotPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -555,13 +581,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "StagingUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "staging/") {
return true, errors.New("staging upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failStagingButPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -572,13 +592,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "DataTileUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/data/") {
return false, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failDataTileAndNotPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -589,13 +603,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "DataTileUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/data/") {
return true, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failDataTileButPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -606,13 +614,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "Level0TileUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/0/") {
return false, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failTile0AndNotPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand All @@ -623,13 +625,7 @@ func TestSequenceErrors(t *testing.T) {
{
name: "Level0TileUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/0/") {
return true, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
tl.Config.Backend.(*MemoryBackend).UploadCallback = failTile0ButPersist
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
Expand Down
Loading

0 comments on commit 4df346d

Please sign in to comment.