Skip to content
This repository has been archived by the owner on Aug 18, 2020. It is now read-only.

Commit

Permalink
more sched test debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jul 16, 2020
1 parent 2e55757 commit cab0c74
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 58 deletions.
11 changes: 11 additions & 0 deletions sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type scheduler struct {
openWindows []*schedWindowRequest

closing chan struct{}
testSync chan struct{} // used for testing
}

type workerHandle struct {
Expand Down Expand Up @@ -195,6 +196,9 @@ func (sh *scheduler) runSched() {
heap.Push(sh.schedQueue, req)
sh.trySched()

if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
Expand Down Expand Up @@ -226,6 +230,8 @@ func (sh *scheduler) trySched() {
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())

log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows))

// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
Expand Down Expand Up @@ -295,11 +301,15 @@ func (sh *scheduler) trySched() {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources

log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)

// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
continue
}

log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)

windows[wnd].allocated.add(wr, needRes)

selectedWindow = wnd
Expand Down Expand Up @@ -419,6 +429,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
break assignLoop
}

log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.Unlock()

Expand Down
152 changes: 94 additions & 58 deletions sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package sectorstorage

import (
"context"
"fmt"
"io"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -171,13 +173,10 @@ func TestSchedStartStop(t *testing.T) {
}

func TestSched(t *testing.T) {
ctx := context.Background()
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
ctx, done := context.WithTimeout(context.Background(), 20 * time.Second)
defer done()

sectorAte := abi.SectorID{
Miner: 8,
Number: 8,
}
spt := abi.RegisteredSealProof_StackedDrg32GiBV1

type workerSpec struct {
name string
Expand All @@ -196,7 +195,10 @@ func TestSched(t *testing.T) {

type task func(*testing.T, *scheduler, *stores.Index, *runMeta)

sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task {
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)

return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
done := make(chan struct{})
rm.done[taskName] = done
Expand All @@ -207,7 +209,12 @@ func TestSched(t *testing.T) {
go func() {
defer rm.wg.Done()

err := sched.Schedule(ctx, sectorAte, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error {
sectorNum := abi.SectorID{
Miner: 8,
Number: sid,
}

err := sched.Schedule(ctx, sectorNum, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)

Expand All @@ -226,29 +233,45 @@ func TestSched(t *testing.T) {

return nil
})
require.NoError(t, err)
require.NoError(t, err, fmt.Sprint(l, l2))
}()

<-sched.testSync
}
}

taskStarted := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
rm.done[name] <- struct{}{}
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
}
}

taskDone := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
rm.done[name] <- struct{}{}
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
close(rm.done[name])
}
}

