Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lotus-provider: Serialization fix #11501

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/lotus-provider/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ var wdPostTaskCmd = &cli.Command{
return xerrors.Errorf("cannot get miner id %w", err)
}
var id int64
retryDelay := time.Millisecond * 10
retryAddTask:
_, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
if err != nil {
Expand All @@ -102,6 +104,11 @@ var wdPostTaskCmd = &cli.Command{
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we loop indefinitely here, or should there be a maximum number of retries?

time.Sleep(retryDelay)
retryDelay *= 2
goto retryAddTask
}
return xerrors.Errorf("writing SQL transaction: %w", err)
}
fmt.Printf("Inserted task %v. Waiting for success ", id)
Expand Down
4 changes: 4 additions & 0 deletions lib/harmony/harmonydb/harmonydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log/v2"
Expand All @@ -33,6 +35,8 @@ type DB struct {
cfg *pgxpool.Config
schema string
hostnames []string
BTFPOnce sync.Once
BTFP atomic.Uintptr
}

var logger = logging.Logger("harmonydb")
Expand Down
47 changes: 47 additions & 0 deletions lib/harmony/harmonydb/userfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package harmonydb
import (
"context"
"errors"
"runtime"

"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/samber/lo"
)

// errTx is the sentinel error handed back when a DB-level call is refused
// because usedInTransaction reported true: Exec, Query and Select return it
// directly, QueryRow returns a Row whose Scan yields it, and BeginTransaction
// returns it as well.
var errTx = errors.New("Cannot use a non-transaction func in a transaction")

// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
// In any package, raw strings will satisfy compilation. Ex:
//
Expand All @@ -22,6 +26,9 @@ type rawStringOnly string
// Note, for CREATE & DROP please keep these permanent and express
// them in the ./sql/ files (next number).
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
	// Refuse to run when usedInTransaction reports that this call
	// originates from inside a transaction; errTx tells the caller why.
	if db.usedInTransaction() {
		return 0, errTx
	}

	// Run the statement and report how many rows it affected together
	// with whatever error the driver produced.
	tag, execErr := db.pgx.Exec(ctx, string(sql), arguments...)
	count = int(tag.RowsAffected())
	return count, execErr
}
Expand Down Expand Up @@ -55,6 +62,9 @@ type Query struct {
// fmt.Println(id, name)
// }
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
	// Inside a transaction this entry point is off limits: hand back an
	// empty Query alongside errTx.
	if db.usedInTransaction() {
		return &Query{}, errTx
	}

	// Wrap the driver's row cursor; the driver error (if any) is passed
	// through unchanged.
	rows, err := db.pgx.Query(ctx, string(sql), arguments...)
	wrapped := &Query{rows}
	return wrapped, err
}
Expand All @@ -66,6 +76,10 @@ type Row interface {
Scan(...any) error
}

// rowErr is a stateless Row implementation whose Scan always fails with
// errTx. DB.QueryRow returns it in place of a real row when the call is
// rejected.
type rowErr struct{}

// Scan ignores its destination arguments and always returns errTx.
func (rowErr) Scan(_ ...any) error {
	return errTx
}

// QueryRow gets 1 row using column order matching.
// This is a timesaver for the special case of wanting the first row returned only.
// EX:
Expand All @@ -74,6 +88,9 @@ type Row interface {
// var ID = 123
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
	// Default to the always-failing row; it is only replaced by a real
	// query result when we are not being called from inside a transaction.
	var row Row = rowErr{}
	if !db.usedInTransaction() {
		row = db.pgx.QueryRow(ctx, string(sql), arguments...)
	}
	return row
}

Expand All @@ -92,6 +109,9 @@ Ex:
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
*/
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
	// Reject the call with errTx when it comes from inside a transaction.
	inTx := db.usedInTransaction()
	if inTx {
		return errTx
	}

	// Delegate to pgxscan, which runs the query on the pool and fills
	// sliceOfStructPtr with the resulting rows.
	query := string(sql)
	return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, query, arguments...)
}

Expand All @@ -100,10 +120,32 @@ type Tx struct {
ctx context.Context
}

// usedInTransaction is a helper to prevent nesting transactions
// & non-transaction calls in transactions. It only checks 20 frames.
// Fast: This memory should all be in CPU Caches.
func (db *DB) usedInTransaction() bool {
snadrus marked this conversation as resolved.
Show resolved Hide resolved
var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc)
framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller.
return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there.
}

// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
// Be sure to test the error with IsErrSerialization() if you want to retry
// when there is a DB serialization error.
//
//go:noinline
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
db.BTFPOnce.Do(func() {
fp := make([]uintptr, 20)
runtime.Callers(1, fp)
db.BTFP.Store(fp[0])
})
if db.usedInTransaction() {
return false, errTx
}
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
Expand Down Expand Up @@ -156,3 +198,8 @@ func IsErrUniqueContraint(err error) bool {
var e2 *pgconn.PgError
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
}

// IsErrSerialization reports whether err is, or wraps, a *pgconn.PgError
// whose code equals pgerrcode.SerializationFailure. Any other error,
// including nil, yields false.
func IsErrSerialization(err error) bool {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		return false
	}
	return pgErr.Code == pgerrcode.SerializationFailure
}
15 changes: 14 additions & 1 deletion lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type taskTypeHandler struct {

func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
var tID TaskID
retryWait := time.Millisecond * 100
retryAddTask:
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
// create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
Expand All @@ -44,6 +46,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
return
}
if harmonydb.IsErrSerialization(err) {
time.Sleep(retryWait)
retryWait *= 2
goto retryAddTask
}
log.Error("Could not add task. AddTasFunc failed: %v", err)
return
}
Expand Down Expand Up @@ -161,7 +168,8 @@ top:

func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()

retryWait := time.Millisecond * 100
retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
Expand Down Expand Up @@ -214,6 +222,11 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
time.Sleep(retryWait)
retryWait *= 2
goto retryRecordCompletion
}
log.Error("Could not record transaction: ", err)
return
}
Expand Down
44 changes: 32 additions & 12 deletions storage/paths/db_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,13 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}
}

retryWait := time.Millisecond * 100
retryAttachStorage:
// Single transaction to attach storage which is not present in the DB
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {

var urls sql.NullString
var storageId sql.NullString
err = dbi.harmonyDB.QueryRow(ctx,
err = tx.QueryRow(
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
return false, xerrors.Errorf("storage attach select fails: %v", err)
Expand All @@ -200,7 +201,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}
currUrls = union(currUrls, si.URLs)

_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
strings.Join(currUrls, ","),
si.Weight,
Expand All @@ -220,7 +221,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
}

// Insert storage id
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"INSERT INTO storage_path "+
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
si.ID,
Expand All @@ -245,6 +246,11 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
time.Sleep(retryWait)
retryWait *= 2
goto retryAttachStorage
}
return err
}

Expand Down Expand Up @@ -284,22 +290,29 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri

log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
} else {
retryWait := time.Millisecond * 100
retryDropPath:
// Single transaction to drop storage path and sector decls which have this as a storage path
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// Drop storage path completely
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id)
_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id)
if err != nil {
return false, err
}

// Drop all sectors entries which use this storage path
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id)
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id)
if err != nil {
return false, err
}
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
time.Sleep(retryWait)
retryWait *= 2
goto retryDropPath
}
return err
}
log.Warnw("Dropping sector storage", "path", id)
Expand Down Expand Up @@ -373,9 +386,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
return xerrors.Errorf("invalid filetype")
}

retryWait := time.Millisecond * 100
retryStorageDeclareSector:
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var currPrimary sql.NullBool
err = dbi.harmonyDB.QueryRow(ctx,
err = tx.QueryRow(
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
Expand All @@ -385,7 +400,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
// If storage id already exists for this sector, update primary if need be
if currPrimary.Valid {
if !currPrimary.Bool && primary {
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
s.Miner, s.Number, ft, storageID)
if err != nil {
Expand All @@ -395,7 +410,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
log.Warnf("sector %v redeclared in %s", s, storageID)
}
} else {
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
"INSERT INTO sector_location "+
"values($1, $2, $3, $4, $5)",
s.Miner, s.Number, ft, storageID, primary)
Expand All @@ -407,6 +422,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
return true, nil
})
if err != nil {
if harmonydb.IsErrSerialization(err) {
time.Sleep(retryWait)
retryWait *= 2
goto retryStorageDeclareSector
}
return err
}

Expand Down Expand Up @@ -750,7 +770,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {

fts := (read | write).AllSet()
err = dbi.harmonyDB.Select(ctx, &rows,
err = tx.Select(&rows,
`SELECT sector_filetype, read_ts, read_refs, write_ts
FROM sector_location
WHERE miner_id=$1
Expand Down Expand Up @@ -792,7 +812,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
}

// Acquire write locks
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
`UPDATE sector_location
SET write_ts = NOW(), write_lock_owner = $1
WHERE miner_id=$2
Expand All @@ -807,7 +827,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
}

// Acquire read locks
_, err = dbi.harmonyDB.Exec(ctx,
_, err = tx.Exec(
`UPDATE sector_location
SET read_ts = NOW(), read_refs = read_refs + 1
WHERE miner_id=$1
Expand Down