Skip to content

Commit

Permalink
Merge pull request #663 from iotexproject/task-processor-3
Browse files Browse the repository at this point in the history
prover main logic
  • Loading branch information
huangzhiran committed Sep 29, 2024
2 parents 0b3274c + 68a13a0 commit a6a8653
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 333 deletions.
26 changes: 13 additions & 13 deletions cmd/prover/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ import (
)

type Config struct {
LogLevel slog.Level `env:"LOG_LEVEL,optional"`
VMEndpoints string `env:"VM_ENDPOINTS"`
DatabaseDSN string `env:"DATABASE_DSN"`
ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"`
ProjectContractAddr string `env:"PROJECT_CONTRACT_ADDRESS,optional"`
ProverContractAddr string `env:"PROVER_CONTRACT_ADDRESS,optional"`
ProverOperatorPriKey string `env:"PROVER_OPERATOR_PRIVATE_KEY,optional"`
BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"`
LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"`
env string `env:"-"`
LogLevel slog.Level `env:"LOG_LEVEL,optional"`
VMEndpoints string `env:"VM_ENDPOINTS"`
DatasourceDSN string `env:"DATASOURCE_DSN"`
ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"`
ProjectContractAddr string `env:"PROJECT_CONTRACT_ADDRESS,optional"`
RouterContractAddr string `env:"ROUTER_CONTRACT_ADDRESS,optional"`
TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"`
ProverOperatorPriKey string `env:"PROVER_OPERATOR_PRIVATE_KEY,optional"`
BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"`
LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"`
env string `env:"-"`
}

