Skip to content

Commit

Permalink
Merge pull request #660 from iotexproject/task-processor
Browse files Browse the repository at this point in the history
impl task processor
  • Loading branch information
huangzhiran authored Sep 28, 2024
2 parents c935c2b + 2b61e5c commit 0f5f0fa
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 644 deletions.
39 changes: 33 additions & 6 deletions cmd/prover/db/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package db

import (
"bytes"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"gorm.io/gorm"
Expand Down Expand Up @@ -28,12 +30,14 @@ type projectFile struct {

type task struct {
gorm.Model
TaskID common.Hash `gorm:"index:task_fetch,not null"`
ProjectID uint64 `gorm:"index:task_fetch,not null"`
TaskID common.Hash `gorm:"uniqueIndex:task_uniq,not null"`
ProjectID uint64 `gorm:"uniqueIndex:task_uniq,not null"`
Processed bool `gorm:"index:unprocessed_task,not null,default:false"`
}

type DB struct {
db *gorm.DB
db *gorm.DB
prover common.Address
}

func (p *DB) ScannedBlockNumber() (uint64, error) {
Expand Down Expand Up @@ -106,23 +110,46 @@ func (p *DB) UpsertProjectFile(projectID uint64, file []byte, hash common.Hash)
return errors.Wrap(err, "failed to upsert project file")
}

func (p *DB) CreateTask(projectID uint64, taskID common.Hash) error {
func (p *DB) CreateTask(projectID uint64, taskID common.Hash, prover common.Address) error {
if !bytes.Equal(prover[:], p.prover[:]) {
return nil
}
t := &task{
TaskID: taskID,
ProjectID: projectID,
Processed: false,
}
err := p.db.Create(t).Error
return errors.Wrap(err, "failed to create task")
}

func (p *DB) ProcessTask(projectID uint64, taskID common.Hash) error {
t := &task{
Processed: true,
}
err := p.db.Model(t).Where("task_id = ?", taskID).Where("project_id = ?", projectID).Updates(t).Error
return errors.Wrap(err, "failed to update task")
}

func (p *DB) DeleteTask(projectID uint64, taskID common.Hash) error {
err := p.db.Where("task_id = ?", taskID).Where("project_id = ?", projectID).Delete(&task{}).Error
return errors.Wrap(err, "failed to delete task")
}

func New(db *gorm.DB) (*DB, error) {
func (p *DB) UnprocessedTask() (uint64, common.Hash, error) {
t := task{}
if err := p.db.Order("created_at ASC").Where("processed = false").First(&t).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return 0, common.Hash{}, nil
}
return 0, common.Hash{}, errors.Wrap(err, "failed to query unprocessed task")
}
return t.ProjectID, t.TaskID, nil
}

func New(db *gorm.DB, prover common.Address) (*DB, error) {
if err := db.AutoMigrate(&task{}, &scannedBlockNumber{}); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
return &DB{db}, nil
return &DB{db: db, prover: prover}, nil
}
18 changes: 9 additions & 9 deletions cmd/sequencer/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type blockHead struct {

type task struct {
gorm.Model
TaskID common.Hash `gorm:"index:task_fetch,not null"`
ProjectID uint64 `gorm:"index:task_fetch,not null"`
TaskID common.Hash `gorm:"uniqueIndex:task_uniq,not null"`
ProjectID uint64 `gorm:"uniqueIndex:task_uniq,not null"`
Assigned bool `gorm:"index:unassigned_task,not null,default:false"`
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (p *DB) CreateTask(projectID uint64, taskID common.Hash) error {
return errors.Wrap(err, "failed to create task")
}

func (p *DB) AssignTask(projectID uint64, taskID common.Hash) error {
func (p *DB) AssignTask(projectID uint64, taskID common.Hash, prover common.Address) error {
t := &task{
Assigned: true,
}
Expand All @@ -128,14 +128,14 @@ func (p *DB) DeleteTask(projectID uint64, taskID common.Hash) error {
}

func (p *DB) UnassignedTask() (common.Hash, uint64, error) {
ts := []*task{}
if err := p.db.Order("created_at ASC").Where("assigned = false").Find(&ts).Limit(1).Error; err != nil {
t := task{}
if err := p.db.Order("created_at ASC").Where("assigned = false").First(&t).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return common.Hash{}, 0, nil
}
return common.Hash{}, 0, errors.Wrap(err, "failed to query unassigned task")
}
if len(ts) == 0 {
return common.Hash{}, 0, nil
}
return ts[0].TaskID, ts[0].ProjectID, nil
return t.TaskID, t.ProjectID, nil
}

func New(db *gorm.DB) (*DB, error) {
Expand Down
7 changes: 5 additions & 2 deletions datasource/datasource.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package datasource

import tasktype "github.com/iotexproject/w3bstream/task"
import (
"github.com/ethereum/go-ethereum/common"
tasktype "github.com/iotexproject/w3bstream/task"
)

type Datasource interface {
Retrieve(projectID, nextTaskID uint64) (*tasktype.Task, error)
Retrieve(projectID uint64, taskID common.Hash) (*tasktype.Task, error)
}
144 changes: 0 additions & 144 deletions datasource/postgres_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
UpsertScannedBlockNumber func(uint64) error
UpsertNBits func(uint32) error
UpsertBlockHead func(uint64, common.Hash) error
AssignTask func(uint64, common.Hash) error
AssignTask func(uint64, common.Hash, common.Address) error
DeleteTask func(uint64, common.Hash) error
UpsertProject func(uint64, string, common.Hash) error
)
Expand Down Expand Up @@ -143,7 +143,7 @@ func (c *contract) processLogs(logs []types.Log) error {
if err != nil {
return errors.Wrap(err, "failed to parse task assigned event")
}
if err := c.h.AssignTask(e.ProjectId.Uint64(), e.TaskId); err != nil {
if err := c.h.AssignTask(e.ProjectId.Uint64(), e.TaskId, e.Prover); err != nil {
return err
}
case taskSettledTopic:
Expand Down
Loading

0 comments on commit 0f5f0fa

Please sign in to comment.