Skip to content

Commit

Permalink
CherryPick(vitessio#13824): Flakes: VReplication unit tests: reduce g…
Browse files Browse the repository at this point in the history
…oroutine leakage (vitessio#3009)

* cherry pick of 13824

* Fix conflict

Signed-off-by: Rohit Nayak <rohit@planetscale.com>

---------

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Co-authored-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
planetscale-actions-bot and rohit-nayak-ps committed Aug 22, 2023
1 parent 21fb629 commit dcd270c
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 7 deletions.
18 changes: 16 additions & 2 deletions go/stats/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package stats

import (
"context"
"encoding/json"
"math"
"sync"
Expand Down Expand Up @@ -65,6 +66,8 @@ type Rates struct {
// totalRate is the rate of total counts per second seen in the latest
// sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS.
totalRate float64
ctx context.Context
cancel context.CancelFunc
}

// NewRates reports rolling rate information for countTracker. samples specifies
Expand All @@ -76,13 +79,16 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
if interval < 1*time.Second && interval != -1*time.Second {
panic("interval too small")
}
ctx, cancel := context.WithCancel(context.Background())
rt := &Rates{
timeStamps: NewRingInt64(samples + 1),
counts: make(map[string]*RingInt64),
countTracker: countTracker,
samples: samples + 1,
interval: interval,
timestampLastSampling: timeNow(),
ctx: ctx,
cancel: cancel,
}
if name != "" {
publish(name, rt)
Expand All @@ -93,12 +99,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
return rt
}

func (rt *Rates) Stop() {
rt.cancel()
}

func (rt *Rates) track() {
t := time.NewTicker(rt.interval)
defer t.Stop()
for {
rt.snapshot()
<-t.C
select {
case <-rt.ctx.Done():
return
case <-t.C:
rt.snapshot()
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string {
return strs
}

func (bps *Stats) Stop() {
bps.Rates.Stop()
bps.VReplicationLagRates.Stop()
}

// NewStats creates a new Stats structure.
func NewStats() *Stats {
bps := &Stats{}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (tenv *testEnv) close() {
tenv.mu.Lock()
defer tenv.mu.Unlock()
tenv.ts.Close()
tenv.mysqld.Close()
}

//--------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() {
ct.cancel = func() {}
close(ct.done)
blpStats.Stop()
return ct, nil
}

Expand Down Expand Up @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye

func (ct *controller) Stop() {
ct.cancel()
ct.blpStats.Stop()
<-ct.done
}
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}

stats := binlogplayer.NewStats()
defer stats.Stop()
switch plan.opcode {
case insertQuery:
qr, err := dbClient.ExecuteFetch(plan.query, 1)
Expand All @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, fmt.Errorf("insert id %v out of range", qr.InsertID)
}

vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)

// If we are creating multiple streams, for example in a
// merge workflow going from 2 shards to 1 shard, we
Expand Down Expand Up @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
if err != nil {
return nil, err
}
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
params, err := readRow(dbClient, id)
if err != nil {
Expand All @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return &sqltypes.Result{}, nil
}
// Stop and delete the current controllers.
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
if ct := vre.controllers[id]; ct != nil {
ct.Stop()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/wrangler/fake_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) {
if ft.StartHTTPServer {
ft.HTTPListener.Close()
}
if ft.RPCServer != nil {
ft.RPCServer.Stop()
}
ft.Listener.Close()
ft.TM.Stop()
ft.TM = nil
Expand Down
62 changes: 60 additions & 2 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
"fmt"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

"go.uber.org/goleak"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -55,9 +60,62 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks.
// At this moment we still have a lot of goroutine leaks in the unit tests in this package.
// So we only use this while debugging and fixing the leaks. Once fixed we will use this
// in TestMain instead of just logging the number of leaked goroutines.
func EnsureNoLeaks(t testing.TB) {
if t.Failed() {
return
}
err := ensureNoGoroutines()
if err != nil {
t.Fatal(err)
}
}

func ensureNoGoroutines() error {
// These goroutines have been found to stay around.
// Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks.
var leaksToIgnore = []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
}

const (
// give ample time for the goroutines to exit in CI.
waitTime = 100 * time.Millisecond
numIterations = 50 // 5 seconds
)
var err error
for i := 0; i < numIterations; i++ {
err = goleak.Find(leaksToIgnore...)
if err == nil {
return nil
}
time.Sleep(waitTime)
}
return err
}

func testMainWrapper(m *testing.M) int {
startingNumGoRoutines := runtime.NumGoroutine()
defer func() {
numGoroutines := runtime.NumGoroutine()
if numGoroutines > startingNumGoRoutines {
log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines)
}
}()
_flag.ParseFlagsForTest()
os.Exit(m.Run())
return m.Run()
}

func TestMain(m *testing.M) {
os.Exit(testMainWrapper(m))
}

func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,4 +927,5 @@ func (tme *testMigraterEnv) close(t *testing.T) {
tme.ts.Close()
tme.wr.tmc.Close()
tme.wr = nil
tme.tmeDB.Close()
}

0 comments on commit dcd270c

Please sign in to comment.