Skip to content

Commit

Permalink
ref(async): implement pool cancellation (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 15, 2023
1 parent aacc8ca commit a7d6775
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 55 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"exportloopref",
"extendio",
"fieldalignment",
"fortytw",
"ginkgolinter",
"gobby",
"goconst",
Expand Down
15 changes: 7 additions & 8 deletions async/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,12 @@ func (p *WorkerPool[I, R]) run(
p.Quit.Done()
fmt.Printf("<--- WorkerPool.run (QUIT). 🧊🧊🧊\n")
}()
fmt.Println("===> 🧊 WorkerPool.run")
fmt.Printf("===> 🧊 WorkerPool.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Println("===> 🧊 WorkerPool.run - done received ☢️☢️☢️")
p.cancelWorkers()

running = false

Expand Down Expand Up @@ -163,7 +162,7 @@ func (p *WorkerPool[I, R]) run(

func (p *WorkerPool[I, R]) spawn(
ctx context.Context,
jobsInCh JobStreamIn[I],
jobsChIn JobStreamIn[I],
resultsChOut ResultStreamOut[R],
finishedChOut FinishedStreamOut,
) {
Expand All @@ -173,8 +172,8 @@ func (p *WorkerPool[I, R]) spawn(
core: &worker[I, R]{
id: p.composeID(),
exec: p.exec,
jobsInCh: jobsInCh,
resultsOutCh: resultsChOut,
jobsChIn: jobsChIn,
resultsChOut: resultsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
Expand All @@ -196,9 +195,9 @@ func (p *WorkerPool[I, R]) drain(finishedChIn FinishedStreamIn) {
// 📍 Here, we don't access the finishedChIn channel in a pre-emptive way via
// the ctx.Done() channel. This is because in a unit test, we define a timeout as
// part of the test spec using SpecTimeout. When this fires, this is handled by the
// run loop, which ends that loop then enters drain. When this happens, you can't
// reuse that same done channel as it will immediately return the value already
// handled. This has the effect of short-circuiting this loop meaning that
// run loop, which ends that loop then enters drain the phase. When this happens,
// you can't reuse that same done channel as it will immediately return the value
// already handled. This has the effect of short-circuiting this loop meaning that
// workerID := <-finishedChIn never has a chance to be selected and the drain loop
// exits early. The end result of which means that the p.private.pool collection is
// never depleted.
Expand Down
108 changes: 83 additions & 25 deletions async/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,18 @@ import (
"github.com/snivilised/lorax/internal/helpers"
)

const DefaultNoWorkers = 5

func init() { rand.Seed(time.Now().Unix()) }

// TerminatorFunc brings the work pool processing to an end, eg
// by stopping or cancellation after the requested amount of time.
type TerminatorFunc[I, R any] func(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc)

func (f TerminatorFunc[I, R]) After(ctx context.Context, delay time.Duration, funcs ...context.CancelFunc) {
f(ctx, delay, funcs...)
}

const (
JobChSize = 10
ResultChSize = 10
Expand Down Expand Up @@ -70,23 +80,43 @@ type pipeline[I, R any] struct {
wg sync.WaitGroup
sequence int
resultsCh chan async.JobResult[R]
provider helpers.ProviderFn[I]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, R]
pool *async.WorkerPool[I, R]
consumer *helpers.Consumer[R]
cancel TerminatorFunc[I, R]
stop TerminatorFunc[I, R]
}

func start[I, R any]() *pipeline[I, R] {
resultsCh := make(chan async.JobResult[R], ResultChSize)

pipe := &pipeline[I, R]{
resultsCh: resultsCh,
resultsCh: make(chan async.JobResult[R], ResultChSize),
stop: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
cancel: func(_ context.Context, _ time.Duration, _ ...context.CancelFunc) {
// no-op
},
}

return pipe
}

func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.ProviderFn[I]) {
func (p *pipeline[I, R]) produce(ctx context.Context, provider helpers.ProviderFunc[I]) {
p.cancel = func(ctx context.Context, delay time.Duration, cancellations ...context.CancelFunc) {
go helpers.CancelProducerAfter[I, R](
delay,
cancellations...,
)
}
p.stop = func(ctx context.Context, delay time.Duration, _ ...context.CancelFunc) {
go helpers.StopProducerAfter(
ctx,
p.producer,
delay,
)
}

p.producer = helpers.StartProducer[I, R](
ctx,
&p.wg,
Expand All @@ -98,10 +128,10 @@ func (p *pipeline[I, R]) startProducer(ctx context.Context, provider helpers.Pro
p.wg.Add(1)
}

func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.ExecutiveFunc[I, R]) {
func (p *pipeline[I, R]) process(ctx context.Context, noWorkers int, executive async.ExecutiveFunc[I, R]) {
p.pool = async.NewWorkerPool[I, R](
&async.NewWorkerPoolParams[I, R]{
NoWorkers: 5,
NoWorkers: noWorkers,
Exec: executive,
JobsCh: p.producer.JobsCh,
CancelCh: make(async.CancelStream),
Expand All @@ -113,7 +143,7 @@ func (p *pipeline[I, R]) startPool(ctx context.Context, executive async.Executiv
p.wg.Add(1)
}

func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
func (p *pipeline[I, R]) consume(ctx context.Context) {
p.consumer = helpers.StartConsumer(ctx,
&p.wg,
p.resultsCh,
Expand All @@ -122,14 +152,6 @@ func (p *pipeline[I, R]) startConsumer(ctx context.Context) {
p.wg.Add(1)
}

func (p *pipeline[I, R]) stopProducerAfter(ctx context.Context, after time.Duration) {
go helpers.StopProducerAfter(
ctx,
p.producer,
after,
)
}

var _ = Describe("WorkerPool", func() {
When("given: a stream of jobs", func() {
Context("and: Stopped", func() {
Expand All @@ -139,7 +161,7 @@ var _ = Describe("WorkerPool", func() {

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.startProducer(ctx, func() TestJobInput {
pipe.produce(ctx, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
Expand All @@ -149,13 +171,13 @@ var _ = Describe("WorkerPool", func() {
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.startPool(ctx, greeter)
pipe.process(ctx, DefaultNoWorkers, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.startConsumer(ctx)
pipe.consume(ctx)

By("👾 NOW AWAITING TERMINATION")
pipe.stopProducerAfter(ctx, time.Second/5)
pipe.stop.After(ctx, time.Second/5)
pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
Expand All @@ -170,13 +192,49 @@ var _ = Describe("WorkerPool", func() {
})

Context("and: Cancelled", func() {
It("should test something", func() { // It is ginkgo test case function
Expect(audience).To(HaveLen(14))
})
It("🧪 should: handle cancellation and shutdown cleanly", func(ctxSpec SpecContext) {
defer leaktest.Check(GinkgoT())()
pipe := start[TestJobInput, TestJobResult]()

ctxCancel, cancel := context.WithCancel(ctxSpec)
cancellations := []context.CancelFunc{cancel}

By("👾 WAIT-GROUP ADD(producer)")
sequence := 0
pipe.produce(ctxCancel, func() TestJobInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
sequence++
return TestJobInput{
sequenceNo: sequence,
Recipient: audience[recipient],
}
})

By("👾 WAIT-GROUP ADD(worker-pool)\n")
pipe.process(ctxCancel, DefaultNoWorkers, greeter)

By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(ctxCancel)

By("👾 NOW AWAITING TERMINATION")
pipe.cancel.After(ctxCancel, time.Second/5, cancellations...)

pipe.wg.Wait()

fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
pipe.consumer.Count,
)

It("🧪 should: handle cancellation and shutdown cleanly", func(_ SpecContext) {
// The producer count is higher than the consumer count. As a feature, we could
// collate the numbers produced vs the numbers consumed and perhaps also calculate
// which jobs were not processed, each indicated with their corresponding Input
// value.

})
// Eventually(ctxCancel, pipe.resultsCh).WithTimeout(time.Second * 5).Should(BeClosed())
// Eventually(ctxCancel, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())

}, SpecTimeout(time.Second*5))
})
})
})
9 changes: 5 additions & 4 deletions async/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
type worker[I any, R any] struct {
id WorkerID
exec ExecutiveFunc[I, R]
jobsInCh JobStreamIn[I]
resultsOutCh ResultStreamOut[R]
jobsChIn JobStreamIn[I]
resultsChOut ResultStreamOut[R]
finishedChOut FinishedStreamOut

// this might be better replaced with a broadcast mechanism such as sync.Cond
Expand All @@ -22,14 +22,15 @@ func (w *worker[I, R]) run(ctx context.Context) {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
}()
fmt.Printf(" ---> 🚀worker.run(%v) ...(ctx:%+v)\n", w.id, ctx)

for running := true; running; {
select {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.run(%v)(finished) - done received 🔶🔶🔶\n", w.id)

running = false
case job, ok := <-w.jobsInCh:
case job, ok := <-w.jobsChIn:
if ok {
fmt.Printf(" ---> 🚀 worker.run(%v)(input:'%v')\n", w.id, job.Input)
w.invoke(ctx, job)
Expand All @@ -49,6 +50,6 @@ func (w *worker[I, R]) invoke(ctx context.Context, job Job[I]) {
case <-ctx.Done():
fmt.Printf(" ---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

case w.resultsOutCh <- result:
case w.resultsChOut <- result:
}
}
16 changes: 8 additions & 8 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
)

type Consumer[R any] struct {
ResultsCh <-chan async.JobResult[R]
quit *sync.WaitGroup
Count int
ResultsChIn async.ResultStreamIn[R]
quit *sync.WaitGroup
Count int
}

func StartConsumer[R any](
ctx context.Context,
wg *sync.WaitGroup,
resultsCh <-chan async.JobResult[R],
resultsChIn async.ResultStreamIn[R],
) *Consumer[R] {
consumer := &Consumer[R]{
ResultsCh: resultsCh,
quit: wg,
ResultsChIn: resultsChIn,
quit: wg,
}
go consumer.run(ctx)

Expand All @@ -33,7 +33,7 @@ func (c *Consumer[R]) run(ctx context.Context) {
c.quit.Done()
fmt.Printf("<<<< consumer.run - finished (QUIT). 💠💠💠 \n")
}()
fmt.Printf("<<<< 💠 consumer.run ...\n")
fmt.Printf("<<<< 💠 consumer.run ...(ctx:%+v)\n", ctx)

for running := true; running; {
select {
Expand All @@ -42,7 +42,7 @@ func (c *Consumer[R]) run(ctx context.Context) {

fmt.Println("<<<< 💠 consumer.run - done received 💔💔💔")

case result, ok := <-c.ResultsCh:
case result, ok := <-c.ResultsChIn:
if ok {
c.Count++
fmt.Printf("<<<< 💠 consumer.run - new result arrived(#%v): '%+v' \n",
Expand Down
Loading

0 comments on commit a7d6775

Please sign in to comment.