remove overflow mechanism by segregating channels. (#22)
We now have three "queues":

1. internal: tasks emitted by the event loop, buffer=1.
2. completion: tasks emitted by asynchronous jobs.
3. external: intake of external tasks through the public API.

We now drain the internal queue first, then move on to completion and external.
raulk committed Jul 2, 2021
1 parent 48bf452 commit 258d74d
Showing 1 changed file with 80 additions and 73 deletions.
dagstore.go: 153 changes (80 additions & 73 deletions)
@@ -47,10 +47,12 @@ type DAGStore struct {
indices index.FullIndexRepo
// scratch *ScratchSpace

// event loop channel.
taskCh chan *Task
// overflow is an unbounded list where tasks are queued when the taskCh is backlogged.
overflow []*Task
// externalCh receives external tasks.
externalCh chan *Task
// internalCh receives internal tasks to the event loop.
internalCh chan *Task
// completionCh receives tasks queued up as a result of async completions.
completionCh chan *Task

ctx context.Context
cancelFn context.CancelFunc
@@ -97,7 +99,7 @@ const (
type Shard struct {
sync.RWMutex

// immutable.
// immutable, can be read outside the lock.
key shard.Key
mount *mount.Upgrader

@@ -228,13 +230,15 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {

ctx, cancel := context.WithCancel(context.Background())
dagst := &DAGStore{
mounts: mounts,
config: cfg,
indices: indices,
shards: make(map[shard.Key]*Shard),
taskCh: make(chan *Task, 1024),
ctx: ctx,
cancelFn: cancel,
mounts: mounts,
config: cfg,
indices: indices,
shards: make(map[shard.Key]*Shard),
externalCh: make(chan *Task, 128), // len=128, concurrent external tasks that can be queued up before putting backpressure.
internalCh: make(chan *Task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *Task, 64), // len=64, hitting this limit will just make async tasks wait.
ctx: ctx,
cancelFn: cancel,
// scratch: scratch,
}

@@ -281,7 +285,7 @@ func (d *DAGStore) RegisterShard(key shard.Key, mnt mount.Mount, out chan ShardR
d.lk.Unlock()

tsk := &Task{Op: OpShardRegister, Shard: shrd, Resp: out}
return d.queueTask(tsk, false)
return d.queueTask(tsk, d.externalCh)
}

type DestroyOpts struct {
@@ -297,7 +301,7 @@ func (d *DAGStore) DestroyShard(key shard.Key, out chan ShardResult, _ DestroyOp
d.lk.Unlock()

tsk := &Task{Op: OpShardDestroy, Shard: shrd, Resp: out}
return d.queueTask(tsk, false)
return d.queueTask(tsk, d.externalCh)
}

type AcquireOpts struct {
@@ -318,44 +322,28 @@ func (d *DAGStore) AcquireShard(key shard.Key, out chan ShardResult, _ AcquireOp
d.lk.Unlock()

tsk := &Task{Op: OpShardAcquire, Shard: shrd, Resp: out}
return d.queueTask(tsk, false)
return d.queueTask(tsk, d.externalCh)
}

func (d *DAGStore) control() {
defer d.wg.Done()

var tsk *Task
for {
select {
case tsk = <-d.taskCh:
case <-d.ctx.Done():
return
}

tsk, err := d.consumeNext()
for ; err == nil; tsk, err = d.consumeNext() {
log.Infow("processing task", "op", tsk.Op, "shard", tsk.Shard.key)

if len(d.overflow) > 0 {
log.Info("moving an overflow item to task queue")
select {
case d.taskCh <- d.overflow[0]:
d.overflow = d.overflow[1:]
default:
log.Info("could not move overflow item into task queue")
}
}

s := tsk.Shard
s.Lock()

switch tsk.Op {
case OpShardRegister:
if s.state != ShardStateNew {
err := fmt.Errorf("%w: expected shard to be in 'new' state; was: %d", ErrShardInitializationFailed, s.state)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, d.internalCh)
break
}
// queue a fetch.
_ = d.queueTask(&Task{Op: OpShardFetch, Shard: tsk.Shard}, true)
_ = d.queueTask(&Task{Op: OpShardFetch, Shard: tsk.Shard}, d.internalCh)

case OpShardFetch:
s.state = ShardStateFetching
@@ -365,51 +353,51 @@ func (d *DAGStore) control() {
reader, err := upgrader.Fetch(ctx)
if err != nil {
err = fmt.Errorf("failed to acquire reader of mount: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, d.completionCh)
return
}
_ = reader.Close()
_ = d.queueTask(&Task{Op: OpShardFetchDone, Shard: tsk.Shard}, true)
_ = d.queueTask(&Task{Op: OpShardFetchDone, Shard: tsk.Shard}, d.completionCh)
}(d.ctx, s.mount)

case OpShardFetchDone:
s.state = ShardStateFetched
if !s.indexed {
// shard isn't indexed yet, so let's index.
_ = d.queueTask(&Task{Op: OpShardIndex, Shard: tsk.Shard}, true)
_ = d.queueTask(&Task{Op: OpShardIndex, Shard: tsk.Shard}, d.internalCh)
break
}
// shard is indexed, we're ready to serve requests.
_ = d.queueTask(&Task{Op: OpShardMakeAvailable, Shard: tsk.Shard}, true)
_ = d.queueTask(&Task{Op: OpShardMakeAvailable, Shard: tsk.Shard}, d.internalCh)

case OpShardIndex:
s.state = ShardStateIndexing
go func(ctx context.Context, mnt mount.Mount) {
reader, err := mnt.Fetch(ctx)
if err != nil {
err = fmt.Errorf("failed to acquire reader of mount: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, d.completionCh)
return
}
defer reader.Close()

idx, err := loadIndex(reader)
if err != nil {
err = fmt.Errorf("failed to index shard: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, d.completionCh)
return
}
_ = d.queueTask(&Task{Op: OpShardIndexDone, Shard: tsk.Shard, Index: idx}, true)
_ = d.queueTask(&Task{Op: OpShardIndexDone, Shard: tsk.Shard, Index: idx}, d.completionCh)
}(d.ctx, s.mount)

case OpShardIndexDone:
err := d.indices.AddFullIndex(s.key, tsk.Index)
if err != nil {
err = fmt.Errorf("failed to add index for shard: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, d.internalCh)
break
}
_ = d.queueTask(&Task{Op: OpShardMakeAvailable, Shard: tsk.Shard}, true)
_ = d.queueTask(&Task{Op: OpShardMakeAvailable, Shard: tsk.Shard}, d.internalCh)

case OpShardMakeAvailable:
if s.wRegister != nil {
@@ -422,7 +410,7 @@ func (d *DAGStore) control() {

// trigger queued acquisition waiters.
for _, acqCh := range s.wAcquire {
_ = d.queueTask(&Task{Op: OpShardAcquire, Shard: tsk.Shard, Resp: acqCh}, true)
go d.acquireAsync(acqCh, s, s.mount)
}
s.wAcquire = s.wAcquire[:0]

@@ -435,26 +423,7 @@ func (d *DAGStore) control() {

s.state = ShardStateServing
s.refs++
go func(ctx context.Context, tsk *Task, k shard.Key, mnt mount.Mount) {
reader, err := mnt.Fetch(ctx)
if err != nil {
err = fmt.Errorf("failed to acquire reader of mount: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
tsk.Resp <- ShardResult{Key: s.key, Error: err}
return
}

idx, err := d.indices.GetFullIndex(k)
if err != nil {
err = fmt.Errorf("failed to recover index for shard %s: %w", k, err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: tsk.Shard, Error: err}, true)
tsk.Resp <- ShardResult{Key: s.key, Error: err}
return
}

sa, err := NewShardAccessor(s.key, reader, idx)
tsk.Resp <- ShardResult{Key: s.key, Accessor: sa, Error: err}
}(d.ctx, tsk, s.key, s.mount)
go d.acquireAsync(tsk.Resp, s, s.mount)

case OpShardRelease:
if (s.state != ShardStateServing && s.state != ShardStateErrored) || s.refs <= 0 {
@@ -509,6 +478,10 @@ func (d *DAGStore) control() {

s.Unlock()
}

if err != context.Canceled {
log.Errorw("consuming next task failed; aborted event loop; dagstore unoperational", "error", err)
}
}

func (d *DAGStore) Close() error {
@@ -517,20 +490,54 @@
return nil
}

func (d *DAGStore) queueTask(tsk *Task, overflow bool) error {
func (d *DAGStore) consumeNext() (tsk *Task, error error) {
select {
case tsk = <-d.internalCh: // drain internal first; these are tasks emitted from the event loop.
return tsk, nil
case <-d.ctx.Done():
return fmt.Errorf("dag store closed")
case d.taskCh <- tsk:
return nil
return nil, d.ctx.Err() // TODO drain and process before returning?
default:
}
if overflow {
d.overflow = append(d.overflow, tsk)

select {
case tsk = <-d.externalCh:
return tsk, nil
case tsk = <-d.completionCh:
return tsk, nil
case <-d.ctx.Done():
return // TODO drain and process before returning?
}
}

func (d *DAGStore) queueTask(tsk *Task, ch chan<- *Task) error {
select {
case <-d.ctx.Done():
return fmt.Errorf("dag store closed")
case ch <- tsk:
return nil
}
log.Warnf("dag store backlogged")
return fmt.Errorf("dag store backlogged; cannot accept request")
}

func (d *DAGStore) acquireAsync(acqCh chan ShardResult, s *Shard, mnt mount.Mount) {
k := s.key
reader, err := mnt.Fetch(d.ctx)
if err != nil {
err = fmt.Errorf("failed to acquire reader of mount: %w", err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: s, Error: err}, d.completionCh)
acqCh <- ShardResult{Key: k, Error: err}
return
}

idx, err := d.indices.GetFullIndex(k)
if err != nil {
err = fmt.Errorf("failed to recover index for shard %s: %w", k, err)
_ = d.queueTask(&Task{Op: OpShardFail, Shard: s, Error: err}, d.completionCh)
acqCh <- ShardResult{Key: k, Error: err}
return
}

sa, err := NewShardAccessor(k, reader, idx)
acqCh <- ShardResult{Key: k, Accessor: sa, Error: err}
}

func loadIndex(reader mount.Reader) (carindex.Index, error) {
