Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sector index yugabyte implementation #11135

Merged
merged 21 commits into from
Aug 22, 2023
Merged
13 changes: 12 additions & 1 deletion cmd/lotus-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mitchellh/go-homedir"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
storageminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
Expand Down Expand Up @@ -463,7 +465,16 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))

si := paths.NewIndex(nil)
// TODO: run sector index init only for devnets. This is not needed for longer running networks
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
func(s string) { logging.Logger("harmonydb").Error(s) })
if err != nil {
return err
}

enableSectorIndexDB := true

si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)

lstor, err := paths.NewLocal(ctx, lr, si, nil)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
#EnableMarkets = false

# When enabled, the sector index will reside in an external database
# as opposed to the local KV store in the miner process
# This is useful to allow workers to bypass the lotus miner to access sector information
#
# type: bool
# env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB
#EnableSectorIndexDB = false
magik6k marked this conversation as resolved.
Show resolved Hide resolved

# type: string
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
#SealerApiInfo = ""
Expand Down
4 changes: 2 additions & 2 deletions itests/harmonydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestTransaction(t *testing.T) {
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
t.Fatal("E0", err)
}
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
t.Fatal("E1", err)
}
Expand All @@ -90,7 +90,7 @@ func TestTransaction(t *testing.T) {
if sum2 != 4+5+6+7+8+9 {
t.Fatal("Expected 39, got ", sum2)
}
return false // rollback
return false, nil // rollback
})
if err != nil {
t.Fatal("ET", err)
Expand Down
1 change: 1 addition & 0 deletions itests/path_type_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

func TestPathTypeFilters(t *testing.T) {

runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
41 changes: 41 additions & 0 deletions lib/harmony/harmonydb/sql/20230712.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
create table sector_location
(
miner_id bigint not null,
sector_num bigint not null,
sector_filetype int not null,
storage_id varchar not null,
is_primary bool,
read_ts timestamp(6),
read_refs int,
write_ts timestamp(6),
write_lock_owner varchar,
constraint sectorlocation_pk
primary key (miner_id, sector_num, sector_filetype, storage_id)
);



create table storage_path
(
"storage_id" varchar not null
constraint "storage_path_pkey"
primary key,
"urls" varchar, -- comma separated list of urls
"weight" bigint,
"max_storage" bigint,
"can_seal" bool,
"can_store" bool,
"groups" varchar, -- comma separated list of group names
"allow_to" varchar, -- comma separated list of allowed groups
"allow_types" varchar, -- comma separated list of allowed file types
"deny_types" varchar, -- comma separated list of denied file types

"capacity" bigint,
"available" bigint,
"fs_available" bigint,
"reserved" bigint,
"used" bigint,
"last_heartbeat" timestamp(6),
"heartbeat_err" varchar
);

11 changes: 7 additions & 4 deletions lib/harmony/harmonydb/userfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Ex:
pet := "cat"
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
*/
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql string, arguments ...any) error {
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, sql, arguments...)
}

type Tx struct {
Expand All @@ -100,7 +100,7 @@ type Tx struct {
// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
Expand All @@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
retErr = tx.Rollback(ctx)
}
}()
commit = f(&Tx{tx, ctx})
commit, err = f(&Tx{tx, ctx})
if err != nil {
return false, err
}
if commit {
err := tx.Commit(ctx)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions node/builder_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
),

Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),

If(cfg.Subsystems.EnableSectorStorage,
// Sector storage
Override(new(*paths.Index), paths.NewIndex),
Override(new(paths.SectorIndex), From(new(*paths.Index))),
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(true)),
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Expand Down Expand Up @@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(config.HarmonyDB), cfg.HarmonyDB),
Override(new(harmonydb.ITestID), harmonydb.ITestID("")),
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
)
}

Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner {
EnableSealing: true,
EnableSectorStorage: true,
EnableMarkets: false,
EnableSectorIndexDB: false,
},

Fees: MinerFeeConfig{
Expand Down
2 changes: 2 additions & 0 deletions node/config/def_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {

fmt.Println(s)

fmt.Println(c)
fmt.Println(c2)
require.True(t, reflect.DeepEqual(c, c2))
}

Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type MinerSubsystemConfig struct {
EnableSectorStorage bool
EnableMarkets bool

// When enabled, the sector index will reside in an external database
// as opposed to the local KV store in the miner process
// This is useful to allow workers to bypass the lotus miner to access sector information
EnableSectorIndexDB bool
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved

SealerApiInfo string // if EnableSealing == false
SectorIndexApiInfo string // if EnableSectorStorage == false
}
Expand Down
Loading