ctlog: write pending tiles to protect against concurrent sequencers #1

Open
wants to merge 1 commit into base: main
172 changes: 153 additions & 19 deletions internal/ctlog/ctlog.go
@@ -1,6 +1,7 @@
package ctlog

import (
"archive/tar"
"bytes"
"context"
"crypto"
@@ -10,10 +11,12 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"maps"
"math"
"math/rand"
"strconv"
"sync"
"time"

@@ -141,6 +144,45 @@ func CreateLog(ctx context.Context, config *Config) error {
return nil
}

var testOnlyFailUploadPending bool

func uploadPending(ctx context.Context, config *Config, checkpoint []byte, tree tlog.Tree) error {
if testOnlyFailUploadPending {
return errors.Join(errFatal, errors.New("failing upload for test"))
}

pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash)
config.Log.DebugContext(ctx, "uploading pending files", "path", pendingPath)

tarData, err := config.Backend.Fetch(ctx, pendingPath)
if err != nil {
return fmt.Errorf("couldn't fetch pending files: %w", err)
}

pendingFiles, err := parsePendingTar(tarData)
if err != nil {
return fmtErrorf("error reading pending tar: %w", err)
}

g, gctx := errgroup.WithContext(ctx)
defer g.Wait()
for _, file := range pendingFiles {
g.Go(func() error {
return config.Backend.Upload(gctx, file.path, file.data, file.opts)
})
}
if err := g.Wait(); err != nil {
return fmtErrorf("couldn't upload a tile: %w", err)
}

config.Log.DebugContext(ctx, "uploading checkpoint", "key", checkpoint)
if err := config.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
}

return config.Backend.Delete(ctx, pendingPath)
}

func LoadLog(ctx context.Context, config *Config) (*Log, error) {
pkix, err := x509.MarshalPKIXPublicKey(config.Key.Public())
if err != nil {
@@ -207,11 +249,12 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
if c1.N < c.N {
// It's possible that we crashed between committing a new checkpoint to
// the lock backend and uploading it to the object storage backend.
// Or maybe the object storage backend GETs are cached.
// That's ok, as long as the rest of the tree loads correctly against the
// lock checkpoint.
config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
"old_size", c1.N, "size", c.N)
config.Log.DebugContext(ctx, "uploading pending files")
if err := uploadPending(ctx, config, lock.Bytes(), c.Tree); err != nil {
return nil, fmt.Errorf("uploading pending files failed: %w", err)
}
}

pemIssuers, err := config.Backend.Fetch(ctx, "issuers.pem")
@@ -321,6 +364,9 @@ type Backend interface {
// uploaded with UploadOptions.Compress true.
Fetch(ctx context.Context, key string) ([]byte, error)

// Delete deletes a file.
Delete(ctx context.Context, key string) error

// Metrics returns the metrics to register for this log. The metrics should
// not be shared by any other logs.
Metrics() []prometheus.Collector
@@ -339,8 +385,39 @@ type UploadOptions struct {
Immutable bool
}

func parseTarUploadOptions(opts map[string]string) (*UploadOptions, error) {
var parsed UploadOptions
if contentType, ok := opts["SUNLIGHT.contenttype"]; ok {
parsed.ContentType = contentType
}
if compressStr, ok := opts["SUNLIGHT.compress"]; ok {
compress, err := strconv.ParseBool(compressStr)
if err != nil {
return nil, fmtErrorf("parsing SUNLIGHT.compress failed: %w", err)
}
parsed.Compress = compress
}
if immutableStr, ok := opts["SUNLIGHT.immutable"]; ok {
immutable, err := strconv.ParseBool(immutableStr)
if err != nil {
return nil, fmtErrorf("parsing SUNLIGHT.immutable failed: %w", err)
}
parsed.Immutable = immutable
}
return &parsed, nil
}

func marshalTarUploadOptions(opts *UploadOptions) map[string]string {
return map[string]string{
"SUNLIGHT.contenttype": opts.ContentType,
"SUNLIGHT.compress": strconv.FormatBool(opts.Compress),
"SUNLIGHT.immutable": strconv.FormatBool(opts.Immutable),
}
}

var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsPendingTar = &UploadOptions{Compress: true}
var optsText = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

// A LockBackend is a database that supports compare-and-swap operations.
@@ -662,6 +739,57 @@ func (l *Log) sequence(ctx context.Context) error {
return l.sequencePool(ctx, p)
}

type pendingFile struct {
path string
data []byte
opts *UploadOptions
}

func parsePendingTar(tarData []byte) ([]*pendingFile, error) {
tarReader := tar.NewReader(bytes.NewReader(tarData))
var files []*pendingFile
for {
header, err := tarReader.Next()
if err != nil {
if err == io.EOF {
break
}
return nil, fmtErrorf("error reading pending header: %w", err)
}
data, err := io.ReadAll(tarReader)
if err != nil {
return nil, fmtErrorf("error reading pending data: %w", err)
}
opts, err := parseTarUploadOptions(header.PAXRecords)
if err != nil {
return nil, fmtErrorf("error parsing upload options: %w", err)
}
files = append(files, &pendingFile{path: header.Name, data: data, opts: opts})
}
return files, nil
}

func marshalPendingTar(files []*pendingFile) ([]byte, error) {
var buffer bytes.Buffer
writer := tar.NewWriter(&buffer)
for _, file := range files {
if err := writer.WriteHeader(&tar.Header{
Name: file.path,
Size: int64(len(file.data)),
PAXRecords: marshalTarUploadOptions(file.opts),
}); err != nil {
return nil, fmtErrorf("error writing tar header: %w", err)
}
if _, err := writer.Write(file.data); err != nil {
return nil, fmtErrorf("error writing tar data: %w", err)
}
}
if err := writer.Close(); err != nil {
return nil, fmtErrorf("error closing tar writer: %w", err)
}
return buffer.Bytes(), nil
}

func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
oldSize := l.tree.N
defer prometheus.NewTimer(l.m.SeqDuration).ObserveDuration()
@@ -689,14 +817,13 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, sequenceTimeout)
defer cancel()
g, gctx := errgroup.WithContext(ctx)
defer g.Wait()

timestamp := timeNowUnixMilli()
if timestamp <= l.tree.Time {
return errors.Join(errFatal, fmtErrorf("time did not progress! %d -> %d", l.tree.Time, timestamp))
}

var pendingFiles []*pendingFile

edgeTiles := maps.Clone(l.edgeTiles)
var dataTile []byte
// Load the current partial data tile, if any.
@@ -737,8 +864,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
data := dataTile // data is captured by the g.Go function.
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsDataTile) })
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
dataTile = nil
}
}
@@ -752,13 +878,12 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), dataTile, optsDataTile) })
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: dataTile, opts: optsDataTile})
}

// Produce and upload new tree tiles.
tiles := tlogx.NewTilesForSize(TileHeight, l.tree.N, n)
for _, tile := range tiles {
tile := tile // tile is captured by the g.Go function.
data, err := tlog.ReadTileData(tile, hashReader)
if err != nil {
return fmtErrorf("couldn't generate tile %v: %w", tile, err)
@@ -771,11 +896,7 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
"tree_size", n, "tile", tile, "size", len(data))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, tile.Path(), data, optsHashTile) })
}

if err := g.Wait(); err != nil {
return fmtErrorf("couldn't upload a tile: %w", err)
pendingFiles = append(pendingFiles, &pendingFile{path: tile.Path(), data: data, opts: optsHashTile})
}

if testingOnlyPauseSequencing != nil {
@@ -788,6 +909,18 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
}
tree := treeWithTimestamp{Tree: tlog.Tree{N: n, Hash: rootHash}, Time: timestamp}

// Write all pending tiles for the new tree.
tarBytes, err := marshalPendingTar(pendingFiles)
if err != nil {
return fmtErrorf("error marshaling pending files: %w", err)
}

// TODO: use URL encoding for the hash instead?
pendingPath := fmt.Sprintf("pending/%d/%s.tar", tree.N, tree.Hash.String())
if err := l.c.Backend.Upload(ctx, pendingPath, tarBytes, optsPendingTar); err != nil {
return fmtErrorf("couldn't upload pending tar: %w", err)
}

checkpoint, err := signTreeHead(l.c.Name, l.logID, l.c.Key, tree)
if err != nil {
return fmtErrorf("couldn't sign checkpoint: %w", err)
@@ -812,10 +945,11 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.lockCheckpoint = newLock
l.edgeTiles = edgeTiles

if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsText); err != nil {
// Return an error so we don't produce SCTs that, although safely
// serialized, wouldn't be part of a publicly visible tree.
return fmtErrorf("couldn't upload checkpoint to object storage: %w", err)
// Copy the pending tiles to their final location.
if err := uploadPending(ctx, l.c, checkpoint, tree.Tree); err != nil {
// Return an error so we don't produce SCTs that, although committed,
// aren't yet part of a publicly visible tree.
return fmtErrorf("couldn't copy pending files to object storage: %w", err)


Hello! Sorry for resurrecting this so much later. I am finally doing a pass of large changes to Sunlight, and decided to go for a variant of this, thank you so much for bringing up the issue and proposing this solution.

I think this needs to be a fatal error, right? If we just fail the round and continue, the next round will make progress, but there will be missing tiles in the tree. (They will be in staging because we won't reach the Delete, so it will be recoverable, but still.)
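For concreteness, a minimal sketch of what making it fatal could look like, reusing the errors.Join(errFatal, ...) pattern this file already uses for the timestamp check; this is illustrative rather than a final implementation:

```go
// Copy the pending tiles to their final location.
if err := uploadPending(ctx, l.c, checkpoint, tree.Tree); err != nil {
	// The new checkpoint is already committed to the lock backend, so
	// failing the round and continuing would leave tiles missing from the
	// tree in object storage. Making the error fatal stops the sequencer;
	// the pending tar is still there (Delete was never reached), so the
	// upload is retried by uploadPending on the next LoadLog.
	return errors.Join(errFatal, fmtErrorf("couldn't copy pending files to object storage: %w", err))
}
```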

Also, unsure how TestRecoverLog is passing, since it checks that Sequence returns an error, which only happens for fatal errors.

@jellevandenhooff (Owner, Author) commented on Aug 4, 2024:

> Hello! Sorry for resurrecting this so much later. I am finally doing a pass of large changes to Sunlight, and decided to go for a variant of this, thank you so much for bringing up the issue and proposing this solution.

That's exciting to hear! No worries at all. Let me know if you would like me to update this PR or if you prefer your own code.

> I think this needs to be a fatal error, right? If we just fail the round and continue, the next round will make progress, but there will be missing tiles in the tree. (They will be in staging because we won't reach the Delete, so it will be recoverable, but still.)

That sounds right to me. It would be good to spell out exactly what the protocol guarantees at each write to either the lock or storage backends. With a fatal error here you get the guarantee that every lock update is followed by a successful uploadPending call, and I agree that without it it's not clear that all tiles will be uploaded.

> Also, unsure how TestRecoverLog is passing, since it checks that Sequence returns an error, which only happens for fatal errors.

It's been a while, so I don't remember the details, but I think TestRecoverLog passes because uploadPending returns a fatal error in the test.
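From memory, the relevant part of the test is shaped roughly like the sketch below; the harness helpers and the Sequence call are placeholders here, not the test suite's exact API:

```go
// Sketch only: NewEmptyTestLog, addCertificate, Sequence, and ReloadLog
// stand in for whatever the real test harness provides.
tl := NewEmptyTestLog(t)
addCertificate(t, tl)

// Make uploadPending fail with a fatal error after the new checkpoint has
// been committed to the lock backend.
testOnlyFailUploadPending = true
if err := tl.Log.Sequence(ctx); err == nil {
	t.Fatal("expected sequencing to return an error while uploadPending fails")
}
testOnlyFailUploadPending = false

// On reload, LoadLog notices that the object storage checkpoint is older
// than the lock checkpoint and replays the pending tar, completing the tree.
tl = ReloadLog(t, tl)
```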

I implemented this in FiloSottile@1ed5201. If you have time to review it, that would be amazing.

}

// At this point if the cache put fails, there's no reason to return errors