Skip to content

Commit

Permalink
Merge pull request #26 from enriquebris/development
Browse files Browse the repository at this point in the history
Development v0.7.4
  • Loading branch information
enriquebris authored Oct 17, 2018
2 parents a2f062c + 948e87a commit 13467b0
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 50 deletions.
4 changes: 4 additions & 0 deletions history.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## History

### v0.7.4

- Fixed bug that caused randomly worker initialization error

### v0.7.3

- SetTotalWorkers() returns error in case it is invoked before StartWorkers()
Expand Down
119 changes: 70 additions & 49 deletions pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// ** goworkerpool.com **********************
// ** github.com/enriquebris/goworkerpool ***
// ** v0.7.3 ********************************
// ** goworkerpool.com **********************************************************************************************
// ** github.com/enriquebris/goworkerpool **
// ** v0.7.4 ********************************************************************************************************

// Package goworkerpool provides a pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand.
package goworkerpool
Expand Down Expand Up @@ -136,6 +136,7 @@ func NewPool(initialWorkers int, maxJobsInQueue int, verbose bool) *Pool {
return ret
}

// pool internal initialization
func (st *Pool) initialize(initialWorkers int, maxJobsInQueue int, verbose bool) {
st.jobsChan = make(chan poolJobData, maxJobsInQueue)
st.totalWorkersChan = make(chan workerAction, 100)
Expand All @@ -152,15 +153,15 @@ func (st *Pool) initialize(initialWorkers int, maxJobsInQueue int, verbose bool)
// GR to control the active workers successes / fails
go st.fnSuccessListener()

// GR to control the active workers counter / actions over workers
go st.workerListener()

// worker's immediate action channel
st.immediateChan = make(chan byte)

st.waitForWaitChannel = make(chan bool)
st.waitForNSuccessesChannel = make(chan bool)

// GR to control the active workers counter / actions over workers
go st.workerListener()

// set broad messages default values
st.broadMessages.Store(broadMessagePause, false)
st.broadMessages.Store(broadMessageKillAllWorkers, false)
Expand Down Expand Up @@ -342,6 +343,10 @@ func (st *Pool) fnSuccessListener() {
}
}

// *************************************************************************************************************
// ** Wait functions ******************************************************************************************
// *************************************************************************************************************

