Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sector index yugabyte implementation #11135

Merged
merged 21 commits into from
Aug 22, 2023
5 changes: 4 additions & 1 deletion cmd/lotus-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,13 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))

// TODO: run sector index init only for devnets. This is not needed for longer running networks
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
func(s string) { logging.Logger("harmonydb").Error(s) })
if err != nil {
return err
}

// TODO: get this bool from miner init cmd line
enableSectorIndexDB := true

si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)
Expand Down
7 changes: 3 additions & 4 deletions itests/harmonydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func TestCrud(t *testing.T) {
Animal string `db:"content"`
Unpopulated int
}
contentfilter := []string{"cows", "cats"}
err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch where content = ANY($1)", contentfilter)
err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch")
if err != nil {
t.Fatal("Could not select: ", err)
}
Expand All @@ -69,7 +68,7 @@ func TestTransaction(t *testing.T) {
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
t.Fatal("E0", err)
}
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
t.Fatal("E1", err)
}
Expand All @@ -91,7 +90,7 @@ func TestTransaction(t *testing.T) {
if sum2 != 4+5+6+7+8+9 {
t.Fatal("Expected 39, got ", sum2)
}
return false // rollback
return false, nil // rollback
})
if err != nil {
t.Fatal("ET", err)
Expand Down
31 changes: 16 additions & 15 deletions lib/harmony/harmonydb/sql/20230712.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
create table SectorLocation
(
"miner_id" int8,
"sector_num" int8,
"miner_id" bigint,
"sector_num" bigint,
"sector_filetype" int,
"storage_id" varchar,
"is_primary" bool,
constraint SectorLocation_pk
primary key ("miner_id", "sector_num", "sector_filetype", "storage_id")
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved

-- TODO: Maybe add index on above PK fields
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
);


Expand All @@ -15,22 +17,21 @@ create table StorageLocation
"storage_id" varchar not null
constraint "StorageLocation_pkey"
primary key,
"urls" varchar,
"weight" int8,
"max_storage" int8,
"urls" varchar, -- comma separated list of urls
"weight" bigint,
"max_storage" bigint,
"can_seal" bool,
"can_store" bool,
"groups" varchar,
"allow_to" varchar,
"allow_types" varchar,
"deny_types" varchar,
"groups" varchar, -- comma separated list of group names
"allow_to" varchar, -- comma separated list of allowed groups
"allow_types" varchar, -- comma separated list of allowed file types
"deny_types" varchar, -- comma separated list of denied file types

"capacity" int8,
"available" int8,
"fs_available" int8,
"reserved" int8,
-- "MaxStorage" int8,
"used" int8,
"capacity" bigint,
"available" bigint,
"fs_available" bigint,
"reserved" bigint,
"used" bigint,
"last_heartbeat" timestamp(6),
"heartbeat_err" varchar
);
Expand Down
9 changes: 6 additions & 3 deletions lib/harmony/harmonydb/userfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Ex:
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
*/
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql string, arguments ...any) error {
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, sql, arguments...)
}

type Tx struct {
Expand All @@ -100,7 +100,7 @@ type Tx struct {
// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
Expand All @@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
retErr = tx.Rollback(ctx)
}
}()
commit = f(&Tx{tx, ctx})
commit, err = f(&Tx{tx, ctx})
if err != nil {
return false, err
}
if commit {
err := tx.Commit(ctx)
if err != nil {
Expand Down
Loading