From b89dbd8970f84c3ba2c8145a12b7ce4532e95561 Mon Sep 17 00:00:00 2001 From: Jelle van den Hooff Date: Sat, 16 Mar 2024 13:20:39 -0700 Subject: [PATCH] ctlog: write pending tiles to protect against concurrent sequencers If two sequencers (somehow) end up running at the same time, they could scribble over each other's uploads to the backend storage. Protect against that special case by writing the tiles in two steps. First, all tiles are staged in a unique tar file (keyed by the hash of the new tree), and only written to their final location after a successful lock update. In the normal case, this will incur one extra upload and delete, adding one write operation, and double the amount of data written to backend storage. To make sure that the (rare) recovery code path is well-tested, use the same code path in both the recovery and normal case. A special IAM policy could allow deletes only of pending files. --- internal/ctlog/ctlog.go | 172 +++++++++++++++++++++++++++++---- internal/ctlog/ctlog_test.go | 91 +++++++++++++---- internal/ctlog/export_test.go | 4 + internal/ctlog/s3.go | 13 +++ internal/ctlog/testlog_test.go | 29 +++++- 5 files changed, 269 insertions(+), 40 deletions(-) diff --git a/internal/ctlog/ctlog.go b/internal/ctlog/ctlog.go index b418c2d..d8fc191 100644 --- a/internal/ctlog/ctlog.go +++ b/internal/ctlog/ctlog.go @@ -1,6 +1,7 @@ package ctlog import ( + "archive/tar" "bytes" "context" "crypto" @@ -10,10 +11,12 @@ import ( "encoding/base64" "errors" "fmt" + "io" "log/slog" "maps" "math" "math/rand" + "strconv" "sync" "time" @@ -141,6 +144,45 @@ func CreateLog(ctx context.Context, config *Config) error { return nil } +var testOnlyFailUploadPending bool + +func uploadPending(ctx context.Context, config *Config, checkpoint []byte, tree tlog.Tree) error { + if testOnlyFailUploadPending { + return errors.Join(errFatal, errors.New("failing upload for test")) + } + + pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash) + config.Log.DebugContext(ctx, "uploading pending files", "path", pendingPath) + + tarData, err := config.Backend.Fetch(ctx, pendingPath) + if err != nil { + return fmt.Errorf("couldn't fetch pending files: %w", err) + } + + pendingFiles, err := parsePendingTar(tarData) + if err != nil { + return fmtErrorf("error reading pending tar: %w", err) + } + + g, gctx := errgroup.WithContext(ctx) + defer g.Wait() + for _, file := range pendingFiles { + g.Go(func() error { + return config.Backend.Upload(gctx, file.path, file.data, file.opts) + }) + } + if err := g.Wait(); err != nil { + return fmtErrorf("couldn't upload a tile: %w", err) + } + + config.Log.DebugContext(ctx, "uploading checkpoint", "key", checkpoint) + if err := config.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil { + return fmtErrorf("couldn't upload checkpoint to object storage: %w", err) + } + + return config.Backend.Delete(ctx, pendingPath) +} + func LoadLog(ctx context.Context, config *Config) (*Log, error) { pkix, err := x509.MarshalPKIXPublicKey(config.Key.Public()) if err != nil { @@ -207,11 +249,12 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) { if c1.N < c.N { // It's possible that we crashed between committing a new checkpoint to // the lock backend and uploading it to the object storage backend. - // Or maybe the object storage backend GETs are cached. - // That's ok, as long as the rest of the tree load correctly against the - // lock checkpoint. 
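+		// Recover by replaying the tiles staged for the lock checkpoint's tree
+		// and re-uploading the checkpoint itself (see uploadPending above).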
config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint", "old_size", c1.N, "size", c.N) + config.Log.DebugContext(ctx, "uploading pending files") + if err := uploadPending(ctx, config, lock.Bytes(), c.Tree); err != nil { + return nil, fmt.Errorf("uploading pending files failed: %w", err) + } } pemIssuers, err := config.Backend.Fetch(ctx, "issuers.pem") @@ -321,6 +364,9 @@ type Backend interface { // uploaded with UploadOptions.Compress true. Fetch(ctx context.Context, key string) ([]byte, error) + // Delete deletes a file. + Delete(ctx context.Context, key string) error + // Metrics returns the metrics to register for this log. The metrics should // not be shared by any other logs. Metrics() []prometheus.Collector @@ -339,8 +385,39 @@ type UploadOptions struct { Immutable bool } +func parseTarUploadOptions(opts map[string]string) (*UploadOptions, error) { + var parsed UploadOptions + if contentType, ok := opts["SUNLIGHT.contenttype"]; ok { + parsed.ContentType = contentType + } + if compressStr, ok := opts["SUNLIGHT.compress"]; ok { + compress, err := strconv.ParseBool(compressStr) + if err != nil { + return nil, fmtErrorf("parsing SUNLIGHT.compress failed: %w", err) + } + parsed.Compress = compress + } + if immutableStr, ok := opts["SUNLIGHT.immutable"]; ok { + immutable, err := strconv.ParseBool(immutableStr) + if err != nil { + return nil, fmtErrorf("parsing SUNLIGHT.immutable failed: %w", err) + } + parsed.Immutable = immutable + } + return &parsed, nil +} + +func marshalTarUploadOptions(opts *UploadOptions) map[string]string { + return map[string]string{ + "SUNLIGHT.contenttype": opts.ContentType, + "SUNLIGHT.compress": strconv.FormatBool(opts.Compress), + "SUNLIGHT.immutable": strconv.FormatBool(opts.Immutable), + } +} + var optsHashTile = &UploadOptions{Immutable: true} var optsDataTile = &UploadOptions{Compress: true, Immutable: true} +var optsPendingTar = &UploadOptions{Compress: true} var optsText = &UploadOptions{ContentType: "text/plain; charset=utf-8"} // A LockBackend is a database that supports compare-and-swap operations. 
@@ -662,6 +739,57 @@ func (l *Log) sequence(ctx context.Context) error { return l.sequencePool(ctx, p) } +type pendingFile struct { + path string + data []byte + opts *UploadOptions +} + +func parsePendingTar(tarData []byte) ([]*pendingFile, error) { + tarReader := tar.NewReader(bytes.NewReader(tarData)) + var files []*pendingFile + for { + header, err := tarReader.Next() + if err != nil { + if err == io.EOF { + break + } + return nil, fmtErrorf("error reading pending header: %w", err) + } + data, err := io.ReadAll(tarReader) + if err != nil { + return nil, fmtErrorf("error reading pending data: %w", err) + } + opts, err := parseTarUploadOptions(header.PAXRecords) + if err != nil { + return nil, fmtErrorf("error parsing upload options: %w", err) + } + files = append(files, &pendingFile{path: header.Name, data: data, opts: opts}) + } + return files, nil +} + +func marshalPendingTar(files []*pendingFile) ([]byte, error) { + var buffer bytes.Buffer + writer := tar.NewWriter(&buffer) + for _, file := range files { + if err := writer.WriteHeader(&tar.Header{ + Name: file.path, + Size: int64(len(file.data)), + PAXRecords: marshalTarUploadOptions(file.opts), + }); err != nil { + return nil, fmtErrorf("error writing tar header: %w", err) + } + if _, err := writer.Write(file.data); err != nil { + return nil, fmtErrorf("error writing tar data: %w", err) + } + } + if err := writer.Close(); err != nil { + return nil, fmtErrorf("error closing tar writer: %w", err) + } + return buffer.Bytes(), nil +} + func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { oldSize := l.tree.N defer prometheus.NewTimer(l.m.SeqDuration).ObserveDuration() @@ -689,14 +817,13 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { start := time.Now() ctx, cancel := context.WithTimeout(ctx, sequenceTimeout) defer cancel() - g, gctx := errgroup.WithContext(ctx) - defer g.Wait() - timestamp := timeNowUnixMilli() if timestamp <= l.tree.Time { return errors.Join(errFatal, fmtErrorf("time did not progress! %d -> %d", l.tree.Time, timestamp)) } + var pendingFiles []*pendingFile + edgeTiles := maps.Clone(l.edgeTiles) var dataTile []byte // Load the current partial data tile, if any. @@ -737,8 +864,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { "tree_size", n, "tile", tile, "size", len(dataTile)) l.m.SeqDataTileSize.Observe(float64(len(dataTile))) tileCount++ - data := dataTile // data is captured by the g.Go function. - g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsDataTile) }) + pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile}) dataTile = nil } } @@ -752,13 +878,12 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { "tree_size", n, "tile", tile, "size", len(dataTile)) l.m.SeqDataTileSize.Observe(float64(len(dataTile))) tileCount++ - g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsDataTile) }) + pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile}) } // Produce and upload new tree tiles. tiles := tlogx.NewTilesForSize(TileHeight, l.tree.N, n) for _, tile := range tiles { - tile := tile // tile is captured by the g.Go function. 
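+		// Each tree tile is staged in pendingFiles; it is uploaded as part of
+		// the pending tar and only copied to its final path by uploadPending
+		// after the lock checkpoint has been replaced.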
data, err := tlog.ReadTileData(tile, hashReader) if err != nil { return fmtErrorf("couldn't generate tile %v: %w", tile, err) @@ -771,11 +896,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize, "tree_size", n, "tile", tile, "size", len(data)) tileCount++ - g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsHashTile) }) - } - - if err := g.Wait(); err != nil { - return fmtErrorf("couldn't upload a tile: %w", err) + pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: data, opts: optsHashTile}) } if testingOnlyPauseSequencing != nil { @@ -788,6 +909,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { } tree := treeWithTimestamp{Tree: tlog.Tree{N: n, Hash: rootHash}, Time: timestamp} + // Write all pending tiles for the new tree. + tarBytes, err := marshalPendingTar(pendingFiles) + if err != nil { + return fmtErrorf("error marshaling pending files: %w", err) + } + + // TODO: use URL encoding for the hash instead? + pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash.String()) + if err := l.c.Backend.Upload(ctx, pendingPath, tarBytes, optsPendingTar); err != nil { + return fmtErrorf("couldn't upload pending tar: %w", err) + } + checkpoint, err := signTreeHead(l.c.Name, l.logID, l.c.Key, tree) if err != nil { return fmtErrorf("couldn't sign checkpoint: %w", err) @@ -812,10 +945,11 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) { l.lockCheckpoint = newLock l.edgeTiles = edgeTiles - if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil { - // Return an error so we don't produce SCTs that, although safely - // serialized, wouldn't be part of a publicly visible tree. - return fmtErrorf("couldn't upload checkpoint to object storage: %w", err) + // Copy the pending tiles to their final location. + if err := uploadPending(ctx, l.c, checkpoint, tree.Tree); err != nil { + // Return an error so we don't produce SCTs that, although committed, + // aren't yet part of a publicly visible tree. + return fmtErrorf("couldn't copy pending files to object storage: %w", err) } // At this point if the cache put fails, there's no reason to return errors diff --git a/internal/ctlog/ctlog_test.go b/internal/ctlog/ctlog_test.go index 6bf7788..516b315 100644 --- a/internal/ctlog/ctlog_test.go +++ b/internal/ctlog/ctlog_test.go @@ -109,44 +109,62 @@ func TestSequenceEmptyPool(t *testing.T) { sequenceTwice(tl) } -func TestSequenceUploadCount(t *testing.T) { +func counter(source *uint64) func() uint64 { + var old uint64 + counter := func() uint64 { + new := *source + n := new - old + old = new + return n + } + counter() + return counter +} + +func TestSequenceUploadCopyCount(t *testing.T) { tl := NewEmptyTestLog(t) for i := 0; i < tileWidth+1; i++ { addCertificate(t, tl) } fatalIfErr(t, tl.Log.Sequence()) - var old uint64 - uploads := func() uint64 { - new := tl.Config.Backend.(*MemoryBackend).uploads - n := new - old - old = new - return n - } - uploads() + uploads := counter(&tl.Config.Backend.(*MemoryBackend).uploads) + deletes := counter(&tl.Config.Backend.(*MemoryBackend).deletes) - // Empty rounds should cause only one upload (the checkpoint). + // Empty rounds should cause only two uploads (the checkpoint and empty + // pending file) and one delete (the pending file). 
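+	// The pending tar is uploaded and then deleted even when it contains no
+	// new tiles, since sequencing always goes through the same staging path.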
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 1 {
-		t.Errorf("got %d uploads, expected 1", n)
+	if n := uploads(); n != 2 {
+		t.Errorf("got %d uploads, expected 2", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}

-	// One entry in steady state (not at tile boundary) should cause three
-	// uploads (the checkpoint, a level -1 tile, and a level 0 tile).
+	// One entry in steady state (not at tile boundary) should cause four
+	// uploads (the checkpoint, the pending file, and files for a level -1 tile
+	// and a level 0 tile) and one delete (the pending file).
 	addCertificate(t, tl)
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 3 {
-		t.Errorf("got %d uploads, expected 3", n)
+	if n := uploads(); n != 4 {
+		t.Errorf("got %d uploads, expected 4", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}

-	// A tile width worth of entries should cause six uploads (the checkpoint,
-	// two level -1 tiles, two level 0 tiles, and one level 1 tile).
+	// A tile width worth of entries should cause seven uploads (the checkpoint,
+	// the pending file, and files for two level -1 tiles, two level 0 tiles,
+	// and one level 1 tile) and one delete (the pending file).
 	for i := 0; i < tileWidth; i++ {
 		addCertificate(t, tl)
 	}
 	fatalIfErr(t, tl.Log.Sequence())
-	if n := uploads(); n != 6 {
-		t.Errorf("got %d uploads, expected 6", n)
+	if n := uploads(); n != 7 {
+		t.Errorf("got %d uploads, expected 7", n)
+	}
+	if n := deletes(); n != 1 {
+		t.Errorf("got %d deletes, expected 1", n)
 	}
 }

@@ -260,6 +278,39 @@ func testReloadLog(t *testing.T, add func(*testing.T, *TestLog) func(context.Con
 	}
 }

+func TestRecoverLog(t *testing.T) {
+	tl := NewEmptyTestLog(t)
+	n := int64(tileWidth + 2)
+	if testing.Short() {
+		n = 3
+	} else {
+		tl.Quiet()
+	}
+	for i := int64(0); i < n; i++ {
+		addCertificateFast(t, tl)
+
+		ctlog.SetFailUploadPending(true)
+		if err := tl.Log.Sequence(); err == nil {
+			t.Fatal("expected sequencer failure")
+		}
+		ctlog.SetFailUploadPending(false)
+
+		pending := tl.Config.Backend.(*MemoryBackend).countPending()
+		if pending == 0 {
+			t.Errorf("expected non-zero pending files before recovery, got %d", pending)
+		}
+
+		tl = ReloadLog(t, tl)
+		fatalIfErr(t, tl.Log.Sequence())
+		tl.CheckLog()
+
+		pending = tl.Config.Backend.(*MemoryBackend).countPending()
+		if pending != 0 {
+			t.Errorf("expected zero pending files after recovery, got %d", pending)
+		}
+	}
+}
+
 func TestSubmit(t *testing.T) {
 	t.Run("Certificates", func(t *testing.T) {
 		testSubmit(t, false)
diff --git a/internal/ctlog/export_test.go b/internal/ctlog/export_test.go
index 5e5d709..2e9cd35 100644
--- a/internal/ctlog/export_test.go
+++ b/internal/ctlog/export_test.go
@@ -26,3 +26,7 @@ func PauseSequencer() {
 func ResumeSequencer() {
 	close(seqRunning)
 }
+
+func SetFailUploadPending(fail bool) {
+	testOnlyFailUploadPending = fail
+}
diff --git a/internal/ctlog/s3.go b/internal/ctlog/s3.go
index ee31b50..381e22c 100644
--- a/internal/ctlog/s3.go
+++ b/internal/ctlog/s3.go
@@ -217,6 +217,19 @@ func (s *S3Backend) Fetch(ctx context.Context, key string) ([]byte, error) {
 	return data, nil
 }

+func (s *S3Backend) Delete(ctx context.Context, key string) error {
+	_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(s.keyPrefix + key),
+	})
+	if err != nil {
+		s.log.DebugContext(ctx, "S3 DELETE", "key", key, "err", err)
+		return fmtErrorf("failed to delete %q from S3: %w", key, err)
+	}
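+	// Deletes are only issued for staged pending tars, so a restrictive bucket
+	// policy can limit DeleteObject to the "pending/" prefix.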
s.log.DebugContext(ctx, "S3 DELETE", "key", key) + return nil +} + func (s *S3Backend) Metrics() []prometheus.Collector { return s.metrics } diff --git a/internal/ctlog/testlog_test.go b/internal/ctlog/testlog_test.go index 841ffab..35f8468 100644 --- a/internal/ctlog/testlog_test.go +++ b/internal/ctlog/testlog_test.go @@ -16,6 +16,7 @@ import ( "net/http/httptest" "path/filepath" "reflect" + "strings" "sync" "sync/atomic" "testing" @@ -322,7 +323,7 @@ type MemoryBackend struct { mu sync.Mutex m map[string][]byte - uploads uint64 + uploads, deletes uint64 } func NewMemoryBackend(t testing.TB) *MemoryBackend { @@ -359,6 +360,32 @@ func (b *MemoryBackend) Fetch(ctx context.Context, key string) ([]byte, error) { return data, nil } +func (b *MemoryBackend) Delete(ctx context.Context, key string) error { + atomic.AddUint64(&b.deletes, 1) + if err := ctx.Err(); err != nil { + return err + } + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.m[key]; !ok { + return fmt.Errorf("key %q not found", key) + } + delete(b.m, key) + return nil +} + +func (b *MemoryBackend) countPending() int { + b.mu.Lock() + defer b.mu.Unlock() + count := 0 + for key := range b.m { + if strings.HasPrefix(key, "pending/") { + count++ + } + } + return count +} + func (b *MemoryBackend) Metrics() []prometheus.Collector { return nil } type MemoryLockBackend struct {