diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go
index b418c2d..d8fc191 100644
--- a/internal/ctlog/ctlog.go
+++ b/internal/ctlog/ctlog.go
@@ -1,6 +1,7 @@
 package ctlog
 
 import (
+	"archive/tar"
 	"bytes"
 	"context"
 	"crypto"
@@ -10,10 +11,12 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
+	"io"
 	"log/slog"
 	"maps"
 	"math"
 	"math/rand"
+	"strconv"
 	"sync"
 	"time"
 
@@ -141,6 +144,45 @@ func CreateLog(ctx context.Context, config *Config) error {
 	return nil
 }
 
+var testOnlyFailUploadPending bool
+
+func uploadPending(ctx context.Context, config *Config, checkpoint []byte, tree tlog.Tree) error {
+	if testOnlyFailUploadPending {
+		return errors.Join(errFatal, errors.New("failing upload for test"))
+	}
+
+	pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash)
+	config.Log.DebugContext(ctx, "uploading pending files", "path", pendingPath)
+
+	tarData, err := config.Backend.Fetch(ctx, pendingPath)
+	if err != nil {
+		return fmt.Errorf("couldn't fetch pending files: %w", err)
+	}
+
+	pendingFiles, err := parsePendingTar(tarData)
+	if err != nil {
+		return fmtErrorf("error reading pending tar: %w", err)
+	}
+
+	g, gctx := errgroup.WithContext(ctx)
+	defer g.Wait()
+	for _, file := range pendingFiles {
+		g.Go(func() error {
+			return config.Backend.Upload(gctx, file.path, file.data, file.opts)
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return fmtErrorf("couldn't upload a tile: %w", err)
+	}
+
+	config.Log.DebugContext(ctx, "uploading checkpoint", "key", checkpoint)
+	if err := config.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
+		return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
+	}
+
+	return config.Backend.Delete(ctx, pendingPath)
+}
+
 func LoadLog(ctx context.Context, config *Config) (*Log, error) {
 	pkix, err := x509.MarshalPKIXPublicKey(config.Key.Public())
 	if err != nil {
@@ -207,11 +249,12 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
 	if c1.N < c.N {
 		// It's possible that we crashed between committing a new checkpoint to
 		// the lock backend and uploading it to the object storage backend.
-		// Or maybe the object storage backend GETs are cached.
-		// That's ok, as long as the rest of the tree load correctly against the
-		// lock checkpoint.
 		config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
 			"old_size", c1.N, "size", c.N)
+		config.Log.DebugContext(ctx, "uploading pending files")
+		if err := uploadPending(ctx, config, lock.Bytes(), c.Tree); err != nil {
+			return nil, fmt.Errorf("uploading pending files failed: %w", err)
+		}
 	}
 
 	pemIssuers, err := config.Backend.Fetch(ctx, "issuers.pem")
@@ -321,6 +364,9 @@ type Backend interface {
 	// uploaded with UploadOptions.Compress true.
 	Fetch(ctx context.Context, key string) ([]byte, error)
 
+	// Delete deletes a file.
+	Delete(ctx context.Context, key string) error
+
 	// Metrics returns the metrics to register for this log. The metrics should
 	// not be shared by any other logs.
 	Metrics() []prometheus.Collector
@@ -339,8 +385,39 @@ type UploadOptions struct {
 	Immutable bool
 }
 
+func parseTarUploadOptions(opts map[string]string) (*UploadOptions, error) {
+	var parsed UploadOptions
+	if contentType, ok := opts["SUNLIGHT.contenttype"]; ok {
+		parsed.ContentType = contentType
+	}
+	if compressStr, ok := opts["SUNLIGHT.compress"]; ok {
+		compress, err := strconv.ParseBool(compressStr)
+		if err != nil {
+			return nil, fmtErrorf("parsing SUNLIGHT.compress failed: %w", err)
+		}
+		parsed.Compress = compress
+	}
+	if immutableStr, ok := opts["SUNLIGHT.immutable"]; ok {
+		immutable, err := strconv.ParseBool(immutableStr)
+		if err != nil {
+			return nil, fmtErrorf("parsing SUNLIGHT.immutable failed: %w", err)
+		}
+		parsed.Immutable = immutable
+	}
+	return &parsed, nil
+}
+
+func marshalTarUploadOptions(opts *UploadOptions) map[string]string {
+	return map[string]string{
+		"SUNLIGHT.contenttype": opts.ContentType,
+		"SUNLIGHT.compress":    strconv.FormatBool(opts.Compress),
+		"SUNLIGHT.immutable":   strconv.FormatBool(opts.Immutable),
+	}
+}
+
 var optsHashTile = &UploadOptions{Immutable: true}
 var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
+var optsPendingTar = &UploadOptions{Compress: true}
 var optsText = &UploadOptions{ContentType: "text/plain; charset=utf-8"}
 
 // A LockBackend is a database that supports compare-and-swap operations.
@@ -662,6 +739,57 @@ func (l *Log) sequence(ctx context.Context) error {
 	return l.sequencePool(ctx, p)
 }
 
+type pendingFile struct {
+	path string
+	data []byte
+	opts *UploadOptions
+}
+
+func parsePendingTar(tarData []byte) ([]*pendingFile, error) {
+	tarReader := tar.NewReader(bytes.NewReader(tarData))
+	var files []*pendingFile
+	for {
+		header, err := tarReader.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, fmtErrorf("error reading pending header: %w", err)
+		}
+		data, err := io.ReadAll(tarReader)
+		if err != nil {
+			return nil, fmtErrorf("error reading pending data: %w", err)
+		}
+		opts, err := parseTarUploadOptions(header.PAXRecords)
+		if err != nil {
+			return nil, fmtErrorf("error parsing upload options: %w", err)
+		}
+		files = append(files, &pendingFile{path: header.Name, data: data, opts: opts})
+	}
+	return files, nil
+}
+
+func marshalPendingTar(files []*pendingFile) ([]byte, error) {
+	var buffer bytes.Buffer
+	writer := tar.NewWriter(&buffer)
+	for _, file := range files {
+		if err := writer.WriteHeader(&tar.Header{
+			Name:       file.path,
+			Size:       int64(len(file.data)),
+			PAXRecords: marshalTarUploadOptions(file.opts),
+		}); err != nil {
+			return nil, fmtErrorf("error writing tar header: %w", err)
+		}
+		if _, err := writer.Write(file.data); err != nil {
+			return nil, fmtErrorf("error writing tar data: %w", err)
+		}
+	}
+	if err := writer.Close(); err != nil {
+		return nil, fmtErrorf("error closing tar writer: %w", err)
+	}
+	return buffer.Bytes(), nil
+}
+
 func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 	oldSize := l.tree.N
 	defer prometheus.NewTimer(l.m.SeqDuration).ObserveDuration()
@@ -689,14 +817,13 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 	start := time.Now()
 	ctx, cancel := context.WithTimeout(ctx, sequenceTimeout)
 	defer cancel()
-	g, gctx := errgroup.WithContext(ctx)
-	defer g.Wait()
-
 	timestamp := timeNowUnixMilli()
 	if timestamp <= l.tree.Time {
 		return errors.Join(errFatal, fmtErrorf("time did not progress! %d -> %d", l.tree.Time, timestamp))
 	}
 
+	var pendingFiles []*pendingFile
+
 	edgeTiles := maps.Clone(l.edgeTiles)
 	var dataTile []byte
 	// Load the current partial data tile, if any.
@@ -737,8 +864,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 				"tree_size", n, "tile", tile, "size", len(dataTile))
 			l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
 			tileCount++
-			data := dataTile // data is captured by the g.Go function.
-			g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsDataTile) })
+			pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
 			dataTile = nil
 		}
 	}
@@ -752,13 +878,12 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 			"tree_size", n, "tile", tile, "size", len(dataTile))
 		l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
 		tileCount++
-		g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsDataTile) })
+		pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
 	}
 
 	// Produce and upload new tree tiles.
 	tiles := tlogx.NewTilesForSize(TileHeight, l.tree.N, n)
 	for _, tile := range tiles {
-		tile := tile // tile is captured by the g.Go function.
 		data, err := tlog.ReadTileData(tile, hashReader)
 		if err != nil {
 			return fmtErrorf("couldn't generate tile %v: %w", tile, err)
@@ -771,11 +896,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 		l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
 			"tree_size", n, "tile", tile, "size", len(data))
 		tileCount++
-		g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsHashTile) })
-	}
-
-	if err := g.Wait(); err != nil {
-		return fmtErrorf("couldn't upload a tile: %w", err)
+		pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: data, opts: optsHashTile})
 	}
 
 	if testingOnlyPauseSequencing != nil {
@@ -788,6 +909,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 	}
 	tree := treeWithTimestamp{Tree: tlog.Tree{N: n, Hash: rootHash}, Time: timestamp}
+	// Write all pending tiles for the new tree.
+	tarBytes, err := marshalPendingTar(pendingFiles)
+	if err != nil {
+		return fmtErrorf("error marshaling pending files: %w", err)
+	}
+
+	// TODO: use URL encoding for the hash instead?
+	pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash.String())
+	if err := l.c.Backend.Upload(ctx, pendingPath, tarBytes, optsPendingTar); err != nil {
+		return fmtErrorf("couldn't upload pending tar: %w", err)
+	}
+
 	checkpoint, err := signTreeHead(l.c.Name, l.logID, l.c.Key, tree)
 	if err != nil {
 		return fmtErrorf("couldn't sign checkpoint: %w", err)
 	}
@@ -812,10 +945,11 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
 	l.lockCheckpoint = newLock
 	l.edgeTiles = edgeTiles
 
-	if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
-		// Return an error so we don't produce SCTs that, although safely
-		// serialized, wouldn't be part of a publicly visible tree.
-		return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
+	// Copy the pending tiles to their final location.
+	if err := uploadPending(ctx, l.c, checkpoint, tree.Tree); err != nil {
+		// Return an error so we don't produce SCTs that, although committed,
+		// aren't yet part of a publicly visible tree.
+		return fmtErrorf("couldn't copy pending files to object storage: %w", err)
 	}
 
 	// At this point if the cache put fails, there's no reason to return errors
diff --git a/internal/ctlog/ctlog_test.go b/internal/ctlog/ctlog_test.go
index 6bf7788..516b315 100644
--- a/internal/ctlog/ctlog_test.go
+++ b/internal/ctlog/ctlog_test.go
@@ -109,44 +109,62 @@ func TestSequenceEmptyPool(t *testing.T) {
 	sequenceTwice(tl)
 }
 
-func TestSequenceUploadCount(t *testing.T) {
+func counter(source *uint64) func() uint64 {
+	var old uint64
+	counter := func() uint64 {
+		new := *source
+		n := new - old
+		old = new
+		return n
+	}
+	counter()
+	return counter
+}
+
+func TestSequenceUploadCopyCount(t *testing.T) {
 	tl := NewEmptyTestLog(t)
 	for i := 0; i < tileWidth+1; i++ {
 		addCertificate(t, tl)
 	}
 	fatalIfErr(t, tl.Log.Sequence())
 
-	var old uint64
-	uploads := func() uint64 {
-		new := tl.Config.Backend.(*MemoryBackend).uploads
-		n := new - old
-		old = new
-		return n
-	}
-	uploads()
+	uploads := counter(&tl.Config.Backend.(*MemoryBackend).uploads)
+	deletes := counter(&tl.Config.Backend.(*MemoryBackend).deletes)
 
-	// Empty rounds should cause only one upload (the checkpoint).
+	// Empty rounds should cause only two uploads (the checkpoint and the empty
+	// pending file) and one delete (the pending file).
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 1 {
-		t.Errorf("got %d uploads, expected 1", n)
+	if n := uploads(); n != 2 {
+		t.Errorf("got %d uploads, expected 2", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}
 
-	// One entry in steady state (not at tile boundary) should cause three
-	// uploads (the checkpoint, a level -1 tile, and a level 0 tile).
+	// One entry in steady state (not at tile boundary) should cause four
+	// uploads (the checkpoint, the pending file, and files for a level -1
+	// tile and a level 0 tile) and one delete (the pending file).
 	addCertificate(t, tl)
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 3 {
-		t.Errorf("got %d uploads, expected 3", n)
+	if n := uploads(); n != 4 {
+		t.Errorf("got %d uploads, expected 4", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}
 
-	// A tile width worth of entries should cause six uploads (the checkpoint,
-	// two level -1 tiles, two level 0 tiles, and one level 1 tile).
+	// A tile width worth of entries should cause seven uploads (the checkpoint,
+	// the pending file, and files for two level -1 tiles, two level 0 tiles,
+	// and one level 1 tile) and one delete (the pending file).
 	for i := 0; i < tileWidth; i++ {
 		addCertificate(t, tl)
 	}
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 6 {
-		t.Errorf("got %d uploads, expected 6", n)
+	if n := uploads(); n != 7 {
+		t.Errorf("got %d uploads, expected 7", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}
 }
 
@@ -260,6 +278,39 @@ func testReloadLog(t *testing.T, add func(*testing.T, *TestLog) func(context.Con
 	}
 }
 
+func TestRecoverLog(t *testing.T) {
+	tl := NewEmptyTestLog(t)
+	n := int64(tileWidth + 2)
+	if testing.Short() {
+		n = 3
+	} else {
+		tl.Quiet()
+	}
+	for i := int64(0); i < n; i++ {
+		addCertificateFast(t, tl)
+
+		ctlog.SetFailUploadPending(true)
+		if err := tl.Log.Sequence(); err == nil {
+			t.Fatal("expected sequencer failure")
+		}
+		ctlog.SetFailUploadPending(false)
+
+		pending := tl.Config.Backend.(*MemoryBackend).countPending()
+		if pending == 0 {
+			t.Errorf("expected non-zero pending files before recovery, got %d", pending)
+		}
+
+		tl = ReloadLog(t, tl)
+		fatalIfErr(t, tl.Log.Sequence())
+		tl.CheckLog()
+
+		pending = tl.Config.Backend.(*MemoryBackend).countPending()
+		if pending != 0 {
+			t.Errorf("expected zero pending files after recovery, got %d", pending)
+		}
+	}
+}
+
 func TestSubmit(t *testing.T) {
 	t.Run("Certificates", func(t *testing.T) {
 		testSubmit(t, false)
diff --git a/internal/ctlog/export_test.go b/internal/ctlog/export_test.go
index 5e5d709..2e9cd35 100644
--- a/internal/ctlog/export_test.go
+++ b/internal/ctlog/export_test.go
@@ -26,3 +26,7 @@ func PauseSequencer() {
 func ResumeSequencer() {
 	close(seqRunning)
 }
+
+func SetFailUploadPending(fail bool) {
+	testOnlyFailUploadPending = fail
+}
diff --git a/internal/ctlog/s3.go b/internal/ctlog/s3.go
index ee31b50..381e22c 100644
--- a/internal/ctlog/s3.go
+++ b/internal/ctlog/s3.go
@@ -217,6 +217,19 @@ func (s *S3Backend) Fetch(ctx context.Context, key string) ([]byte, error) {
 	return data, nil
 }
 
+func (s *S3Backend) Delete(ctx context.Context, key string) error {
+	_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(s.keyPrefix + key),
+	})
+	if err != nil {
+		s.log.DebugContext(ctx, "S3 DELETE", "key", key, "err", err)
+		return fmtErrorf("failed to delete %q from S3: %w", key, err)
+	}
+	s.log.DebugContext(ctx, "S3 DELETE", "key", key)
+	return nil
+}
+
 func (s *S3Backend) Metrics() []prometheus.Collector {
 	return s.metrics
 }
diff --git a/internal/ctlog/testlog_test.go b/internal/ctlog/testlog_test.go
index 841ffab..35f8468 100644
--- a/internal/ctlog/testlog_test.go
+++ b/internal/ctlog/testlog_test.go
@@ -16,6 +16,7 @@ import (
 	"net/http/httptest"
 	"path/filepath"
 	"reflect"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -322,7 +323,7 @@ type MemoryBackend struct {
 	mu sync.Mutex
 	m  map[string][]byte
 
-	uploads uint64
+	uploads, deletes uint64
 }
 
 func NewMemoryBackend(t testing.TB) *MemoryBackend {
@@ -359,6 +360,32 @@ func (b *MemoryBackend) Fetch(ctx context.Context, key string) ([]byte, error) {
 	return data, nil
 }
 
+func (b *MemoryBackend) Delete(ctx context.Context, key string) error {
+	atomic.AddUint64(&b.deletes, 1)
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if _, ok := b.m[key]; !ok {
+		return fmt.Errorf("key %q not found", key)
+	}
+	delete(b.m, key)
+	return nil
+}
+
+func (b *MemoryBackend) countPending() int {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	count := 0
+	for key := range b.m {
+		if strings.HasPrefix(key, "pending/") {
+			count++
+		}
+	}
+	return count
+}
+
 func (b *MemoryBackend) Metrics() []prometheus.Collector { return nil }
 
 type MemoryLockBackend struct {
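
Note on the pending-tar format introduced above: marshalPendingTar stores each staged file's UploadOptions as vendor-prefixed PAX records (SUNLIGHT.contenttype, SUNLIGHT.compress, SUNLIGHT.immutable) on its tar header, and parsePendingTar recovers them from Header.PAXRecords. The sketch below is a minimal in-package round-trip check of that property; it is illustrative only and not part of this change, and the test name and sample paths are invented for the example.

package ctlog

import (
	"bytes"
	"testing"
)

// Illustrative sketch (not in the diff): round-trip a few pendingFiles through
// marshalPendingTar and parsePendingTar and check that paths, contents, and
// UploadOptions survive the PAX records. The paths here are arbitrary samples.
func TestPendingTarRoundTrip(t *testing.T) {
	files := []*pendingFile{
		{path: "tile/8/0/000", data: []byte("hash tile bytes"), opts: optsHashTile},
		{path: "tile/data/000", data: []byte("data tile bytes"), opts: optsDataTile},
		{path: "empty", data: nil, opts: optsText},
	}
	tarBytes, err := marshalPendingTar(files)
	if err != nil {
		t.Fatalf("marshalPendingTar: %v", err)
	}
	got, err := parsePendingTar(tarBytes)
	if err != nil {
		t.Fatalf("parsePendingTar: %v", err)
	}
	if len(got) != len(files) {
		t.Fatalf("got %d files, want %d", len(got), len(files))
	}
	for i := range files {
		if got[i].path != files[i].path {
			t.Errorf("file %d: got path %q, want %q", i, got[i].path, files[i].path)
		}
		if !bytes.Equal(got[i].data, files[i].data) {
			t.Errorf("file %d: data mismatch", i)
		}
		if *got[i].opts != *files[i].opts {
			t.Errorf("file %d: got opts %+v, want %+v", i, *got[i].opts, *files[i].opts)
		}
	}
}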