taskNotScheduled := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
t.Fatal("not expected")
t.Fatal("not expected", l, l2)
case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy
}
}
Expand All @@ -259,6 +282,8 @@ func TestSched(t *testing.T) {
index := stores.NewIndex()

sched := newScheduler(spt)
sched.testSync = make(chan struct{})

go sched.runSched()

for _, worker := range workers {
Expand Down Expand Up @@ -291,33 +316,33 @@ func TestSched(t *testing.T) {
t.Run("one-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred", sealtasks.TTPreCommit1),
sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-2workers-1", testFunc([]workerSpec{
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred1", sealtasks.TTPreCommit1),
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-2workers-2", testFunc([]workerSpec{
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1-1", "fred1", sealtasks.TTPreCommit1),
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-block-pc2", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1", "fred", sealtasks.TTPreCommit1),
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("pc1"),

sched("pc2", "fred", sealtasks.TTPreCommit2),
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskNotScheduled("pc2"),

taskDone("pc1"),
Expand All @@ -327,10 +352,10 @@ func TestSched(t *testing.T) {
t.Run("pc2-block-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc2", "fred", sealtasks.TTPreCommit2),
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskStarted("pc2"),

sched("pc1", "fred", sealtasks.TTPreCommit1),
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("pc1"),

taskDone("pc2"),
Expand All @@ -340,20 +365,20 @@ func TestSched(t *testing.T) {
t.Run("pc1-batching", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("t1", "fred", sealtasks.TTPreCommit1),
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t1"),

sched("t2", "fred", sealtasks.TTPreCommit1),
sched("t2", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t2"),

// with worker settings, we can only run 2 parallel PC1s

// start 2 more to fill fetch buffer

sched("t3", "fred", sealtasks.TTPreCommit1),
sched("t3", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t3"),

sched("t4", "fred", sealtasks.TTPreCommit1),
sched("t4", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t4"),

taskDone("t1"),
Expand All @@ -366,60 +391,71 @@ func TestSched(t *testing.T) {
taskDone("t4"),
}))

twoPC1 := func(prefix string, schedAssert func(name string) task) task {
twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task {
return multTask(
sched(prefix+"-a", "fred", sealtasks.TTPreCommit1),
sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1),
schedAssert(prefix+"-a"),

sched(prefix+"-b", "fred", sealtasks.TTPreCommit1),
sched(prefix+"-b", "fred", sid + 1, sealtasks.TTPreCommit1),
schedAssert(prefix+"-b"),
)
}

twoPC1Done := func(prefix string) task {
twoPC1Act := func(prefix string, schedAssert func(name string) task) task {
return multTask(
taskDone(prefix+"-1"),
taskDone(prefix+"-b"),
schedAssert(prefix+"-a"),
schedAssert(prefix+"-b"),
)
}

t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill exec/fetch buffers
twoPC1("w0", taskStarted),
twoPC1("w1", taskNotScheduled),
for i := 0; i < 100; i++ {
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill exec/fetch buffers
twoPC1("w0", 0, taskStarted),
twoPC1("w1", 2, taskNotScheduled),

// fill worker windows
twoPC1("w2", taskNotScheduled),
twoPC1("w3", taskNotScheduled),
// fill worker windows
twoPC1("w2", 4, taskNotScheduled),
//twoPC1("w3", taskNotScheduled),

// windowed
// windowed

sched("t1", "fred", sealtasks.TTPreCommit1),
taskNotScheduled("t1"),
sched("t1", "fred", 6, sealtasks.TTPreCommit1),
taskNotScheduled("t1"),

sched("t2", "fred", sealtasks.TTPreCommit1),
taskNotScheduled("t2"),
sched("t2", "fred", 7, sealtasks.TTPreCommit1),
taskNotScheduled("t2"),

sched("t3", "fred", sealtasks.TTPreCommit2),
taskNotScheduled("t3"),
sched("t3", "fred", 8, sealtasks.TTPreCommit2),
taskNotScheduled("t3"),

twoPC1Done("w0"),
twoPC1Done("w1"),
twoPC1Done("w2"),
twoPC1Done("w3"),
twoPC1Act("w0", taskDone),
twoPC1Act("w1", taskStarted),
twoPC1Act("w2", taskNotScheduled),
//twoPC1Act("w3", taskNotScheduled),

taskStarted("t1"),
taskNotScheduled("t2"),
taskNotScheduled("t3"),
twoPC1Act("w1", taskDone),
twoPC1Act("w2", taskStarted),
//twoPC1Act("w3", taskNotScheduled),

taskDone("t1"),
twoPC1Act("w2", taskDone),
//twoPC1Act("w3", taskStarted),

taskStarted("t2"),
taskStarted("t3"),
//twoPC1Act("w3", taskDone),

taskDone("t2"),
taskDone("t3"),
}))
taskStarted("t3"),
taskNotScheduled("t1"),
taskNotScheduled("t2"),

taskDone("t3"),

taskStarted("t1"),
taskStarted("t2"),

taskDone("t1"),
taskDone("t2"),
}))
}
}

0 comments on commit cab0c74

Please sign in to comment.