Skip to content

Commit

Permalink
Merge pull request #6542 from filecoin-project/raulk/worker-disable-r…
Browse files Browse the repository at this point in the history
…esource-filtering

extern/storage: add ability to ignore worker resources when scheduling.
  • Loading branch information
magik6k authored Jun 21, 2021
2 parents fa2b247 + c0a8a9f commit 6e1cf6f
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 54 deletions.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
1 change: 1 addition & 0 deletions documentation/en/api-v0-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,7 @@ Response:
"ef8d99a2-6865-4189-8ffa-9fef0f806eee": {
"Info": {
"Hostname": "host",
"IgnoreResources": false,
"Resources": {
"MemPhysical": 274877906944,
"MemSwap": 128849018880,
Expand Down
1 change: 1 addition & 0 deletions documentation/en/api-v0-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Response:
```json
{
"Hostname": "string value",
"IgnoreResources": true,
"Resources": {
"MemPhysical": 42,
"MemSwap": 42,
Expand Down
29 changes: 25 additions & 4 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ type result struct {
err error
}

// ResourceFilteringStrategy names a resource filtering strategy that can be
// configured for a worker.
type ResourceFilteringStrategy string

const (
	// ResourceFilteringHardware is the strategy under which the worker's
	// available hardware resources are evaluated when a task is scheduled
	// against it.
	ResourceFilteringHardware ResourceFilteringStrategy = "hardware"

	// ResourceFilteringDisabled is the strategy under which no resource
	// filtering is applied to the worker; the scheduler may assign any task
	// to it.
	ResourceFilteringDisabled ResourceFilteringStrategy = "disabled"
)

type SealerConfig struct {
ParallelFetchLimit int

Expand All @@ -96,6 +110,11 @@ type SealerConfig struct {
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool

// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering ResourceFilteringStrategy
}

type StorageAuth http.Header
Expand All @@ -104,7 +123,6 @@ type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore

func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {

prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
Expand Down Expand Up @@ -151,9 +169,12 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
localTasks = append(localTasks, sealtasks.TTUnseal)
}

err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
TaskTypes: localTasks,
}, stor, lstor, si, m, wss))
wcfg := WorkerConfig{
IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
TaskTypes: localTasks,
}
worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
err = m.AddWorker(ctx, worker)
if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err)
}
Expand Down
24 changes: 12 additions & 12 deletions extern/sector-storage/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,24 +349,24 @@ func (sh *scheduler) trySched() {
defer sh.workersLk.RUnlock()

windowsLen := len(sh.openWindows)
queuneLen := sh.schedQueue.Len()
queueLen := sh.schedQueue.Len()

log.Debugf("SCHED %d queued; %d open windows", queuneLen, windowsLen)
log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)

if windowsLen == 0 || queuneLen == 0 {
if windowsLen == 0 || queueLen == 0 {
// nothing to schedule on
return
}

windows := make([]schedWindow, windowsLen)
acceptableWindows := make([][]int, queuneLen)
acceptableWindows := make([][]int, queueLen)

// Step 1
throttle := make(chan struct{}, windowsLen)

var wg sync.WaitGroup
wg.Add(queuneLen)
for i := 0; i < queuneLen; i++ {
wg.Add(queueLen)
for i := 0; i < queueLen; i++ {
throttle <- struct{}{}

go func(sqi int) {
Expand All @@ -393,7 +393,7 @@ func (sh *scheduler) trySched() {
}

// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
continue
}

Expand Down Expand Up @@ -451,27 +451,27 @@ func (sh *scheduler) trySched() {

// Step 2
scheduled := 0
rmQueue := make([]int, 0, queuneLen)
rmQueue := make([]int, 0, queueLen)

for sqi := 0; sqi < queuneLen; sqi++ {
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]

selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
info := sh.workers[wid].info

log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)

// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) {
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue
}

log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.ID.Number, task.taskType, wnd)

windows[wnd].allocated.add(wr, needRes)
windows[wnd].allocated.add(info.Resources, needRes)
// TODO: We probably want to re-sort acceptableWindows here based on new
// workerHandle.utilization + windows[wnd].allocated.utilization (workerHandle.utilization is used in all
// task selectors, but not in the same way, so need to figure out how to do that in a non-O(n^2 way), and
Expand Down
15 changes: 11 additions & 4 deletions extern/sector-storage/sched_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
a.cond.Wait()
}

a.add(wr, r)
a.add(wr.Resources, r)

err := cb()

a.free(wr, r)
a.free(wr.Resources, r)
if a.cond != nil {
a.cond.Broadcast()
}
Expand All @@ -44,8 +44,15 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
a.memUsedMax -= r.MaxMemory
}

func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool {
// canHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool {
if info.IgnoreResources {
// short-circuit: a worker that ignores resources can always handle the request.
return true
}

res := info.Resources
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
Expand Down
71 changes: 49 additions & 22 deletions extern/sector-storage/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,30 @@ func TestWithPriority(t *testing.T) {
require.Equal(t, 2222, getPriority(ctx))
}

// decentWorkerResources is the resource profile handed to test workers that
// should have ample capacity: MemPhysical and MemSwap are far larger than
// MemReserved, and it lists 32 CPUs and a single GPU entry.
// NOTE(review): the memory values are presumably byte counts (128 << 30 would
// then be 128 GiB) — confirm against storiface.WorkerResources.
var decentWorkerResources = storiface.WorkerResources{
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemReserved: 2 << 30,
CPUs: 32,
GPUs: []string{"a GPU"},
}

// constrainedWorkerResources is a deliberately undersized resource profile for
// test workers: MemReserved (2 << 30) is larger than MemPhysical (1 << 30),
// there is a single CPU, and MemSwap and GPUs are left at their zero values.
// The "constrained-resources" test case gives it to both of its workers, only
// one of which ignores resources.
var constrainedWorkerResources = storiface.WorkerResources{
MemPhysical: 1 << 30,
MemReserved: 2 << 30,
CPUs: 1,
}

// schedTestWorker is the worker test double used by the scheduler tests; the
// file asserts that *schedTestWorker satisfies the Worker interface.
type schedTestWorker struct {
// name is reported as the Hostname in Info.
name string
// taskTypes is the set of task types configured for this worker.
// NOTE(review): presumably what the TaskTypes method reports — that method
// is not visible here.
taskTypes map[sealtasks.TaskType]struct{}
// paths is returned as-is by Paths.
paths []stores.StoragePath

// NOTE(review): closed is presumably set by Close — its body is not
// visible here.
closed bool
// session is assigned a fresh uuid.New() value by addTestWorker.
session uuid.UUID

// resources is reported as Resources in Info.
resources storiface.WorkerResources
// ignoreResources is reported as IgnoreResources in Info.
ignoreResources bool
}

func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
Expand Down Expand Up @@ -107,18 +124,11 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro
return s.paths, nil
}

var decentWorkerResources = storiface.WorkerResources{
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemReserved: 2 << 30,
CPUs: 32,
GPUs: []string{"a GPU"},
}

func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{
Hostname: s.name,
Resources: decentWorkerResources,
Hostname: s.name,
IgnoreResources: s.ignoreResources,
Resources: s.resources,
}, nil
}

Expand All @@ -137,13 +147,16 @@ func (s *schedTestWorker) Close() error {

var _ Worker = &schedTestWorker{}

func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) {
func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
w := &schedTestWorker{
name: name,
taskTypes: taskTypes,
paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "<octopus>food</octopus>", CanSeal: true, CanStore: true}},

session: uuid.New(),

resources: resources,
ignoreResources: ignoreResources,
}

for _, path := range w.paths {
Expand All @@ -169,7 +182,7 @@ func TestSchedStartStop(t *testing.T) {
sched := newScheduler()
go sched.runSched()

addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)

require.NoError(t, sched.Close(context.TODO()))
}
Expand All @@ -183,6 +196,9 @@ func TestSched(t *testing.T) {
type workerSpec struct {
name string
taskTypes map[sealtasks.TaskType]struct{}

resources storiface.WorkerResources
ignoreResources bool
}

noopAction := func(ctx context.Context, w Worker) error {
Expand Down Expand Up @@ -295,7 +311,7 @@ func TestSched(t *testing.T) {
go sched.runSched()

for _, worker := range workers {
addTestWorker(t, sched, index, worker.name, worker.taskTypes)
addTestWorker(t, sched, index, worker.name, worker.taskTypes, worker.resources, worker.ignoreResources)
}

rm := runMeta{
Expand All @@ -322,31 +338,42 @@ func TestSched(t *testing.T) {
}
}

// Checks behaviour with workers that have constrained resources: the first
// worker does not ignore resource constraints, so the task is assigned to the
// second worker, which does.
t.Run("constrained-resources", testFunc([]workerSpec{
{name: "fred1", resources: constrainedWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", resources: constrainedWorkerResources, ignoreResources: true, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred2", 8, sealtasks.TTPreCommit1),
taskStarted("pc1-1"),
taskDone("pc1-1"),
}))

t.Run("one-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-2workers-1", testFunc([]workerSpec{
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-2workers-2", testFunc([]workerSpec{
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))

t.Run("pc1-block-pc2", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("pc1"),
Expand All @@ -359,7 +386,7 @@ func TestSched(t *testing.T) {
}))

t.Run("pc2-block-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskStarted("pc2"),
Expand All @@ -372,7 +399,7 @@ func TestSched(t *testing.T) {
}))

t.Run("pc1-batching", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t1"),
Expand Down Expand Up @@ -459,7 +486,7 @@ func TestSched(t *testing.T) {
// run this one a bunch of times, it had a very annoying tendency to fail randomly
for i := 0; i < 40; i++ {
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill queues
twoPC1("w0", 0, taskStarted),
Expand Down
6 changes: 3 additions & 3 deletions extern/sector-storage/sched_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (sw *schedWorker) workerCompactWindows() {

for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info.Resources) {
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
continue
}

Expand Down Expand Up @@ -352,7 +352,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info.Resources) {
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
}

// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info.Resources, needRes, &sh.workersLk, func() error {
err = w.active.withResources(sw.wid, w.info, needRes, &sh.workersLk, func() error {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
Expand Down
Loading

0 comments on commit 6e1cf6f

Please sign in to comment.