var (
defaultTestnetConfig = &Config{
LogLevel: slog.LevelInfo,
VMEndpoints: `{"1":"risc0:4001","2":"halo2:4001","3":"zkwasm:4001","4":"wasm:4001"}`,
ChainEndpoint: "https://babel-api.testnet.iotex.io",
DatabaseDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
ProjectContractAddr: "0xCBb7a80983Fd3405972F700101A82DB6304C6547",
ProverContractAddr: "0x6B544a7603cead52AdfD99AA64B3d798083cc4CC",
DatasourceDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
ProjectContractAddr: "0x6B544a7603cead52AdfD99AA64B3d798083cc4CC",
ProverOperatorPriKey: "a5f4e99aa80342d5451e8f8fd0dc357ccddb70d3827428fb1fc366f70833497f",
BeginningBlockNumber: 20000000,
LocalDBDir: "./local_db",
Expand Down
8 changes: 3 additions & 5 deletions cmd/prover/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ func TestConfig_Init(t *testing.T) {
expected := Config{
VMEndpoints: `{"1":"halo2:4001","2":"risc0:4001","3":"zkwasm:4001","4":"wasm:4001"}`,
ChainEndpoint: "http://abc.def.com",
ProjectContractAddr: "0x123",
DatabaseDSN: "postgres://root@localhost/abc?ext=666",
ProverContractAddr: "0x456",
DatasourceDSN: "postgres://root@localhost/abc?ext=666",
ProjectContractAddr: "0x456",
ProverOperatorPriKey: "private key",
LocalDBDir: "./test",
}

_ = os.Setenv("VM_ENDPOINTS", expected.VMEndpoints)
_ = os.Setenv("CHAIN_ENDPOINT", expected.ChainEndpoint)
_ = os.Setenv("DATABASE_DSN", expected.DatabaseDSN)
_ = os.Setenv("PROVER_CONTRACT_ADDRESS", expected.ProverContractAddr)
_ = os.Setenv("DATASOURCE_DSN", expected.DatasourceDSN)
_ = os.Setenv("PROJECT_CONTRACT_ADDRESS", expected.ProjectContractAddr)
_ = os.Setenv("PROVER_OPERATOR_PRIVATE_KEY", expected.ProverOperatorPriKey)
_ = os.Setenv("LOCAL_DB_DIRECTORY", expected.LocalDBDir)
Expand Down
12 changes: 10 additions & 2 deletions cmd/prover/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
)

type scannedBlockNumber struct {
Expand Down Expand Up @@ -147,8 +149,14 @@ func (p *DB) UnprocessedTask() (uint64, common.Hash, error) {
return t.ProjectID, t.TaskID, nil
}

func New(db *gorm.DB, prover common.Address) (*DB, error) {
if err := db.AutoMigrate(&task{}, &scannedBlockNumber{}); err != nil {
func New(localDBDir string, prover common.Address) (*DB, error) {
db, err := gorm.Open(sqlite.Open(localDBDir), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, errors.Wrap(err, "failed to connect sqlite")
}
if err := db.AutoMigrate(&task{}, &scannedBlockNumber{}, &project{}, &projectFile{}); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
return &DB{db: db, prover: prover}, nil
Expand Down
52 changes: 34 additions & 18 deletions cmd/prover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"os/signal"
"syscall"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/cmd/prover/config"
"github.com/iotexproject/w3bstream/p2p"
"github.com/iotexproject/w3bstream/cmd/prover/db"
"github.com/iotexproject/w3bstream/datasource"
"github.com/iotexproject/w3bstream/monitor"
"github.com/iotexproject/w3bstream/project"
"github.com/iotexproject/w3bstream/scheduler"
"github.com/iotexproject/w3bstream/task/processor"
"github.com/iotexproject/w3bstream/vm"
)
Expand All @@ -27,15 +29,15 @@ func main() {
cfg.Print()
slog.Info("prover config loaded")

if err := migrateDatabase(cfg.DatabaseDSN); err != nil {
if err := migrateDatabase(cfg.DatasourceDSN); err != nil {
log.Fatal(errors.Wrap(err, "failed to migrate database"))
}

sk, err := crypto.HexToECDSA(cfg.ProverOperatorPriKey)
prv, err := crypto.HexToECDSA(cfg.ProverOperatorPriKey)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to parse prover private key"))
}
proverOperatorAddress := crypto.PubkeyToAddress(sk.PublicKey)
proverOperatorAddress := crypto.PubkeyToAddress(prv.PublicKey)
slog.Info("my prover", "address", proverOperatorAddress.String())

vmEndpoints := map[uint64]string{}
Expand All @@ -48,27 +50,41 @@ func main() {
log.Fatal(errors.Wrap(err, "failed to new vm handler"))
}

projectManager := project.NewManager(kvDB, contractPersistence.LatestProject)
db, err := db.New(cfg.LocalDBDir, proverOperatorAddress)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to new project manager"))
log.Fatal(errors.Wrap(err, "failed to new db"))
}

taskProcessor := processor.NewProcessor(vmHandler, projectManager.Project, sk, defaultDatasourcePubKey, proverID)
if err := monitor.Run(
&monitor.Handler{
ScannedBlockNumber: db.ScannedBlockNumber,
UpsertScannedBlockNumber: db.UpsertScannedBlockNumber,
AssignTask: db.CreateTask,
UpsertProject: db.UpsertProject,
DeleteTask: db.DeleteTask,
},
&monitor.ContractAddr{
Project: common.HexToAddress(cfg.ProjectContractAddr),
TaskManager: common.HexToAddress(cfg.TaskManagerContractAddr),
},
cfg.BeginningBlockNumber,
cfg.ChainEndpoint,
); err != nil {
log.Fatal(errors.Wrap(err, "failed to run contract monitor"))
}

pubSubs, err := p2p.NewPubSub(taskProcessor.HandleP2PData, cfg.BootNodeMultiAddr, cfg.IoTeXChainID)
projectManager := project.NewManager(db.Project, db.ProjectFile, db.UpsertProjectFile)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to new pubsubs"))
log.Fatal(errors.Wrap(err, "failed to new project manager"))
}

if local {
scheduler.RunLocal(pubSubs, taskProcessor.HandleProjectProvers, projectManager)
} else {
projectOffsets := scheduler.NewProjectEpochOffsets(cfg.SchedulerEpoch, contractPersistence.LatestProjects, schedulerNotification)
datasource, err := datasource.NewPostgres(cfg.DatasourceDSN)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to connect datasource"))
}

if err := scheduler.Run(cfg.SchedulerEpoch, proverID, pubSubs, taskProcessor.HandleProjectProvers,
chainHeadNotification, contractPersistence.Project, contractPersistence.Provers, projectOffsets, projectManager); err != nil {
log.Fatal(errors.Wrap(err, "failed to run scheduler"))
}
if err := processor.Run(vmHandler.Handle, projectManager.Project, db, datasource.Retrieve, prv, cfg.ChainEndpoint, common.HexToAddress(cfg.RouterContractAddr)); err != nil {
log.Fatal(errors.Wrap(err, "failed to run task processor"))
}

done := make(chan os.Signal, 1)
Expand Down
10 changes: 9 additions & 1 deletion cmd/sequencer/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package db
import (
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
)

type scannedBlockNumber struct {
Expand Down Expand Up @@ -138,7 +140,13 @@ func (p *DB) UnassignedTask() (common.Hash, uint64, error) {
return t.TaskID, t.ProjectID, nil
}

func New(db *gorm.DB) (*DB, error) {
func New(localDBDir string) (*DB, error) {
db, err := gorm.Open(sqlite.Open(localDBDir), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, errors.Wrap(err, "failed to connect sqlite")
}
if err := db.AutoMigrate(&task{}, &scannedBlockNumber{}, &currentNBits{}, &blockHead{}); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
Expand Down
11 changes: 1 addition & 10 deletions cmd/sequencer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"

"github.com/iotexproject/w3bstream/cmd/sequencer/api"
"github.com/iotexproject/w3bstream/cmd/sequencer/config"
Expand All @@ -28,13 +25,7 @@ func main() {
cfg.Print()
slog.Info("sequencer config loaded")

sqliteDB, err := gorm.Open(sqlite.Open(cfg.LocalDBDir), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
log.Fatal(errors.Wrap(err, "failed to connect sqlite"))
}
db, err := db.New(sqliteDB)
db, err := db.New(cfg.LocalDBDir)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to new db"))
}
Expand Down
Loading

0 comments on commit a6a8653

Please sign in to comment.