Skip to content

Commit

Permalink
Merge pull request #89 from filecoin-project/fix/locking
Browse files Browse the repository at this point in the history
Fix some locking issues
  • Loading branch information
magik6k authored Aug 3, 2020
2 parents 898a72d + 3cab915 commit ad9a691
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
4 changes: 2 additions & 2 deletions mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1O

// Test Instrumentation Methods

func (mgr *SectorMgr) FailSector(sid abi.SectorID) error {
func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
ss, ok := mgr.sectors[sid]
if !ok {
return fmt.Errorf("no such sector in storage")
}

ss.failed = true
ss.failed = failed
return nil
}

Expand Down
26 changes: 20 additions & 6 deletions sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type WorkerSelector interface {
type scheduler struct {
spt abi.RegisteredSealProof

workersLk sync.Mutex
workersLk sync.RWMutex
nextWorker WorkerID
workers map[WorkerID]*workerHandle

Expand Down Expand Up @@ -83,6 +83,8 @@ type workerHandle struct {
preparing *activeResources
active *activeResources

lk sync.Mutex

// stats / tracking
wt *workTracker

Expand Down Expand Up @@ -283,6 +285,9 @@ func (sh *scheduler) trySched() {

log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))

sh.workersLk.RLock()
defer sh.workersLk.RUnlock()

// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
Expand Down Expand Up @@ -428,9 +433,9 @@ func (sh *scheduler) runWorker(wid WorkerID) {
defer ready.Wait()

go func() {
sh.workersLk.Lock()
sh.workersLk.RLock()
worker, found := sh.workers[wid]
sh.workersLk.Unlock()
sh.workersLk.RUnlock()

ready.Done()

Expand Down Expand Up @@ -498,16 +503,19 @@ func (sh *scheduler) runWorker(wid WorkerID) {
todo := activeWindows[0].todo[0]
needRes := ResourceTable[todo.taskType][sh.spt]

sh.workersLk.Lock()
sh.workersLk.RLock()
worker.lk.Lock()
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
worker.lk.Unlock()

if !ok {
sh.workersLk.Unlock()
sh.workersLk.RUnlock()
break assignLoop
}

log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.Unlock()
sh.workersLk.RUnlock()

if err != nil {
log.Error("assignWorker error: %+v", err)
Expand All @@ -530,14 +538,18 @@ func (sh *scheduler) runWorker(wid WorkerID) {
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
needRes := ResourceTable[req.taskType][sh.spt]

w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)
w.lk.Unlock()

go func() {
err := req.prepare(req.ctx, w.wt.worker(w.w))
sh.workersLk.Lock()

if err != nil {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock()

select {
Expand All @@ -557,7 +569,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
}

err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock()
defer sh.workersLk.Lock() // we MUST return locked from this function

Expand Down

0 comments on commit ad9a691

Please sign in to comment.