Skip to content

Commit

Permalink
Merge pull request #11135 from filecoin-project/sbansal/sector-index
Browse files Browse the repository at this point in the history
feat: sector index yugabyte implementation
  • Loading branch information
shrenujbansal authored Aug 22, 2023
2 parents 1d58bf0 + 1524748 commit 5b6daa2
Show file tree
Hide file tree
Showing 15 changed files with 1,211 additions and 14 deletions.
13 changes: 12 additions & 1 deletion cmd/lotus-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mitchellh/go-homedir"
Expand Down Expand Up @@ -47,6 +48,7 @@ import (
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
storageminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
Expand Down Expand Up @@ -463,7 +465,16 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))

si := paths.NewIndex(nil)
// TODO: run sector index init only for devnets. This is not needed for longer running networks
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
func(s string) { logging.Logger("harmonydb").Error(s) })
if err != nil {
return err
}

enableSectorIndexDB := true

si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)

lstor, err := paths.NewLocal(ctx, lr, si, nil)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
#EnableMarkets = false

# When enabled, the sector index will reside in an external database
# as opposed to the local KV store in the miner process
# This is useful to allow workers to bypass the lotus miner to access sector information
#
# type: bool
# env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB
#EnableSectorIndexDB = false

# type: string
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
#SealerApiInfo = ""
Expand Down
4 changes: 2 additions & 2 deletions itests/harmonydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestTransaction(t *testing.T) {
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
t.Fatal("E0", err)
}
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
t.Fatal("E1", err)
}
Expand All @@ -90,7 +90,7 @@ func TestTransaction(t *testing.T) {
if sum2 != 4+5+6+7+8+9 {
t.Fatal("Expected 39, got ", sum2)
}
return false // rollback
return false, nil // rollback
})
if err != nil {
t.Fatal("ET", err)
Expand Down
1 change: 1 addition & 0 deletions itests/path_type_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

func TestPathTypeFilters(t *testing.T) {

runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
41 changes: 41 additions & 0 deletions lib/harmony/harmonydb/sql/20230712.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
create table sector_location
(
miner_id bigint not null,
sector_num bigint not null,
sector_filetype int not null,
storage_id varchar not null,
is_primary bool,
read_ts timestamp(6),
read_refs int,
write_ts timestamp(6),
write_lock_owner varchar,
constraint sectorlocation_pk
primary key (miner_id, sector_num, sector_filetype, storage_id)
);



create table storage_path
(
"storage_id" varchar not null
constraint "storage_path_pkey"
primary key,
"urls" varchar, -- comma separated list of urls
"weight" bigint,
"max_storage" bigint,
"can_seal" bool,
"can_store" bool,
"groups" varchar, -- comma separated list of group names
"allow_to" varchar, -- comma separated list of allowed groups
"allow_types" varchar, -- comma separated list of allowed file types
"deny_types" varchar, -- comma separated list of denied file types

"capacity" bigint,
"available" bigint,
"fs_available" bigint,
"reserved" bigint,
"used" bigint,
"last_heartbeat" timestamp(6),
"heartbeat_err" varchar
);

7 changes: 5 additions & 2 deletions lib/harmony/harmonydb/userfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Tx struct {
// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
Expand All @@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
retErr = tx.Rollback(ctx)
}
}()
commit = f(&Tx{tx, ctx})
commit, err = f(&Tx{tx, ctx})
if err != nil {
return false, err
}
if commit {
err := tx.Commit(ctx)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions node/builder_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
),

Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),

If(cfg.Subsystems.EnableSectorStorage,
// Sector storage
Override(new(*paths.Index), paths.NewIndex),
Override(new(paths.SectorIndex), From(new(*paths.Index))),
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)),
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Expand Down Expand Up @@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(config.HarmonyDB), cfg.HarmonyDB),
Override(new(harmonydb.ITestID), harmonydb.ITestID("")),
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
)
}

Expand Down
1 change: 1 addition & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner {
EnableSealing: true,
EnableSectorStorage: true,
EnableMarkets: false,
EnableSectorIndexDB: false,
},

Fees: MinerFeeConfig{
Expand Down
2 changes: 2 additions & 0 deletions node/config/def_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {

fmt.Println(s)

fmt.Println(c)
fmt.Println(c2)
require.True(t, reflect.DeepEqual(c, c2))
}

Expand Down
8 changes: 8 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type MinerSubsystemConfig struct {
EnableSectorStorage bool
EnableMarkets bool

// When enabled, the sector index will reside in an external database
// as opposed to the local KV store in the miner process
// This is useful to allow workers to bypass the lotus miner to access sector information
EnableSectorIndexDB bool

SealerApiInfo string // if EnableSealing == false
SectorIndexApiInfo string // if EnableSectorStorage == false
}
Expand Down
Loading

0 comments on commit 5b6daa2

Please sign in to comment.