diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index c109e85b980..80bb3fbf6d7 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/mitchellh/go-homedir" @@ -47,6 +48,7 @@ import ( "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal/fsjournal" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" storageminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" @@ -463,7 +465,16 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - si := paths.NewIndex(nil) + // TODO: run sector index init only for devnets. This is not needed for longer running networks + harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "", + func(s string) { logging.Logger("harmonydb").Error(s) }) + if err != nil { + return err + } + + enableSectorIndexDB := true + + si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB) lstor, err := paths.NewLocal(ctx, lr, si, nil) if err != nil { diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 9c3f3afb298..af409c5ad82 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -145,6 +145,14 @@ # env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS #EnableMarkets = false + # When enabled, the sector index will reside in an external database + # as opposed to the local KV store in the miner process + # This is useful to allow workers to bypass the lotus miner to access sector information + # + # type: bool + # env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB + #EnableSectorIndexDB = false + # type: string # env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO #SealerApiInfo = "" diff --git a/itests/harmonydb_test.go b/itests/harmonydb_test.go index b52a2aa8f79..7c0c22d888d 100644 --- a/itests/harmonydb_test.go +++ b/itests/harmonydb_test.go @@ -68,7 +68,7 @@ func TestTransaction(t *testing.T) { if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil { t.Fatal("E0", err) } - _, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) { + _, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil { t.Fatal("E1", err) } @@ -90,7 +90,7 @@ func TestTransaction(t *testing.T) { if sum2 != 4+5+6+7+8+9 { t.Fatal("Expected 39, got ", sum2) } - return false // rollback + return false, nil // rollback }) if err != nil { t.Fatal("ET", err) diff --git a/itests/path_type_filters_test.go b/itests/path_type_filters_test.go index d41e2c2159a..c668976ac2d 100644 --- a/itests/path_type_filters_test.go +++ b/itests/path_type_filters_test.go @@ -15,6 +15,7 @@ import ( ) func TestPathTypeFilters(t *testing.T) { + runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) { t.Run(name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/lib/harmony/harmonydb/sql/20230712.sql b/lib/harmony/harmonydb/sql/20230712.sql new file mode 100644 index 00000000000..37024b47116 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20230712.sql @@ -0,0 +1,41 @@ +create table sector_location +( + miner_id bigint not null, + sector_num bigint not null, + sector_filetype int not null, + storage_id varchar not null, + is_primary bool, + read_ts timestamp(6), + read_refs int, + write_ts timestamp(6), + write_lock_owner varchar, + constraint sectorlocation_pk + primary key (miner_id, sector_num, sector_filetype, storage_id) +); + + + +create table storage_path +( + "storage_id" varchar not null + constraint "storage_path_pkey" + primary key, + "urls" varchar, -- comma separated list of urls + "weight" bigint, + "max_storage" bigint, + "can_seal" bool, + "can_store" bool, + "groups" varchar, -- comma separated list of group names + "allow_to" varchar, -- comma separated list of allowed groups + "allow_types" varchar, -- comma separated list of allowed file types + "deny_types" varchar, -- comma separated list of denied file types + + "capacity" bigint, + "available" bigint, + "fs_available" bigint, + "reserved" bigint, + "used" bigint, + "last_heartbeat" timestamp(6), + "heartbeat_err" varchar +); + diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 4d35fd8ca7b..c4620d44900 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -100,7 +100,7 @@ type Tx struct { // BeginTransaction is how you can access transactions using this library. // The entire transaction happens in the function passed in. // The return must be true or a rollback will occur. -func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) { +func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return false, err @@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) ( retErr = tx.Rollback(ctx) } }() - commit = f(&Tx{tx, ctx}) + commit, err = f(&Tx{tx, ctx}) + if err != nil { + return false, err + } if commit { err := tx.Commit(ctx) if err != nil { diff --git a/node/builder_miner.go b/node/builder_miner.go index bd81f426562..23dc9c5424e 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -126,10 +126,14 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))), ), + Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { + return harmonydb.NewFromConfigWithITestID(cfg)(id) + }), + If(cfg.Subsystems.EnableSectorStorage, // Sector storage - Override(new(*paths.Index), paths.NewIndex), - Override(new(paths.SectorIndex), From(new(*paths.Index))), + Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)), + Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))), Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), @@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(config.HarmonyDB), cfg.HarmonyDB), Override(new(harmonydb.ITestID), harmonydb.ITestID("")), Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)), - Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { - return harmonydb.NewFromConfigWithITestID(cfg)(id) - }), ) } diff --git a/node/config/def.go b/node/config/def.go index 47d314fc04b..aba7e340d99 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner { EnableSealing: true, EnableSectorStorage: true, EnableMarkets: false, + EnableSectorIndexDB: false, }, Fees: MinerFeeConfig{ diff --git a/node/config/def_test.go b/node/config/def_test.go index 1739339a2d0..adbb44b6038 100644 --- a/node/config/def_test.go +++ b/node/config/def_test.go @@ -71,6 +71,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) { fmt.Println(s) + fmt.Println(c) + fmt.Println(c2) require.True(t, reflect.DeepEqual(c, c2)) } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index d8976d56084..9c19d1953d5 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -747,6 +747,14 @@ over the worker address if this flag is set.`, Comment: ``, }, + { + Name: "EnableSectorIndexDB", + Type: "bool", + + Comment: `When enabled, the sector index will reside in an external database +as opposed to the local KV store in the miner process +This is useful to allow workers to bypass the lotus miner to access sector information`, + }, { Name: "SealerApiInfo", Type: "string", diff --git a/node/config/types.go b/node/config/types.go index c0b6c97b7fc..21c92e47b26 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -110,6 +110,11 @@ type MinerSubsystemConfig struct { EnableSectorStorage bool EnableMarkets bool + // When enabled, the sector index will reside in an external database + // as opposed to the local KV store in the miner process + // This is useful to allow workers to bypass the lotus miner to access sector information + EnableSectorIndexDB bool + SealerApiInfo string // if EnableSealing == false SectorIndexApiInfo string // if EnableSectorStorage == false } diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go new file mode 100644 index 00000000000..7f8bc6ebce1 --- /dev/null +++ b/storage/paths/db_index.go @@ -0,0 +1,1001 @@ +package paths + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + gopath "path" + "strings" + "time" + + "github.com/google/uuid" + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/journal/alerting" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var errAlreadyLocked = errors.New("already locked") + +type DBIndex struct { + alerting *alerting.Alerting + pathAlerts map[storiface.ID]alerting.AlertType + + harmonyDB *harmonydb.DB +} + +func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { + return &DBIndex{ + harmonyDB: db, + + alerting: al, + pathAlerts: map[storiface.ID]alerting.AlertType{}, + } +} + +func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { + + var sectorEntries []struct { + StorageId string + MinerId sql.NullInt64 + SectorNum sql.NullInt64 + SectorFiletype sql.NullInt32 + IsPrimary sql.NullBool + } + + err := dbi.harmonyDB.Select(ctx, §orEntries, + "SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storage_path stor LEFT JOIN sector_location sec on stor.storage_id=sec.storage_id") + if err != nil { + return nil, xerrors.Errorf("StorageList DB query fails: %v", err) + } + + byID := map[storiface.ID]map[abi.SectorID]storiface.SectorFileType{} + for _, entry := range sectorEntries { + id := storiface.ID(entry.StorageId) + _, ok := byID[id] + if !ok { + byID[id] = map[abi.SectorID]storiface.SectorFileType{} + } + + // skip sector info for storage paths with no sectors + if !entry.MinerId.Valid { + continue + } + + sectorId := abi.SectorID{ + Miner: abi.ActorID(entry.MinerId.Int64), + Number: abi.SectorNumber(entry.SectorNum.Int64), + } + + byID[id][sectorId] |= storiface.SectorFileType(entry.SectorFiletype.Int32) + } + + out := map[storiface.ID][]storiface.Decl{} + for id, m := range byID { + out[id] = []storiface.Decl{} + for sectorID, fileType := range m { + out[id] = append(out[id], storiface.Decl{ + SectorID: sectorID, + SectorFileType: fileType, + }) + } + } + + return out, nil +} + +func union(a, b []string) []string { + m := make(map[string]bool) + + for _, elem := range a { + m[elem] = true + } + + for _, elem := range b { + if _, ok := m[elem]; !ok { + a = append(a, elem) + } + } + return a +} + +func splitString(str string) []string { + if str == "" { + return []string{} + } + return strings.Split(str, ",") +} + +func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { + var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) + + if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { + dbi.pathAlerts[si.ID] = dbi.alerting.AddAlertType("sector-index", "pathconf-"+string(si.ID)) + } + + var hasConfigIssues bool + + for id, typ := range si.AllowTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + //No need to hard-fail here, just warn the user + //(note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path) + hasConfigIssues = true + + if dbi.alerting != nil { + dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ + "message": "bad path type in AllowTypes", + "path": string(si.ID), + "idx": id, + "path_type": typ, + "error": err.Error(), + }) + } + + continue + } + allow = append(allow, typ) + } + for id, typ := range si.DenyTypes { + _, err := storiface.TypeFromString(typ) + if err != nil { + //No need to hard-fail here, just warn the user + hasConfigIssues = true + + if dbi.alerting != nil { + dbi.alerting.Raise(dbi.pathAlerts[si.ID], map[string]interface{}{ + "message": "bad path type in DenyTypes", + "path": string(si.ID), + "idx": id, + "path_type": typ, + "error": err.Error(), + }) + } + + continue + } + deny = append(deny, typ) + } + si.AllowTypes = allow + si.DenyTypes = deny + + if dbi.alerting != nil && !hasConfigIssues && dbi.alerting.IsRaised(dbi.pathAlerts[si.ID]) { + dbi.alerting.Resolve(dbi.pathAlerts[si.ID], map[string]string{ + "message": "path config is now correct", + }) + } + + for _, u := range si.URLs { + if _, err := url.Parse(u); err != nil { + return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) + } + } + + // Single transaction to attach storage which is not present in the DB + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + + var urls sql.NullString + var storageId sql.NullString + err = dbi.harmonyDB.QueryRow(ctx, + "Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return false, xerrors.Errorf("storage attach select fails: %v", err) + } + + // Storage ID entry exists + // TODO: Consider using insert into .. on conflict do update set ... below + if storageId.Valid { + var currUrls []string + if urls.Valid { + currUrls = strings.Split(urls.String, ",") + } + currUrls = union(currUrls, si.URLs) + + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10", + strings.Join(currUrls, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + si.ID) + if err != nil { + return false, xerrors.Errorf("storage attach UPDATE fails: %v", err) + } + + return true, nil + } + + // Insert storage id + _, err = dbi.harmonyDB.Exec(ctx, + "INSERT INTO storage_path "+ + "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", + si.ID, + strings.Join(si.URLs, ","), + si.Weight, + si.MaxStorage, + si.CanSeal, + si.CanStore, + strings.Join(si.Groups, ","), + strings.Join(si.AllowTo, ","), + strings.Join(si.AllowTypes, ","), + strings.Join(si.DenyTypes, ","), + st.Capacity, + st.Available, + st.FSAvailable, + st.Reserved, + st.Used, + time.Now()) + if err != nil { + return false, xerrors.Errorf("StorageAttach insert fails: %v", err) + } + return true, nil + }) + if err != nil { + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url string) error { + + // If url not in path urls, error out + // if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path + + var qUrls string + err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storage_path WHERE storage_id=$1", string(id)).Scan(&qUrls) + if err != nil { + return err + } + urls := splitString(qUrls) + + var modUrls []string + for _, u := range urls { + if u != url { + modUrls = append(modUrls, u) + } + } + + // noop if url doesn't exist in urls + if len(modUrls) == len(urls) { + return nil + } + + if len(modUrls) > 0 { + newUrls := strings.Join(modUrls, ",") + _, err := dbi.harmonyDB.Exec(ctx, "UPDATE storage_path set urls=$1 WHERE storage_id=$2", newUrls, id) + if err != nil { + return err + } + + log.Warnw("Dropping sector path endpoint", "path", id, "url", url) + } else { + // Single transaction to drop storage path and sector decls which have this as a storage path + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // Drop storage path completely + _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id) + if err != nil { + return false, err + } + + // Drop all sectors entries which use this storage path + _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id) + if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + return err + } + log.Warnw("Dropping sector storage", "path", id) + } + + return nil +} + +func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + + var canSeal, canStore bool + err := dbi.harmonyDB.QueryRow(ctx, + "SELECT can_seal, can_store FROM storage_path WHERE storage_id=$1", id).Scan(&canSeal, &canStore) + if err != nil { + return xerrors.Errorf("Querying for storage id %s fails with err %v", id, err) + } + + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6", + report.Stat.Capacity, + report.Stat.Available, + report.Stat.FSAvailable, + report.Stat.Reserved, + report.Stat.Used, + time.Now()) + if err != nil { + return xerrors.Errorf("updating storage health in DB fails with err: %v", err) + } + + if report.Stat.Capacity > 0 { + ctx, _ = tag.New(ctx, + tag.Upsert(metrics.StorageID, string(id)), + tag.Upsert(metrics.PathStorage, fmt.Sprint(canStore)), + tag.Upsert(metrics.PathSeal, fmt.Sprint(canSeal)), + ) + + stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity))) + stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity))) + + stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity)) + stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable)) + stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available)) + stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved)) + + if report.Stat.Max > 0 { + stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max))) + stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used)) + stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max)) + } + } + + return nil +} + +// function to check if a filetype is valid +func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool { + ftValid := false + for _, fileTypeValid := range storiface.PathTypes { + if fileTypeValid&fileType == 0 { + ftValid = true + break + } + } + return ftValid +} + +func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + + if !dbi.checkFileType(ft) { + return xerrors.Errorf("invalid filetype") + } + + _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + var currPrimary sql.NullBool + err = dbi.harmonyDB.QueryRow(ctx, + "SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) + if err != nil && !strings.Contains(err.Error(), "no rows in result set") { + return false, xerrors.Errorf("DB SELECT fails: %v", err) + } + + // If storage id already exists for this sector, update primary if need be + if currPrimary.Valid { + if !currPrimary.Bool && primary { + _, err = dbi.harmonyDB.Exec(ctx, + "UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + s.Miner, s.Number, ft, storageID) + if err != nil { + return false, xerrors.Errorf("DB update fails: %v", err) + } + } else { + log.Warnf("sector %v redeclared in %s", s, storageID) + } + } else { + _, err = dbi.harmonyDB.Exec(ctx, + "INSERT INTO sector_location "+ + "values($1, $2, $3, $4, $5)", + s.Miner, s.Number, ft, storageID, primary) + if err != nil { + return false, xerrors.Errorf("DB insert fails: %v", err) + } + } + + return true, nil + }) + if err != nil { + return err + } + + return nil +} + +func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error { + + if !dbi.checkFileType(ft) { + return xerrors.Errorf("invalid filetype") + } + + _, err := dbi.harmonyDB.Exec(ctx, + "DELETE FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", + int(s.Miner), int(s.Number), int(ft), string(storageID)) + if err != nil { + return xerrors.Errorf("StorageDropSector DELETE query fails: %v", err) + } + + return nil +} + +func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { + + var result []storiface.SectorStorageInfo + + allowList := make(map[string]struct{}) + storageWithSector := map[string]bool{} + + type dbRes struct { + StorageId string + Count uint64 + IsPrimary bool + Urls string + Weight uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + var rows []dbRes + + fts := ft.AllSet() + // Find all storage info which already hold this sector + filetype + err := dbi.harmonyDB.Select(ctx, &rows, + ` SELECT DISTINCT ON (stor.storage_id) + stor.storage_id, + COUNT(*) OVER(PARTITION BY stor.storage_id) as count, + BOOL_OR(is_primary) OVER(PARTITION BY stor.storage_id) AS is_primary, + urls, + weight, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + FROM sector_location sec + JOIN storage_path stor ON sec.storage_id = stor.storage_id + WHERE sec.miner_id = $1 + AND sec.sector_num = $2 + AND sec.sector_filetype = ANY($3) + ORDER BY stor.storage_id`, + s.Miner, s.Number, fts) + if err != nil { + return nil, xerrors.Errorf("Finding sector storage from DB fails with err: %v", err) + } + + for _, row := range rows { + + // Parse all urls + var urls, burls []string + for _, u := range splitString(row.Urls) { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight * row.Count, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Primary: row.IsPrimary, + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + + storageWithSector[row.StorageId] = true + + allowTo := splitString(row.AllowTo) + if allowList != nil && len(allowTo) > 0 { + for _, group := range allowTo { + allowList[group] = struct{}{} + } + } else { + allowList = nil // allow to any + } + } + + // Find all storage paths which can hold this sector if allowFetch is true + if allowFetch { + spaceReq, err := ft.SealSpaceUse(ssize) + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + // Conditions to satisfy when choosing a sector + // 1. CanSeal is true + // 2. Available >= spaceReq + // 3. curr_time - last_heartbeat < SkippedHeartbeatThresh + // 4. heartbeat_err is NULL + // 5. not one of the earlier picked storage ids + // 6. !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) + // 7. Storage path is part of the groups which are allowed from the storage paths which already hold the sector + + var rows []struct { + StorageId string + Urls string + Weight uint64 + CanSeal bool + CanStore bool + Groups string + AllowTypes string + DenyTypes string + } + err = dbi.harmonyDB.Select(ctx, &rows, + `SELECT storage_id, + urls, + weight, + can_seal, + can_store, + groups, + allow_types, + deny_types + FROM storage_path + WHERE can_seal=true + and available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null`, + spaceReq, SkippedHeartbeatThresh) + if err != nil { + return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: %v", err) + } + + for _, row := range rows { + if ok := storageWithSector[row.StorageId]; ok { + continue + } + + if !ft.AnyAllowed(splitString(row.AllowTypes), splitString(row.DenyTypes)) { + log.Debugf("not selecting on %s, not allowed by file type filters", row.StorageId) + continue + } + + if allowList != nil { + groups := splitString(row.Groups) + allow := false + for _, group := range groups { + if _, found := allowList[group]; found { + log.Debugf("path %s in allowed group %s", row.StorageId, group) + allow = true + break + } + } + + if !allow { + log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", row.StorageId, allowList, groups) + continue + } + } + + var urls, burls []string + for _, u := range splitString(row.Urls) { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s)) + urls = append(urls, rl.String()) + burls = append(burls, u) + } + + result = append(result, storiface.SectorStorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: urls, + BaseURLs: burls, + Weight: row.Weight * 0, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Primary: false, + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + } + } + + return result, nil +} + +func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) { + + var qResults []struct { + Urls string + Weight uint64 + MaxStorage uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + err := dbi.harmonyDB.Select(ctx, &qResults, + "SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+ + "FROM storage_path WHERE storage_id=$1", string(id)) + if err != nil { + return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: %v", err) + } + + var sinfo storiface.StorageInfo + sinfo.ID = id + sinfo.URLs = splitString(qResults[0].Urls) + sinfo.Weight = qResults[0].Weight + sinfo.MaxStorage = qResults[0].MaxStorage + sinfo.CanSeal = qResults[0].CanSeal + sinfo.CanStore = qResults[0].CanStore + sinfo.Groups = splitString(qResults[0].Groups) + sinfo.AllowTo = splitString(qResults[0].AllowTo) + sinfo.AllowTypes = splitString(qResults[0].AllowTypes) + sinfo.DenyTypes = splitString(qResults[0].DenyTypes) + + return sinfo, nil +} + +func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) { + var err error + var spaceReq uint64 + switch pathType { + case storiface.PathSealing: + spaceReq, err = allocate.SealSpaceUse(ssize) + case storiface.PathStorage: + spaceReq, err = allocate.StoreSpaceUse(ssize) + default: + return nil, xerrors.Errorf("unexpected path type") + } + if err != nil { + return nil, xerrors.Errorf("estimating required space: %w", err) + } + + var rows []struct { + StorageId string + Urls string + Weight uint64 + MaxStorage uint64 + CanSeal bool + CanStore bool + Groups string + AllowTo string + AllowTypes string + DenyTypes string + } + + err = dbi.harmonyDB.Select(ctx, &rows, + `SELECT storage_id, + urls, + weight, + max_storage, + can_seal, + can_store, + groups, + allow_to, + allow_types, + deny_types + FROM storage_path + WHERE available >= $1 + and NOW()-last_heartbeat < $2 + and heartbeat_err is null + and ($3 and can_seal = TRUE or $4 and can_store = TRUE) + order by (available::numeric * weight) desc`, + spaceReq, + SkippedHeartbeatThresh, + pathType == storiface.PathSealing, + pathType == storiface.PathStorage, + ) + if err != nil { + return nil, xerrors.Errorf("Querying for best storage sectors fails with err %w: ", err) + } + + var result []storiface.StorageInfo + for _, row := range rows { + result = append(result, storiface.StorageInfo{ + ID: storiface.ID(row.StorageId), + URLs: splitString(row.Urls), + Weight: row.Weight, + MaxStorage: row.MaxStorage, + CanSeal: row.CanSeal, + CanStore: row.CanStore, + Groups: splitString(row.Groups), + AllowTo: splitString(row.AllowTo), + AllowTypes: splitString(row.AllowTypes), + DenyTypes: splitString(row.DenyTypes), + }) + } + + return result, nil +} + +// timeout after which we consider a lock to be stale +const LockTimeOut = 300 * time.Second + +func isLocked(ts sql.NullTime) bool { + return ts.Valid && ts.Time.After(time.Now().Add(-LockTimeOut)) +} + +func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType, lockUuid uuid.UUID) (bool, error) { + if read|write == 0 { + return false, nil + } + + if read|write > (1< (1<