ctlog: write pending tiles to protect against concurrent sequencers
If two sequencers (somehow) end up running at the same time, they
could scribble over each other's uploads to the backend storage.

Protect against that special case by writing the tiles in two steps. First, all
tiles are staged in a unique tar file (keyed by the size and hash of the new
tree); only after a successful lock update are they written to their final
locations.

In the normal case, this incurs one extra upload and one extra delete per
sequencing round, adding one write operation and doubling the amount of data
written to backend storage.

To make sure that the (rare) recovery code path is well-tested, use the same
code path in both the recovery and the normal case.

A special IAM policy could allow deletes only of files under the pending/ prefix.
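
This is a standard stage-then-promote pattern. The sketch below illustrates the
idea only; it borrows the Backend interface, pendingFile type, and pending-tar
helpers from the diff that follows, while stageAndPromote and commitLock are
hypothetical names, not code from this commit.

// stageAndPromote stages all files in a single keyed object, publishes them
// only after the lock update succeeds, and then removes the staged object.
func stageAndPromote(ctx context.Context, b Backend, stagePath string,
	files []*pendingFile, commitLock func() error) error {
	tarBytes, err := marshalPendingTar(files)
	if err != nil {
		return err
	}
	// Phase 1: stage everything under a key derived from the new tree.
	// A concurrent sequencer building a different tree stages under a
	// different key, so neither can overwrite the other's uploads.
	if err := b.Upload(ctx, stagePath, tarBytes, optsPendingTar); err != nil {
		return err
	}
	// Only the sequencer that wins the compare-and-swap on the lock
	// backend gets to publish its staged files.
	if err := commitLock(); err != nil {
		return err
	}
	// Phase 2: copy the staged files to their final, public locations,
	// then clean up the staging object.
	for _, f := range files {
		if err := b.Upload(ctx, f.path, f.data, f.opts); err != nil {
			return err
		}
	}
	return b.Delete(ctx, stagePath)
}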
jellevandenhooff committed Mar 17, 2024
1 parent 0e8f0c2 commit b89dbd8
Showing 5 changed files with 269 additions and 40 deletions.
172 changes: 153 additions & 19 deletions internal/ctlog/ctlog.go
@@ -1,6 +1,7 @@
package ctlog

import (
"archive/tar"
"bytes"
"context"
"crypto"
@@ -10,10 +11,12 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"maps"
"math"
"math/rand"
"strconv"
"sync"
"time"

@@ -141,6 +144,45 @@ func CreateLog(ctx context.Context, config *Config) error {
return nil
}

var testOnlyFailUploadPending bool

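// uploadPending copies the tiles staged in the pending tar for the given tree
// to their final locations, uploads the new checkpoint, and finally deletes the
// tar. It runs on both the normal sequencing path and the recovery path in
// LoadLog, so the recovery logic is exercised on every sequencing round.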
func uploadPending(ctx context.Context, config *Config, checkpoint []byte, tree tlog.Tree) error {
if testOnlyFailUploadPending {
return errors.Join(errFatal, errors.New("failing upload for test"))
}

pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash)
config.Log.DebugContext(ctx, "uploading pending files", "path", pendingPath)

tarData, err := config.Backend.Fetch(ctx, pendingPath)
if err != nil {
return fmt.Errorf("couldn't fetch pending files: %w", err)
}

pendingFiles, err := parsePendingTar(tarData)
if err != nil {
return fmtErrorf("error reading pending tar: %w", err)
}

g, gctx := errgroup.WithContext(ctx)
defer g.Wait()
for _, file := range pendingFiles {
g.Go(func() error {
return config.Backend.Upload(gctx, file.path, file.data, file.opts)
})
}
if err := g.Wait(); err != nil {
return fmtErrorf("couldn't upload a tile: %w", err)
}

config.Log.DebugContext(ctx, "uploading checkpoint", "key", checkpoint)
if err := config.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
}

return config.Backend.Delete(ctx, pendingPath)
}

func LoadLog(ctx context.Context, config *Config) (*Log, error) {
pkix, err := x509.MarshalPKIXPublicKey(config.Key.Public())
if err != nil {
@@ -207,11 +249,12 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
if c1.N < c.N {
// It's possible that we crashed between committing a new checkpoint to
// the lock backend and uploading it to the object storage backend.
- // Or maybe the object storage backend GETs are cached.
- // That's ok, as long as the rest of the tree load correctly against the
- // lock checkpoint.
config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
"old_size", c1.N, "size", c.N)
config.Log.DebugContext(ctx, "uploading pending files")
if err := uploadPending(ctx, config, lock.Bytes(), c.Tree); err != nil {
return nil, fmt.Errorf("uploading pending files failed: %w", err)
}
}

pemIssuers, err := config.Backend.Fetch(ctx, "issuers.pem")
@@ -321,6 +364,9 @@ type Backend interface {
// uploaded with UploadOptions.Compress true.
Fetch(ctx context.Context, key string) ([]byte, error)

// Delete deletes a file.
Delete(ctx context.Context, key string) error

// Metrics returns the metrics to register for this log. The metrics should
// not be shared by any other logs.
Metrics() []prometheus.Collector
@@ -339,8 +385,39 @@ type UploadOptions struct {
Immutable bool
}

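// parseTarUploadOptions recovers the UploadOptions for a staged file from the
// SUNLIGHT.* PAX records written by marshalTarUploadOptions.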
func parseTarUploadOptions(opts map[string]string) (*UploadOptions, error) {
var parsed UploadOptions
if contentType, ok := opts["SUNLIGHT.contenttype"]; ok {
parsed.ContentType = contentType
}
if compressStr, ok := opts["SUNLIGHT.compress"]; ok {
compress, err := strconv.ParseBool(compressStr)
if err != nil {
return nil, fmtErrorf("parsing SUNLIGHT.compress failed: %w", err)
}
parsed.Compress = compress
}
if immutableStr, ok := opts["SUNLIGHT.immutable"]; ok {
immutable, err := strconv.ParseBool(immutableStr)
if err != nil {
return nil, fmtErrorf("parsing SUNLIGHT.immutable failed: %w", err)
}
parsed.Immutable = immutable
}
return &parsed, nil
}

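// marshalTarUploadOptions encodes UploadOptions as PAX records so that they
// survive the round trip through the pending tar.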
func marshalTarUploadOptions(opts *UploadOptions) map[string]string {
return map[string]string{
"SUNLIGHT.contenttype": opts.ContentType,
"SUNLIGHT.compress": strconv.FormatBool(opts.Compress),
"SUNLIGHT.immutable": strconv.FormatBool(opts.Immutable),
}
}
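
// For example, optsDataTile below marshals to:
//
//	map[string]string{
//		"SUNLIGHT.contenttype": "",
//		"SUNLIGHT.compress":    "true",
//		"SUNLIGHT.immutable":   "true",
//	}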

var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsPendingTar = &UploadOptions{Compress: true}
var optsText = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

// A LockBackend is a database that supports compare-and-swap operations.
@@ -662,6 +739,57 @@ func (l *Log) sequence(ctx context.Context) error {
return l.sequencePool(ctx, p)
}

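// A pendingFile is a file staged in the pending tar, along with the path and
// options needed to upload it to its final location.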
type pendingFile struct {
path string
data []byte
opts *UploadOptions
}

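// parsePendingTar decodes a tar produced by marshalPendingTar, restoring each
// staged file's path, contents, and upload options.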
func parsePendingTar(tarData []byte) ([]*pendingFile, error) {
tarReader := tar.NewReader(bytes.NewReader(tarData))
var files []*pendingFile
for {
header, err := tarReader.Next()
if err != nil {
if err == io.EOF {
break
}
return nil, fmtErrorf("error reading pending header: %w", err)
}
data, err := io.ReadAll(tarReader)
if err != nil {
return nil, fmtErrorf("error reading pending data: %w", err)
}
opts, err := parseTarUploadOptions(header.PAXRecords)
if err != nil {
return nil, fmtErrorf("error parsing upload options: %w", err)
}
files = append(files, &pendingFile{path: header.Name, data: data, opts: opts})
}
return files, nil
}

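// marshalPendingTar packs the staged files into a single tar archive, carrying
// each file's upload options as PAX records on its header.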
func marshalPendingTar(files []*pendingFile) ([]byte, error) {
var buffer bytes.Buffer
writer := tar.NewWriter(&buffer)
for _, file := range files {
if err := writer.WriteHeader(&tar.Header{
Name: file.path,
Size: int64(len(file.data)),
PAXRecords: marshalTarUploadOptions(file.opts),
}); err != nil {
return nil, fmtErrorf("error writing tar header: %w", err)
}
if _, err := writer.Write(file.data); err != nil {
return nil, fmtErrorf("error writing tar data: %w", err)
}
}
if err := writer.Close(); err != nil {
return nil, fmtErrorf("error closing tar writer: %w", err)
}
return buffer.Bytes(), nil
}

func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
oldSize := l.tree.N
defer prometheus.NewTimer(l.m.SeqDuration).ObserveDuration()
@@ -689,14 +817,13 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, sequenceTimeout)
defer cancel()
- g, gctx := errgroup.WithContext(ctx)
- defer g.Wait()

timestamp := timeNowUnixMilli()
if timestamp <= l.tree.Time {
return errors.Join(errFatal, fmtErrorf("time did not progress! %d -> %d", l.tree.Time, timestamp))
}

var pendingFiles []*pendingFile

edgeTiles := maps.Clone(l.edgeTiles)
var dataTile []byte
// Load the current partial data tile, if any.
@@ -737,8 +864,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
- data := dataTile // data is captured by the g.Go function.
- g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsDataTile) })
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
dataTile = nil
}
}
@@ -752,13 +878,12 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
- g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsDataTile) })
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
}

// Produce and upload new tree tiles.
tiles := tlogx.NewTilesForSize(TileHeight, l.tree.N, n)
for _, tile := range tiles {
- tile := tile // tile is captured by the g.Go function.
data, err := tlog.ReadTileData(tile, hashReader)
if err != nil {
return fmtErrorf("couldn't generate tile %v: %w", tile, err)
@@ -771,11 +896,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
"tree_size", n, "tile", tile, "size", len(data))
tileCount++
- g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsHashTile) })
- }
-
- if err := g.Wait(); err != nil {
- return fmtErrorf("couldn't upload a tile: %w", err)
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: data, opts: optsHashTile})
}

if testingOnlyPauseSequencing != nil {
@@ -788,6 +909,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
}
tree := treeWithTimestamp{Tree: tlog.Tree{N: n, Hash: rootHash}, Time: timestamp}

// Write all pending tiles for the new tree.
tarBytes, err := marshalPendingTar(pendingFiles)
if err != nil {
return fmtErrorf("error marshaling pending files: %w", err)
}

// TODO: use URL encoding for the hash instead?
pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash.String())
if err := l.c.Backend.Upload(ctx, pendingPath, tarBytes, optsPendingTar); err != nil {
return fmtErrorf("couldn't upload pending tar: %w", err)
}

checkpoint, err := signTreeHead(l.c.Name, l.logID, l.c.Key, tree)
if err != nil {
return fmtErrorf("couldn't sign checkpoint: %w", err)
@@ -812,10 +945,11 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.lockCheckpoint = newLock
l.edgeTiles = edgeTiles

- if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
- // Return an error so we don't produce SCTs that, although safely
- // serialized, wouldn't be part of a publicly visible tree.
- return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
// Copy the pending tiles to their final location.
if err := uploadPending(ctx, l.c, checkpoint, tree.Tree); err != nil {
// Return an error so we don't produce SCTs that, although committed,
// aren't yet part of a publicly visible tree.
return fmtErrorf("couldn't copy pending files to object storage: %w", err)
}

// At this point if the cache put fails, there's no reason to return errors

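To illustrate the staging format itself, here is a test-style sketch of a round
trip through marshalPendingTar and parsePendingTar. It is hypothetical code,
assuming it lives in the same package as the diff above (the paths and contents
are arbitrary), with the bytes and testing packages imported.

func TestPendingTarRoundTrip(t *testing.T) {
	in := []*pendingFile{
		{path: "tile/8/0/000", data: []byte("hash tile bytes"), opts: optsHashTile},
		{path: "tile/8/data/000", data: []byte("data tile bytes"), opts: optsDataTile},
	}
	tarBytes, err := marshalPendingTar(in)
	if err != nil {
		t.Fatal(err)
	}
	out, err := parsePendingTar(tarBytes)
	if err != nil {
		t.Fatal(err)
	}
	if len(out) != len(in) {
		t.Fatalf("got %d files, want %d", len(out), len(in))
	}
	for i := range in {
		// The path, contents, and upload options should all survive the
		// trip through the tar headers and PAX records.
		if out[i].path != in[i].path || !bytes.Equal(out[i].data, in[i].data) ||
			*out[i].opts != *in[i].opts {
			t.Errorf("file %d did not round-trip: got %+v, want %+v", i, out[i], in[i])
		}
	}
}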