diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go
index 268d487..12e3f97 100644
--- a/internal/ctlog/ctlog.go
+++ b/internal/ctlog/ctlog.go
@@ -1,6 +1,7 @@
 package ctlog
 
 import (
+	"archive/tar"
 	"bytes"
 	"context"
 	"crypto"
@@ -11,8 +12,11 @@ import (
 	"crypto/x509"
 	"encoding/base64"
 	"encoding/binary"
+	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"log/slog"
 	"maps"
 	mathrand "math/rand/v2"
@@ -200,11 +204,13 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
 	case c1.N < c.N:
 		// It's possible that we crashed between committing a new checkpoint to
 		// the lock backend and uploading it to the object storage backend.
-		// Or maybe the object storage backend GETs are cached.
-		// That's ok, as long as the rest of the tree load correctly against the
-		// lock checkpoint.
+		// Apply the staged tiles before continuing.
 		config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
 			"old_size", c1.N, "size", c.N)
+		stagingPath := fmt.Sprintf("staging/%d-%s", c.N, hex.EncodeToString(c.Hash[:]))
+		if err := applyStagedUploads(ctx, config, stagingPath); err != nil {
+			return nil, fmt.Errorf("couldn't apply staged uploads: %w", err)
+		}
 	}
 
 	cacheRead, cacheWrite, err := initCache(config.Cache)
@@ -376,6 +382,7 @@ type UploadOptions struct {
 
 var optsHashTile = &UploadOptions{Immutable: true}
 var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
+var optsStaging = &UploadOptions{Compress: true, Immutable: true}
 var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true}
 var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"}
 
@@ -676,18 +683,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 		close(p.done)
 	}()
 
-	var tileCount int
 	start := time.Now()
 	ctx, cancel := context.WithTimeout(ctx, sequenceTimeout)
 	defer cancel()
-	g, gctx := errgroup.WithContext(ctx)
-	defer g.Wait()
 
 	timestamp := timeNowUnixMilli()
 	if timestamp <= l.tree.Time {
 		return fmt.Errorf("%w: time did not progress! %d -> %d", errFatal, l.tree.Time, timestamp)
 	}
 
+	var tileUploads []*uploadAction
 	edgeTiles := maps.Clone(l.edgeTiles)
 	var dataTile []byte
 	// Load the current partial data tile, if any.
@@ -719,37 +724,35 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 		n++
 
-		// If the data tile is full, upload it.
+		// If the data tile is full, stage it.
 		if n%sunlight.TileWidth == 0 {
 			tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
 			tile.L = -1
 			edgeTiles[-1] = tileWithBytes{tile, dataTile}
-			l.c.Log.DebugContext(ctx, "uploading full data tile",
+			l.c.Log.DebugContext(ctx, "staging full data tile",
 				"tree_size", n, "tile", tile, "size", len(dataTile))
 			l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
-			tileCount++
-			data := dataTile // data is captured by the g.Go function.
-			g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), data, optsDataTile) })
+			tileUploads = append(tileUploads, &uploadAction{
+				sunlight.TilePath(tile), dataTile, optsDataTile})
			dataTile = nil
 		}
 	}
 
-	// Upload leftover partial data tile, if any.
+	// Stage leftover partial data tile, if any.
 	if n != l.tree.N && n%sunlight.TileWidth != 0 {
 		tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
 		tile.L = -1
 		edgeTiles[-1] = tileWithBytes{tile, dataTile}
-		l.c.Log.DebugContext(ctx, "uploading partial data tile",
+		l.c.Log.DebugContext(ctx, "staging partial data tile",
 			"tree_size", n, "tile", tile, "size", len(dataTile))
 		l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
-		tileCount++
-		g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), dataTile, optsDataTile) })
+		tileUploads = append(tileUploads, &uploadAction{
+			sunlight.TilePath(tile), dataTile, optsDataTile})
 	}
 
-	// Produce and upload new tree tiles.
+	// Produce and stage new tree tiles.
 	tiles := tlog.NewTiles(sunlight.TileHeight, l.tree.N, n)
 	for _, tile := range tiles {
-		tile := tile // tile is captured by the g.Go function.
 		data, err := tlog.ReadTileData(tile, hashReader)
 		if err != nil {
 			return fmtErrorf("couldn't generate tile %v: %w", tile, err)
 		}
@@ -759,14 +762,10 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 		if t0, ok := edgeTiles[tile.L]; !ok || t0.N < tile.N || (t0.N == tile.N && t0.W < tile.W) {
 			edgeTiles[tile.L] = tileWithBytes{tile, data}
 		}
-		l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
+		l.c.Log.DebugContext(ctx, "staging tree tile", "old_tree_size", oldSize,
 			"tree_size", n, "tile", tile, "size", len(data))
-		tileCount++
-		g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), data, optsHashTile) })
-	}
-
-	if err := g.Wait(); err != nil {
-		return fmtErrorf("couldn't upload a tile: %w", err)
+		tileUploads = append(tileUploads, &uploadAction{
+			sunlight.TilePath(tile), data, optsHashTile})
 	}
 
 	if testingOnlyPauseSequencing != nil {
@@ -778,6 +777,20 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 		return fmtErrorf("couldn't compute tree head: %w", err)
 	}
+	// Upload tiles to staging, where they can be recovered by LoadLog if we
+	// crash right after updating the lock database. See also
+	// https://github.com/FiloSottile/sunlight/issues/11.
+	data, err := marshalStagedUploads(tileUploads)
+	if err != nil {
+		return fmtErrorf("couldn't marshal staged uploads: %w", err)
+	}
+	stagingPath := fmt.Sprintf("staging/%d-%s", tree.N, hex.EncodeToString(tree.Hash[:]))
+	l.c.Log.DebugContext(ctx, "uploading staged tiles", "old_tree_size", oldSize,
+		"tree_size", n, "path", stagingPath, "size", len(data))
+	if err := l.c.Backend.Upload(ctx, stagingPath, data, optsStaging); err != nil {
+		return fmtErrorf("couldn't upload staged tiles: %w", err)
+	}
+
 	checkpoint, err := signTreeHead(l.c, tree)
 	if err != nil {
 		return fmtErrorf("couldn't sign checkpoint: %w", err)
 	}
@@ -802,6 +815,30 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 	l.lockCheckpoint = newLock
 	l.edgeTiles = edgeTiles
 
+	g, gctx := errgroup.WithContext(ctx)
+	for _, u := range tileUploads {
+		g.Go(func() error {
+			// Since errors are fatal, and uploads are idempotent, retry to
+			// avoid having to reload the whole process.
+			attempt := 1
+			for {
+				err := l.c.Backend.Upload(gctx, u.key, u.data, u.opts)
+				if err == nil || attempt == 5 {
+					return err
+				}
+				l.c.Log.WarnContext(gctx, "tile upload failed", "tree_size", n,
+					"key", u.key, "err", err, "attempt", attempt)
+				time.Sleep(time.Duration(attempt) * 25 * time.Millisecond)
+				attempt++
+			}
+		})
+	}
+	if err := g.Wait(); err != nil {
+		// This is also fatal, since we can't continue leaving behind missing
LoadLog will retry uploading them from the staging bundle. + return fmtErrorf("%w: couldn't upload a tile: %w", errFatal, err) + } + if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsCheckpoint); err != nil { // Return an error so we don't produce SCTs that, although safely // serialized, wouldn't be part of a publicly visible tree. @@ -823,9 +860,9 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { } l.c.Log.Info("sequenced pool", "tree_size", tree.N, "entries", n-oldSize, - "tiles", tileCount, "timestamp", timestamp, + "tiles", len(tileUploads), "timestamp", timestamp, "elapsed", time.Since(start)) - l.m.SeqTiles.Add(float64(tileCount)) + l.m.SeqTiles.Add(float64(len(tileUploads))) l.m.TreeSize.Set(float64(tree.N)) l.m.TreeTime.Set(float64(timestamp) / 1000) @@ -834,6 +871,70 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { var testingOnlyPauseSequencing func() +type uploadAction struct { + key string + data []byte + opts *UploadOptions +} + +func marshalStagedUploads(uploads []*uploadAction) ([]byte, error) { + var buffer bytes.Buffer + writer := tar.NewWriter(&buffer) + for _, u := range uploads { + opts, err := json.Marshal(u.opts) + if err != nil { + return nil, fmtErrorf("couldn't marshal upload options: %w", err) + } + if err := writer.WriteHeader(&tar.Header{ + Name: u.key, + Size: int64(len(u.data)), + PAXRecords: map[string]string{"SUNLIGHT.opts": string(opts)}, + }); err != nil { + return nil, fmtErrorf("error writing tar header: %w", err) + } + if _, err := writer.Write(u.data); err != nil { + return nil, fmtErrorf("error writing tar data: %w", err) + } + } + if err := writer.Close(); err != nil { + return nil, fmtErrorf("error closing tar writer: %w", err) + } + return buffer.Bytes(), nil +} + +func applyStagedUploads(ctx context.Context, config *Config, stagingPath string) error { + data, err := config.Backend.Fetch(ctx, stagingPath) + if err != nil { + return fmt.Errorf("couldn't fetch staged uploads: %w", err) + } + reader := tar.NewReader(bytes.NewReader(data)) + for { + header, err := reader.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("error reading tar header: %w", err) + } + optsBytes, ok := header.PAXRecords["SUNLIGHT.opts"] + if !ok { + return fmt.Errorf("missing SUNLIGHT.opts in tar header") + } + opts := &UploadOptions{} + if err := json.Unmarshal([]byte(optsBytes), opts); err != nil { + return fmt.Errorf("couldn't unmarshal upload options: %w", err) + } + data, err := io.ReadAll(reader) + if err != nil { + return fmt.Errorf("error reading tar data: %w", err) + } + if err := config.Backend.Upload(ctx, header.Name, data, opts); err != nil { + return fmt.Errorf("couldn't upload staged tile: %w", err) + } + } + return nil +} + // signTreeHead signs the tree and returns a c2sp.org/checkpoint. 
 func signTreeHead(c *Config, tree treeWithTimestamp) (checkpoint []byte, err error) {
 	sthBytes, err := ct.SerializeSTHSignatureInput(ct.SignedTreeHead{
diff --git a/internal/ctlog/ctlog_test.go b/internal/ctlog/ctlog_test.go
index 32b3e30..a954ce7 100644
--- a/internal/ctlog/ctlog_test.go
+++ b/internal/ctlog/ctlog_test.go
@@ -27,10 +27,11 @@ import (
 	"github.com/google/certificate-transparency-go/tls"
 )
 
+var globalTime = time.Now().UnixMilli()
+
 func init() {
-	t := time.Now().UnixMilli()
 	ctlog.SetTimeNowUnixMilli(func() int64 {
-		return atomic.AddInt64(&t, 1)
+		return atomic.AddInt64(&globalTime, 1)
 	})
 }
 
@@ -134,32 +135,38 @@ func TestSequenceUploadCount(t *testing.T) {
 	}
 	uploads()
 
-	// Empty rounds should cause only one upload (the checkpoint).
+	// Empty rounds should cause only two uploads (an empty staging bundle and
+	// the checkpoint).
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 1 {
-		t.Errorf("got %d uploads, expected 1", n)
+	if n := uploads(); n != 2 {
+		t.Errorf("got %d uploads, expected 2", n)
 	}
 
-	// One entry in steady state (not at tile boundary) should cause three
-	// uploads (the checkpoint, a level -1 tile, and a level 0 tile).
+	// One entry in steady state (not at tile boundary) should cause four
+	// uploads (the staging bundle, the checkpoint, a level -1 tile, and a level
+	// 0 tile).
 	addCertificate(t, tl)
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 3 {
-		t.Errorf("got %d uploads, expected 3", n)
+	if n := uploads(); n != 4 {
+		t.Errorf("got %d uploads, expected 4", n)
 	}
 
-	// A tile width worth of entries should cause six uploads (the checkpoint,
-	// two level -1 tiles, two level 0 tiles, and one level 1 tile).
+	// A tile width worth of entries should cause seven uploads (the staging
+	// bundle, the checkpoint, two level -1 tiles, two level 0 tiles, and one
+	// level 1 tile).
 	for i := 0; i < tileWidth; i++ {
 		addCertificate(t, tl)
 	}
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 6 {
-		t.Errorf("got %d uploads, expected 6", n)
+	if n := uploads(); n != 7 {
-		t.Errorf("got %d uploads, expected 7", n)
 	}
 }
 
 func TestSequenceUploadPaths(t *testing.T) {
+	defer func(old int64) { globalTime = old }(globalTime)
+	globalTime = 0
+
 	tl := NewEmptyTestLog(t)
 
 	for i := int64(0); i < tileWidth+5; i++ {
@@ -186,6 +193,8 @@ func TestSequenceUploadPaths(t *testing.T) {
 		"issuer/6b23c0d5f35d1b11f9b683f0b0a617355deb11277d91ae091d399c655b87940d",
 		"issuer/81365bbc90b5b3991c762eebada7c6d84d1e39a0a1d648cb4fe5a9890b089da8",
 		"issuer/df7e70e5021544f4834bbee64a9e3789febc4be81470df629cad6ddb03320a5c",
+		"staging/261-0a4f1a4119ca89dc90a612834c0da004f5d1b04a5aad89b88df26a904e4a4f0f",
+		"staging/527-0c3e2c4127196a1a5abb8c6d94d3607a92b510e01004607b910eb0c7ba27f710",
 		"tile/0/000",
 		"tile/0/001",
 		"tile/0/001.p/5",
@@ -219,6 +228,8 @@ func testDuplicates(t *testing.T, addWithSeed func(*testing.T, *TestLog, int64)
 	addWithSeed(t, tl, mathrand.Int63()) // 2
 	addWithSeed(t, tl, mathrand.Int63()) // 3
 
+	// Two pairs of duplicates from the byHash pool.
+
 	wait01 := addWithSeed(t, tl, 0) // 4
 	wait02 := addWithSeed(t, tl, 0)
 	wait11 := addWithSeed(t, tl, 1) // 5
@@ -251,6 +262,8 @@ func testDuplicates(t *testing.T, addWithSeed func(*testing.T, *TestLog, int64)
 		t.Errorf("got timestamp %d, expected %d", e12.Timestamp, e11.Timestamp)
 	}
 
+	// A duplicate from the cache.
+
 	wait03 := addWithSeed(t, tl, 0)
 	fatalIfErr(t, tl.Log.Sequence())
 	e03, err := wait03(context.Background())
@@ -263,6 +276,8 @@ func testDuplicates(t *testing.T, addWithSeed func(*testing.T, *TestLog, int64)
 		t.Errorf("got timestamp %d, expected %d", e03.Timestamp, e01.Timestamp)
 	}
 
+	// A pair of duplicates from the inSequencing pool.
+
 	wait21 := addWithSeed(t, tl, 2) // 6
 	ctlog.PauseSequencer()
 	go tl.Log.Sequence()
@@ -520,6 +535,40 @@ func TestSequenceErrors(t *testing.T) {
 			expectProgress: true,
 			expectFatal:    false,
 		},
+		{
+			name: "StagingUpload",
+			breakSeq: func(tl *TestLog) {
+				fail := func(key string, data []byte) (apply bool, err error) {
+					if strings.HasPrefix(key, "staging/") {
+						return false, errors.New("staging upload error")
+					}
+					return true, nil
+				}
+				tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
+			},
+			unbreakSeq: func(tl *TestLog) {
+				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
+			},
+			expectProgress: false,
+			expectFatal:    false,
+		},
+		{
+			name: "StagingUpload",
+			breakSeq: func(tl *TestLog) {
+				fail := func(key string, data []byte) (apply bool, err error) {
+					if strings.HasPrefix(key, "staging/") {
+						return false, errors.New("staging upload error")
+					}
+					return true, nil
+				}
+				tl.Config.Backend.(*MemoryBackend).UploadCallback = fail
+			},
+			unbreakSeq: func(tl *TestLog) {
+				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
+			},
+			expectProgress: false,
+			expectFatal:    false,
+		},
 		{
 			name: "DataTileUpload",
 			breakSeq: func(tl *TestLog) {
@@ -534,8 +583,8 @@ func TestSequenceErrors(t *testing.T) {
 			unbreakSeq: func(tl *TestLog) {
 				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
 			},
-			expectProgress: false,
-			expectFatal:    false,
+			expectProgress: true,
+			expectFatal:    true,
 		},
 		{
 			name: "DataTileUploadPersisted",
 			breakSeq: func(tl *TestLog) {
@@ -551,8 +600,8 @@ func TestSequenceErrors(t *testing.T) {
 			unbreakSeq: func(tl *TestLog) {
 				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
 			},
-			expectProgress: false,
-			expectFatal:    false,
+			expectProgress: true,
+			expectFatal:    true,
 		},
 		{
 			name: "Level0TileUpload",
 			breakSeq: func(tl *TestLog) {
@@ -568,8 +617,8 @@ func TestSequenceErrors(t *testing.T) {
 			unbreakSeq: func(tl *TestLog) {
 				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
 			},
-			expectProgress: false,
-			expectFatal:    false,
+			expectProgress: true,
+			expectFatal:    true,
 		},
 		{
 			name: "Level0TileUploadPersisted",
 			breakSeq: func(tl *TestLog) {
@@ -585,8 +634,8 @@ func TestSequenceErrors(t *testing.T) {
 			unbreakSeq: func(tl *TestLog) {
 				tl.Config.Backend.(*MemoryBackend).UploadCallback = nil
 			},
-			expectProgress: false,
-			expectFatal:    false,
+			expectProgress: true,
+			expectFatal:    true,
 		},
 	}
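
The staging bundle written by marshalStagedUploads and replayed by applyStagedUploads is a plain tar archive: each entry is named after its object-storage key and carries its JSON-encoded UploadOptions in a vendor "SUNLIGHT.opts" PAX record. The sketch below is a self-contained round trip of that format; the stagedUpload and options types, the sample keys, and the printed output are illustrative stand-ins for the patch's uploadAction, UploadOptions, and backend upload, not part of the change.

// Minimal sketch of the staging bundle format, with stand-in types.
package main

import (
	"archive/tar"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
)

// options and stagedUpload are hypothetical stand-ins for the package's
// UploadOptions and uploadAction.
type options struct {
	ContentType string
	Compress    bool
	Immutable   bool
}

type stagedUpload struct {
	key  string
	data []byte
	opts options
}

func main() {
	uploads := []stagedUpload{
		{key: "tile/0/000", data: []byte("hash tile bytes"), opts: options{Immutable: true}},
		{key: "tile/data/000", data: []byte("data tile bytes"), opts: options{Compress: true, Immutable: true}},
	}

	// Marshal: one tar entry per staged upload, with the upload options
	// serialized as JSON in a PAX record.
	var buf bytes.Buffer
	w := tar.NewWriter(&buf)
	for _, u := range uploads {
		opts, err := json.Marshal(u.opts)
		if err != nil {
			log.Fatal(err)
		}
		if err := w.WriteHeader(&tar.Header{
			Name:       u.key,
			Size:       int64(len(u.data)),
			PAXRecords: map[string]string{"SUNLIGHT.opts": string(opts)},
		}); err != nil {
			log.Fatal(err)
		}
		if _, err := w.Write(u.data); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Apply: walk the entries, recover key, data, and options, and replay
	// each upload (printed here instead of calling an object storage backend).
	r := tar.NewReader(bytes.NewReader(buf.Bytes()))
	for {
		hdr, err := r.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		var opts options
		if err := json.Unmarshal([]byte(hdr.PAXRecords["SUNLIGHT.opts"]), &opts); err != nil {
			log.Fatal(err)
		}
		data, err := io.ReadAll(r)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("would upload %q (%d bytes, %+v)\n", hdr.Name, len(data), opts)
	}
}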
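The post-checkpoint upload loop in the patch retries each tile a handful of times with a short linear backoff before returning a fatal error, since by that point the lock checkpoint is already committed. Below is a stand-alone rendering of that policy under stated assumptions: uploadWithRetry, maxAttempts, and the fake upload function are hypothetical and only illustrate the backoff arithmetic, they are not part of the patch.

// Sketch of a bounded retry with linear backoff, mirroring the policy used
// for the post-checkpoint tile uploads.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const maxAttempts = 5

func uploadWithRetry(ctx context.Context, upload func(context.Context) error) error {
	for attempt := 1; ; attempt++ {
		err := upload(ctx)
		if err == nil || attempt == maxAttempts {
			// Success, or out of attempts: surface the last error (if any).
			return err
		}
		fmt.Printf("upload failed (attempt %d): %v\n", attempt, err)
		// Linear backoff: 25ms, 50ms, 75ms, 100ms between attempts.
		time.Sleep(time.Duration(attempt) * 25 * time.Millisecond)
	}
}

func main() {
	calls := 0
	err := uploadWithRetry(context.Background(), func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("transient backend error")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}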