Skip to content

Commit

Permalink
use channel to notify work done
Browse files Browse the repository at this point in the history
Signed-off-by: Liang Zheng <zhengliang0901@gmail.com>
  • Loading branch information
microyahoo committed May 29, 2023
1 parent d9e82dc commit cf8a589
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 24 deletions.
19 changes: 9 additions & 10 deletions worker/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package main

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"math/rand"
"net"
"os"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/s3"
Expand Down Expand Up @@ -118,24 +118,24 @@ func connectToServer(serverAddress string) error {
// PerfTest runs a performance test as configured in testConfig
func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) time.Duration {
workChannel := make(chan WorkItem, len(*Workqueue.Queue))
doneChannel := make(chan bool)
notifyChan := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(testConfig.ParallelClients)

startTime := time.Now().UTC()
promTestStart.WithLabelValues(testConfig.Name).Set(float64(startTime.UnixNano() / int64(1000000)))
// promTestGauge.WithLabelValues(testConfig.Name).Inc()
for worker := 0; worker < testConfig.ParallelClients; worker++ {
go DoWork(workChannel, doneChannel)
go DoWork(workChannel, notifyChan, wg)
}
log.Infof("Started %d parallel clients", testConfig.ParallelClients)
if testConfig.Runtime != 0 {
workUntilTimeout(Workqueue, workChannel, time.Duration(testConfig.Runtime))
workUntilTimeout(Workqueue, workChannel, notifyChan, time.Duration(testConfig.Runtime))
} else {
workUntilOps(Workqueue, workChannel, testConfig.OpsDeadline, testConfig.ParallelClients)
}
// Wait for all the goroutines to finish
for i := 0; i < testConfig.ParallelClients; i++ {
<-doneChannel
}
wg.Wait()
log.Info("All clients finished")
endTime := time.Now().UTC()
promTestEnd.WithLabelValues(testConfig.Name).Set(float64(endTime.UnixNano() / int64(1000000)))
Expand All @@ -161,15 +161,14 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, wo
return endTime.Sub(startTime)
}

func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, runtime time.Duration) {
workContext, WorkCancel = context.WithCancel(context.Background())
func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, notifyChan chan<- struct{}, runtime time.Duration) {
timer := time.NewTimer(runtime)
for {
for _, work := range *Workqueue.Queue {
select {
case <-timer.C:
log.Debug("Reached Runtime end")
WorkCancel()
close(notifyChan)
return
case workChannel <- work:
}
Expand Down
18 changes: 4 additions & 14 deletions worker/workItems.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"bytes"
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,15 +79,6 @@ func GetNextOperation(Queue *Workqueue) string {
return Queue.OperationValues[0].Key
}

func init() {
workContext = context.Background()
}

var workContext context.Context

// WorkCancel is the function to stop the execution of jobs
var WorkCancel context.CancelFunc

// IncreaseOperationValue increases the given operation's value by the set amount
func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) error {
for i := range Queue.OperationValues {
Expand Down Expand Up @@ -229,18 +220,17 @@ func (op Stopper) Clean() error {

// DoWork processes the workitems in the workChannel until
// either the time runs out or a stopper is found
func DoWork(workChannel chan WorkItem, doneChannel chan bool) {
func DoWork(workChannel <-chan WorkItem, notifyChan <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-workContext.Done():
case <-notifyChan:
log.Debugf("Runtime over - Got timeout from work context")
doneChannel <- true
return
case work := <-workChannel:
switch work.(type) {
case Stopper:
log.Debug("Found the end of the work Queue - stopping")
doneChannel <- true
return
}
err := work.Do()
Expand Down

0 comments on commit cf8a589

Please sign in to comment.