diff --git a/go/stats/rates.go b/go/stats/rates.go index cc57c45910e..48864585225 100644 --- a/go/stats/rates.go +++ b/go/stats/rates.go @@ -17,6 +17,7 @@ limitations under the License. package stats import ( + "context" "encoding/json" "math" "sync" @@ -65,6 +66,8 @@ type Rates struct { // totalRate is the rate of total counts per second seen in the latest // sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS. totalRate float64 + ctx context.Context + cancel context.CancelFunc } // NewRates reports rolling rate information for countTracker. samples specifies @@ -76,6 +79,7 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time if interval < 1*time.Second && interval != -1*time.Second { panic("interval too small") } + ctx, cancel := context.WithCancel(context.Background()) rt := &Rates{ timeStamps: NewRingInt64(samples + 1), counts: make(map[string]*RingInt64), @@ -83,6 +87,8 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time samples: samples + 1, interval: interval, timestampLastSampling: timeNow(), + ctx: ctx, + cancel: cancel, } if name != "" { publish(name, rt) @@ -93,12 +99,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time return rt } +func (rt *Rates) Stop() { + rt.cancel() +} + func (rt *Rates) track() { t := time.NewTicker(rt.interval) defer t.Stop() for { - rt.snapshot() - <-t.C + select { + case <-rt.ctx.Done(): + return + case <-t.C: + rt.snapshot() + } } } diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index f32462602be..6d689bc5436 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string { return strs } +func (bps *Stats) Stop() { + bps.Rates.Stop() + bps.VReplicationLagRates.Stop() +} + // NewStats creates a new Stats structure. func NewStats() *Stats { bps := &Stats{} diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 32d1c7019c2..642c699e284 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -122,6 +122,7 @@ func (tenv *testEnv) close() { tenv.mu.Lock() defer tenv.mu.Unlock() tenv.ts.Close() + tenv.mysqld.Close() } //-------------------------------------- diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 21464312c91..2d7d49f2981 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() { ct.cancel = func() {} close(ct.done) + blpStats.Stop() return ct, nil } @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye func (ct *controller) Stop() { ct.cancel() + ct.blpStats.Stop() <-ct.done } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index d838e2c2471..592577d9d9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, err } + stats := binlogplayer.NewStats() + defer stats.Stop() switch plan.opcode { case insertQuery: qr, err := dbClient.ExecuteFetch(plan.query, 1) @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, fmt.Errorf("insert id %v out of range", qr.InsertID) } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) // If we are creating multiple streams, for example in a // merge workflow going from 2 shards to 1 shard, we @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) if err != nil { return nil, err } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { params, err := readRow(dbClient, id) if err != nil { @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return &sqltypes.Result{}, nil } // Stop and delete the current controllers. - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { if ct := vre.controllers[id]; ct != nil { ct.Stop() diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 687ce93db47..d81443777f7 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) { if ft.StartHTTPServer { ft.HTTPListener.Close() } + if ft.RPCServer != nil { + ft.RPCServer.Stop() + } ft.Listener.Close() ft.TM.Stop() ft.TM = nil diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 98e77b38df3..48cca1a0bb8 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -21,12 +21,17 @@ import ( "fmt" "os" "regexp" + "runtime" "strconv" "strings" "sync" "testing" + "time" + + "go.uber.org/goleak" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -55,9 +60,62 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { +// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks. +// At this moment we still have a lot of goroutine leaks in the unit tests in this package. +// So we only use this while debugging and fixing the leaks. Once fixed we will use this +// in TestMain instead of just logging the number of leaked goroutines. +func EnsureNoLeaks(t testing.TB) { + if t.Failed() { + return + } + err := ensureNoGoroutines() + if err != nil { + t.Fatal(err) + } +} + +func ensureNoGoroutines() error { + // These goroutines have been found to stay around. + // Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks. + var leaksToIgnore = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), + goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), + } + + const ( + // give ample time for the goroutines to exit in CI. + waitTime = 100 * time.Millisecond + numIterations = 50 // 5 seconds + ) + var err error + for i := 0; i < numIterations; i++ { + err = goleak.Find(leaksToIgnore...) + if err == nil { + return nil + } + time.Sleep(waitTime) + } + return err +} + +func testMainWrapper(m *testing.M) int { + startingNumGoRoutines := runtime.NumGoroutine() + defer func() { + numGoroutines := runtime.NumGoroutine() + if numGoroutines > startingNumGoRoutines { + log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines) + } + }() _flag.ParseFlagsForTest() - os.Exit(m.Run()) + return m.Run() +} + +func TestMain(m *testing.M) { + os.Exit(testMainWrapper(m)) } func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index e95f48eb2e6..9445e849d2b 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -927,4 +927,5 @@ func (tme *testMigraterEnv) close(t *testing.T) { tme.ts.Close() tme.wr.tmc.Close() tme.wr = nil + tme.tmeDB.Close() }