Skip to content

Commit

Permalink
internal/ctlog: expand sequencing failure tests
Browse files Browse the repository at this point in the history
Closes #12
  • Loading branch information
FiloSottile committed Aug 3, 2024
1 parent ce1f1f9 commit 90c5da1
Show file tree
Hide file tree
Showing 2 changed files with 329 additions and 49 deletions.
292 changes: 261 additions & 31 deletions internal/ctlog/ctlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
mathrand "math/rand"
"reflect"
"slices"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -56,11 +58,11 @@ func TestSequenceOneLeaf(t *testing.T) {
}

if !*longFlag {
tl.CheckLog()
tl.CheckLog(i + 1)
// TODO: check leaf contents at index id.
}
}
tl.CheckLog()
tl.CheckLog(n)
}

func TestSequenceLargeLog(t *testing.T) {
Expand All @@ -71,26 +73,28 @@ func TestSequenceLargeLog(t *testing.T) {
tl := NewEmptyTestLog(t)
tl.Quiet()
for i := 0; i < 5; i++ {
addCertificateFast(t, tl)
addCertificate(t, tl)
}
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
tl.CheckLog(5)

for i := 0; i < 500; i++ {
for i := 0; i < 3000; i++ {
addCertificateFast(t, tl)
for k := 0; k < 3000; k++ {
e := &ctlog.PendingLogEntry{}
e.Certificate = []byte(strconv.Itoa(i*3000 + k))
tl.Log.AddLeafToPool(e)
}
fatalIfErr(t, tl.Log.Sequence())
}
tl.CheckLog()
tl.CheckLog(5 + 500*3000)
}

func TestSequenceEmptyPool(t *testing.T) {
sequenceTwice := func(tl *TestLog) {
sequenceTwice := func(tl *TestLog, size int64) {
fatalIfErr(t, tl.Log.Sequence())
t1 := tl.CheckLog()
t1 := tl.CheckLog(size)
fatalIfErr(t, tl.Log.Sequence())
t2 := tl.CheckLog()
t2 := tl.CheckLog(size)
if t1 >= t2 {
t.Helper()
t.Error("time did not advance")
Expand All @@ -103,15 +107,15 @@ func TestSequenceEmptyPool(t *testing.T) {
}

tl := NewEmptyTestLog(t)
sequenceTwice(tl)
addCerts(tl, 5) // 5
sequenceTwice(tl)
addCerts(tl, tileWidth-5-1) // tileWidth - 1
sequenceTwice(tl)
addCerts(tl, 1) // tileWidth
sequenceTwice(tl)
addCerts(tl, 1) // tileWidth + 1
sequenceTwice(tl)
sequenceTwice(tl, 0)
addCerts(tl, 5)
sequenceTwice(tl, 5)
addCerts(tl, tileWidth-5-1)
sequenceTwice(tl, tileWidth-1)
addCerts(tl, 1)
sequenceTwice(tl, tileWidth)
addCerts(tl, 1)
sequenceTwice(tl, tileWidth+1)
}

func TestSequenceUploadCount(t *testing.T) {
Expand Down Expand Up @@ -166,7 +170,7 @@ func TestSequenceUploadPaths(t *testing.T) {
addCertificateWithSeed(t, tl, 1000+i)
}
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
tl.CheckLog(2*tileWidth + 15)

m := tl.Config.Backend.(*MemoryBackend).m
keys := make([]string, 0, len(m))
Expand Down Expand Up @@ -221,7 +225,7 @@ func testDuplicates(t *testing.T, addWithSeed func(*testing.T, *TestLog, int64)
wait12 := addWithSeed(t, tl, 1)
fatalIfErr(t, tl.Log.Sequence())
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
tl.CheckLog(6)

e01, err := wait01(context.Background())
fatalIfErr(t, err)
Expand Down Expand Up @@ -300,11 +304,11 @@ func testReloadLog(t *testing.T, add func(*testing.T, *TestLog) func(context.Con
add(t, tl)

fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
tl.CheckLog(i + 1)

tl = ReloadLog(t, tl)
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
tl.CheckLog(i + 1)
}
}

Expand Down Expand Up @@ -416,27 +420,253 @@ func TestFatalError(t *testing.T) {
fatalIfErr(t, tl.Log.Sequence())

lockErr := errors.New("lock replace error")
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = func(
old ctlog.LockedCheckpoint, new []byte) error {
return lockErr
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return false, lockErr
}

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := tl.Log.RunSequencer(ctx, 50*time.Millisecond); !errors.Is(err, lockErr) {
if err := tl.Log.RunSequencer(ctx, 1*time.Millisecond); !errors.Is(err, lockErr) {
t.Errorf("expected fatal error, got %v", err)
}
tl.CheckLog()
tl.CheckLog(2)

tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil

tl = ReloadLog(t, tl)
addCertificate(t, tl)
fatalIfErr(t, tl.Log.Sequence())
tl.CheckLog()
// TODO: expect log size 3.
tl.CheckLog(3)
}

func TestNonFatalError(t *testing.T) {
// TODO
}

func TestSequenceErrors(t *testing.T) {
tests := []struct {
name string
breakSeq func(*TestLog)
unbreakSeq func(*TestLog)
expectProgress bool
expectFatal bool
}{
{
// A fatal error while uploading to the lock backend. The upload is
// retried, and the same tiles are generated and uploaded again.
name: "LockUpload",
breakSeq: func(tl *TestLog) {
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return false, errors.New("lock replace error")
}
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil
},
expectProgress: false,
expectFatal: true,
},
{
// An error while uploading to the lock backend, where the lock is
// persisted anyway, such as a response timeout.
name: "LockUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(old ctlog.LockedCheckpoint, new []byte) (bool, error) {
return true, errors.New("lock response timeout")
}
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Lock.(*MemoryLockBackend).ReplaceCallback = nil
},
expectProgress: true,
expectFatal: true,
},
{
name: "CheckpointUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if key == "checkpoint" {
return false, errors.New("checkpoint upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: true,
expectFatal: false,
},
{
name: "CheckpointUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if key == "checkpoint" {
return true, errors.New("checkpoint upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: true,
expectFatal: false,
},
{
name: "DataTileUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/data/") {
return false, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: false,
expectFatal: false,
},
{
name: "DataTileUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/data/") {
return true, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: false,
expectFatal: false,
},
{
name: "Level0TileUpload",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/0/") {
return false, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: false,
expectFatal: false,
},
{
name: "Level0TileUploadPersisted",
breakSeq: func(tl *TestLog) {
fail := func(key string, data []byte) (apply bool, err error) {
if strings.HasPrefix(key, "tile/0/") {
return true, errors.New("tile upload error")
}
return true, nil
}
tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
},
unbreakSeq: func(tl *TestLog) {
tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
},
expectProgress: false,
expectFatal: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var tl *TestLog
var expectedSize int64
var broken bool
breakSeq := func() {
tt.breakSeq(tl)
broken = true
}
unbreakSeq := func() {
tt.unbreakSeq(tl)
broken = false
}
sequence := func(added int64) {
err := tl.Log.Sequence()
if broken && tt.expectFatal {
if err == nil {
t.Error("expected error, got nil")
}
} else {
fatalIfErr(t, err)
}
if !broken || tt.expectProgress {
expectedSize += added
}
tl.CheckLog(expectedSize)
if err != nil {
tl = ReloadLog(t, tl)
}
}

tl = NewEmptyTestLog(t)
for range tileWidth - 2 {
addCertificate(t, tl)
}
sequence(tileWidth - 2)

breakSeq()
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
sequence(3)

// Re-failing the same tile sizes.
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
sequence(3)

unbreakSeq()

// Succeeding with the same size that failed.
addCertificate(t, tl)
addCertificate(t, tl)
addCertificate(t, tl)
sequence(3)

tl = NewEmptyTestLog(t)
expectedSize = 0
for range tileWidth - 2 {
addCertificate(t, tl)
}
sequence(tileWidth - 2)

breakSeq()
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
addCertificateExpectFailure(t, tl)
sequence(3)

unbreakSeq()

// Succeeding with a different set of tiles.
addCertificate(t, tl)
sequence(1)
})
}
}

func BenchmarkSequencer(b *testing.B) {
Expand Down
Loading

0 comments on commit 90c5da1

Please sign in to comment.