Skip to content
This repository has been archived by the owner on Mar 21, 2024. It is now read-only.

Commit

Permalink
add trace scheduler (#2103)
Browse files Browse the repository at this point in the history
* add trace scheduler

* release the semaphore

* fix init sync map

* release instances

* fix decre height when detach

* support unconfirmed utxos

* update go mod
  • Loading branch information
shenao78 authored Sep 9, 2021
1 parent bb29ea1 commit 0fd3e95
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 110 deletions.
2 changes: 2 additions & 0 deletions contract/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ func NewInfrastructure(chain ChainService, repository Repository) *Infrastructur
type ChainService interface {
BestChain() (uint64, bc.Hash)
GetBlock(hash bc.Hash) (*types.Block, error)
GetBlockByHeight(height uint64) (*types.Block, error)
}

type Repository interface {
GetInstance(id string) (*Instance, error)
LoadInstances() ([]*Instance, error)
SaveInstances(instances []*Instance) error
RemoveInstance(id string) error
Expand Down
34 changes: 26 additions & 8 deletions contract/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,38 @@ import (
"github.com/bytom/bytom/protocol/bc/types"
)

type Status uint8

const (
Lagging Status = iota + 1
InSync
Finalized
OffChain
)

type TreeNode struct {
TxHash bc.Hash
UTXOs []*UTXO
Children []*TreeNode
}

type Instance struct {
TraceID string
UTXOs []*UTXO
Finalized bool
InSync bool
TraceID string
UTXOs []*UTXO
Unconfirmed []*TreeNode
Status Status
ScannedHash bc.Hash
ScannedHeight uint64
}

func NewInstance(traceID string, inUTXOs, outUTXOs []*UTXO) *Instance {
inst := &Instance{
TraceID: traceID,
UTXOs: outUTXOs,
Finalized: len(outUTXOs) == 0,
TraceID: traceID,
UTXOs: outUTXOs,
}
if inst.Finalized {
inst.Status = Lagging
if len(outUTXOs) == 0 {
inst.Status = Finalized
inst.UTXOs = inUTXOs
}
return inst
Expand Down
159 changes: 156 additions & 3 deletions contract/trace_scheduler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,160 @@
package contract

type TraceScheduler struct {
infra *Infrastructure
import (
"errors"
"math"
"sync"
"time"

"github.com/bytom/bytom/protocol/bc"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

var errInstQueueOverflow = errors.New("instance queue is overflow")

type traceScheduler struct {
weighted *semaphore.Weighted
instances *sync.Map
tracerService *TracerService
infra *Infrastructure
tracer *tracer
}

func newTraceScheduler(infra *Infrastructure) *traceScheduler {
scheduler := &traceScheduler{
weighted: semaphore.NewWeighted(1000),
instances: new(sync.Map),
infra: infra,
}
go scheduler.processLoop()
return scheduler
}

func (t *traceScheduler) addNewJob(instance *Instance) error {
if !t.weighted.TryAcquire(1) {
return errInstQueueOverflow
}

t.instances.Store(instance.TraceID, instance)
return nil
}

func (t *traceScheduler) processLoop() {
ticker := time.NewTicker(6 * time.Second)
defer ticker.Stop()

for range ticker.C {
jobs, beginHeight := t.prepareJobs()
if len(jobs) == 0 {
continue
}

t.tracer = newTracer(nil)

var prevHash *bc.Hash
catchedJobs := make(map[bc.Hash][]*Instance)
for height := beginHeight + 1; ; height++ {
if ok, err := t.tryAttach(height, prevHash, jobs, catchedJobs); err != nil {
log.WithField("err", err).Error("try attach on trace scheduler")
break
} else if !ok {
if err := t.detach(prevHash, catchedJobs); err != nil {
log.WithField("err", err).Error("detach on trace scheduler")
break
}
height -= 2
}
if bestHeight, _ := t.infra.Chain.BestChain(); height == bestHeight {
if err := t.finishJobs(jobs, catchedJobs, *prevHash); err != nil {
log.WithField("err", err).Error("finish jobs")
break
}
}
}
}
}

func (t *traceScheduler) prepareJobs() (map[bc.Hash][]*Instance, uint64) {
var beginHeight uint64 = math.MaxUint64
hashToJobs := make(map[bc.Hash][]*Instance)
t.instances.Range(func(_, value interface{}) bool {
inst := value.(*Instance)
hashToJobs[inst.ScannedHash] = append(hashToJobs[inst.ScannedHash], inst)
if inst.ScannedHeight < beginHeight {
beginHeight = inst.ScannedHeight
}
return true
})
return hashToJobs, beginHeight
}

func (t *TraceScheduler) AddNewJob(instance *Instance) {}
func (t *traceScheduler) tryAttach(height uint64, prevHash *bc.Hash, jobs, catchedJobs map[bc.Hash][]*Instance) (bool, error) {
block, err := t.infra.Chain.GetBlockByHeight(height)
if err != nil {
return false, err
}

if prevHash != nil && block.PreviousBlockHash != *prevHash {
return false, nil
}

if instances, ok := jobs[block.PreviousBlockHash]; ok {
t.tracer.addInstances(instances)
catchedJobs[block.PreviousBlockHash] = instances
}

t.tracer.applyBlock(block)
*prevHash = block.Hash()
return true, nil
}

func (t *traceScheduler) detach(prevHash *bc.Hash, catchedJobs map[bc.Hash][]*Instance) error {
prevBlock, err := t.infra.Chain.GetBlock(*prevHash)
if err != nil {
return err
}

if instances, ok := catchedJobs[prevBlock.Hash()]; ok {
for _, inst := range instances {
t.tracer.removeInstance(inst.TraceID)
}
delete(catchedJobs, prevBlock.Hash())
}

t.tracer.detachBlock(prevBlock)
*prevHash = prevBlock.PreviousBlockHash
return nil
}

func (t *traceScheduler) finishJobs(jobs, catchedJobs map[bc.Hash][]*Instance, scannedHash bc.Hash) error {
var inSyncInstances, offChainInstances []*Instance
for hash, instances := range jobs {
if _, ok := catchedJobs[hash]; !ok {
offChainInstances = append(offChainInstances, instances...)
for _, inst := range instances {
inst.Status = OffChain
}
} else {
inSyncInstances = append(inSyncInstances, instances...)
}
}

if err := t.infra.Repository.SaveInstances(offChainInstances); err != nil {
return err
}

t.releaseInstances(offChainInstances)

if ok := t.tracerService.takeOverInstances(inSyncInstances, scannedHash); ok {
t.releaseInstances(inSyncInstances)
}
return nil
}

func (t *traceScheduler) releaseInstances(instances []*Instance) {
t.weighted.Release(int64(len(instances)))
for _, inst := range instances {
t.instances.Delete(inst.TraceID)
}
}
13 changes: 0 additions & 13 deletions contract/trace_service.go

This file was deleted.

Loading

0 comments on commit 0fd3e95

Please sign in to comment.