Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use channel to notify work done #22

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions worker/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package main

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"math/rand"
"net"
"os"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/s3"
Expand Down Expand Up @@ -118,24 +118,24 @@ func connectToServer(serverAddress string) error {
// PerfTest runs a performance test as configured in testConfig
func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, workerID string) time.Duration {
workChannel := make(chan WorkItem, len(*Workqueue.Queue))
doneChannel := make(chan bool)
notifyChan := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(testConfig.ParallelClients)

startTime := time.Now().UTC()
promTestStart.WithLabelValues(testConfig.Name).Set(float64(startTime.UnixNano() / int64(1000000)))
// promTestGauge.WithLabelValues(testConfig.Name).Inc()
for worker := 0; worker < testConfig.ParallelClients; worker++ {
go DoWork(workChannel, doneChannel)
go DoWork(workChannel, notifyChan, wg)
}
log.Infof("Started %d parallel clients", testConfig.ParallelClients)
if testConfig.Runtime != 0 {
workUntilTimeout(Workqueue, workChannel, time.Duration(testConfig.Runtime))
workUntilTimeout(Workqueue, workChannel, notifyChan, time.Duration(testConfig.Runtime))
} else {
workUntilOps(Workqueue, workChannel, testConfig.OpsDeadline, testConfig.ParallelClients)
}
// Wait for all the goroutines to finish
for i := 0; i < testConfig.ParallelClients; i++ {
<-doneChannel
}
wg.Wait()
log.Info("All clients finished")
endTime := time.Now().UTC()
promTestEnd.WithLabelValues(testConfig.Name).Set(float64(endTime.UnixNano() / int64(1000000)))
Expand All @@ -161,15 +161,14 @@ func PerfTest(testConfig *common.TestCaseConfiguration, Workqueue *Workqueue, wo
return endTime.Sub(startTime)
}

func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, runtime time.Duration) {
workContext, WorkCancel = context.WithCancel(context.Background())
func workUntilTimeout(Workqueue *Workqueue, workChannel chan WorkItem, notifyChan chan<- struct{}, runtime time.Duration) {
timer := time.NewTimer(runtime)
for {
for _, work := range *Workqueue.Queue {
select {
case <-timer.C:
log.Debug("Reached Runtime end")
WorkCancel()
close(notifyChan)
return
case workChannel <- work:
}
Expand Down
18 changes: 4 additions & 14 deletions worker/workItems.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"bytes"
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,15 +79,6 @@ func GetNextOperation(Queue *Workqueue) string {
return Queue.OperationValues[0].Key
}

func init() {
workContext = context.Background()
}

var workContext context.Context

// WorkCancel is the function to stop the execution of jobs
var WorkCancel context.CancelFunc

// IncreaseOperationValue increases the given operation's value by the set amount
func IncreaseOperationValue(operation string, value float64, Queue *Workqueue) error {
for i := range Queue.OperationValues {
Expand Down Expand Up @@ -229,18 +220,17 @@ func (op Stopper) Clean() error {

// DoWork processes the workitems in the workChannel until
// either the time runs out or a stopper is found
func DoWork(workChannel chan WorkItem, doneChannel chan bool) {
func DoWork(workChannel <-chan WorkItem, notifyChan <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-workContext.Done():
case <-notifyChan:
log.Debugf("Runtime over - Got timeout from work context")
doneChannel <- true
return
case work := <-workChannel:
switch work.(type) {
case Stopper:
log.Debug("Found the end of the work Queue - stopping")
doneChannel <- true
return
}
err := work.Do()
Expand Down