// Wait waits while at least one worker is up and running
func (st *Pool) Wait() error {
if st.fn == nil {
Expand Down Expand Up @@ -400,41 +405,50 @@ func (st *Pool) WaitUntilNSuccesses(n int) error {
return nil
}

// *************************************************************************************************************
// ** Set Handler Function ************************************************************************************
// *************************************************************************************************************

// SetWorkerFunc sets the worker's function handler.
// This function will be invoked each time a worker pulls a new job, and should return true to let know that the job
// was successfully completed, or false in other case.
func (st *Pool) SetWorkerFunc(fn PoolFunc) {
st.fn = fn
}

// SetTotalWorkers adjusts the number of live workers.
// *************************************************************************************************************
// ** Enqueue jobs ********************************************************************************************
// *************************************************************************************************************

// AddTask will enqueue a job (into a FIFO queue: a channel).
//
// In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until
// their current jobs get processed (in case they are processing jobs).
// The parameter for the job's data accepts any kind of value (interface{}).
//
// It returns an error in the following scenarios:
// - The workers were not started yet by StartWorkers.
// - There is a "in course" KillAllWorkers operation.
func (st *Pool) SetTotalWorkers(n int) error {
// verify that workers were started by StartWorkers()
if !st.workersStarted {
return errors.New(errorNoStartWorkersMsg)
}

// return en error if there is an "in course" KillAllWorkers operation
if tmp, ok := st.broadMessages.Load(broadMessageKillAllWorkers); ok && tmp.(bool) {
return errors.New(errorKillAllWorkersMsg)
}

// sends a "set total workers" signal, to be processed by workerListener()
st.totalWorkersChan <- workerAction{
Action: workerActionSetTotalWorkers,
Value: n,
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
// the job will stay in the queue until any worker will be ready to pick it up and start processing it.
//
// The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTask will wait
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
//
// AddTask will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// a certain amount of time when WaitUntilNSuccesses meet the stop condition.
func (st *Pool) AddTask(data interface{}) error {
if !st.doNotProcess {
// enqueue a regular job
st.jobsChan <- poolJobData{
Code: poolJobDataCodeRegular,
JobData: data,
}
return nil
}

return nil
return errors.New("No new jobs are accepted at this moment")
}

// *************************************************************************************************************
// ** Workers operations **************************************************************************************
// *************************************************************************************************************

// StartWorkers start all workers. The number of workers was set at the Pool instantiation (NewPool(...) function).
// It will return an error if the worker function was not previously set.
func (st *Pool) StartWorkers() error {
Expand Down Expand Up @@ -601,7 +615,7 @@ func (st *Pool) workerFunc(n int) {
}
}

// late kill signal
// late kill signal
case poolJobDataCodeLateKillWorker:
if st.verbose {
log.Printf("[pool] worker %v is going to be down", n)
Expand All @@ -614,7 +628,7 @@ func (st *Pool) workerFunc(n int) {
keepWorking = false
break

// late kill all workers
// late kill all workers
case poolJobDataCodeLateKillAllWorkers:
if st.verbose {
log.Printf("[pool] worker %v is going to be down :: LateKillAllWorkers()", n)
Expand Down Expand Up @@ -654,29 +668,36 @@ func (st *Pool) workerFunc(n int) {
}
}

// AddTask will enqueue a job (into a FIFO queue: a channel).
//
// The parameter for the job's data accepts any kind of value (interface{}).
//
// Workers (if alive) will be listening to and picking up jobs from this queue. If no workers are alive nor idle,
// the job will stay in the queue until any worker will be ready to pick it up and start processing it.
// *************************************************************************************************************
// ** Workers adjustments && operations ***********************************************************************
// *************************************************************************************************************

// SetTotalWorkers adjusts the number of live workers.
//
// The queue in which this function enqueues the jobs has a limit (it was set up at pool initialization). It means that AddTask will wait
// for a free queue slot to enqueue a new job in case the queue is at full capacity.
// In case it needs to kill some workers (in order to adjust the total based on the given parameter), it will wait until
// their current jobs get processed (in case they are processing jobs).
//
// AddTask will return an error if no new tasks could be enqueued at the execution time. No new tasks could be enqueued during
// a certain amount of time when WaitUntilNSuccesses meet the stop condition.
func (st *Pool) AddTask(data interface{}) error {
if !st.doNotProcess {
// enqueue a regular job
st.jobsChan <- poolJobData{
Code: poolJobDataCodeRegular,
JobData: data,
}
return nil
// It returns an error in the following scenarios:
// - The workers were not started yet by StartWorkers.
// - There is a "in course" KillAllWorkers operation.
func (st *Pool) SetTotalWorkers(n int) error {
// verify that workers were started by StartWorkers()
if !st.workersStarted {
return errors.New(errorNoStartWorkersMsg)
}

return errors.New("No new jobs are accepted at this moment")
// return en error if there is an "in course" KillAllWorkers operation
if tmp, ok := st.broadMessages.Load(broadMessageKillAllWorkers); ok && tmp.(bool) {
return errors.New(errorKillAllWorkersMsg)
}

// sends a "set total workers" signal, to be processed by workerListener()
st.totalWorkersChan <- workerAction{
Action: workerActionSetTotalWorkers,
Value: n,
}

return nil
}

// AddWorker adds a new worker to the pool.
Expand Down
6 changes: 5 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.7.3-yellowgreen.svg?style=flat "goworkerpool v0.7.3") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool)
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goworkerpool) ![version](https://img.shields.io/badge/version-v0.7.4-yellowgreen.svg?style=flat "goworkerpool v0.7.3") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goworkerpool)](https://goreportcard.com/report/github.com/enriquebris/goworkerpool) [![Build Status](https://travis-ci.org/enriquebris/goworkerpool.svg?branch=master)](https://travis-ci.org/enriquebris/goworkerpool)

# goworkerpool - Pool of workers
Pool of concurrent workers with the ability to increment / decrement / pause / resume workers on demand.
Expand Down Expand Up @@ -298,6 +298,10 @@ pool.ResumeAllWorkers()

## History

### v0.7.4

- Fixed bug that caused randomly worker initialization error

### v0.7.3

- SetTotalWorkers() returns error in case it is invoked before StartWorkers()
Expand Down

0 comments on commit 13467b0

Please sign in to comment.