Skip to content

Commit

Permalink
Merge pull request #7902 from fanminshi/fix_runner
Browse files Browse the repository at this point in the history
etcd-runner: remove mutex on validate() and release() in global.go
  • Loading branch information
fanminshi committed May 10, 2017
2 parents 47f5b7c + b44bd6d commit 066062a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
}
}()
err = e.Campaign(ctx, v)
cancel()
<-donec
if err == nil {
observedLeader = v
}
if observedLeader == v {
validateWaiters = len(rcs)
}
cancel()
<-donec
select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -129,8 +129,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
return err
}
if observedLeader == v {
close(nextc)
oldNextc := nextc
nextc = make(chan struct{})
close(oldNextc)

}
<-rcNextc
observedLeader = ""
Expand Down
7 changes: 0 additions & 7 deletions tools/functional-tester/etcd-runner/command/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client {
}

func doRounds(rcs []roundClient, rounds int, requests int) {
var mu sync.Mutex
var wg sync.WaitGroup

wg.Add(len(rcs))
Expand All @@ -73,22 +72,16 @@ func doRounds(rcs []roundClient, rounds int, requests int) {
for rc.acquire() != nil { /* spin */
}

mu.Lock()
if err := rc.validate(); err != nil {
log.Fatal(err)
}
mu.Unlock()

time.Sleep(10 * time.Millisecond)
rc.progress++
finished <- struct{}{}

mu.Lock()
for rc.release() != nil { /* spin */
mu.Unlock()
mu.Lock()
}
mu.Unlock()
}
}(&rcs[i])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/coreos/etcd/clientv3/concurrency"

Expand Down Expand Up @@ -47,6 +48,8 @@ func runRacerFunc(cmd *cobra.Command, args []string) {

rcs := make([]roundClient, totalClientConnections)
ctx := context.Background()
// mu ensures validate and release funcs are atomic.
var mu sync.Mutex
cnt := 0

eps := endpointsFromFlag(cmd)
Expand All @@ -69,12 +72,16 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
m := concurrency.NewMutex(s, racers)
rcs[i].acquire = func() error { return m.Lock(ctx) }
rcs[i].validate = func() error {
mu.Lock()
defer mu.Unlock()
if cnt++; cnt != 1 {
return fmt.Errorf("bad lock; count: %d", cnt)
}
return nil
}
rcs[i].release = func() error {
mu.Lock()
defer mu.Unlock()
if err := m.Unlock(ctx); err != nil {
return err
}
Expand Down

0 comments on commit 066062a

Please sign in to comment.