diff --git a/go.mod b/go.mod index af6767ba7bc..bd5b0c1080c 100644 --- a/go.mod +++ b/go.mod @@ -106,6 +106,7 @@ require ( github.com/spf13/afero v1.9.3 github.com/spf13/jwalterweatherman v1.1.0 github.com/xlab/treeprint v1.2.0 + go.uber.org/goleak v1.1.11 golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 golang.org/x/sync v0.1.0 modernc.org/sqlite v1.20.3 diff --git a/go.sum b/go.sum index 39777e9b54a..1a690dd3d2c 100644 --- a/go.sum +++ b/go.sum @@ -622,6 +622,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU= go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -680,6 +681,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -922,6 +924,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= diff --git a/go/stats/rates.go b/go/stats/rates.go index cc57c45910e..48864585225 100644 --- a/go/stats/rates.go +++ b/go/stats/rates.go @@ -17,6 +17,7 @@ limitations under the License. package stats import ( + "context" "encoding/json" "math" "sync" @@ -65,6 +66,8 @@ type Rates struct { // totalRate is the rate of total counts per second seen in the latest // sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS. totalRate float64 + ctx context.Context + cancel context.CancelFunc } // NewRates reports rolling rate information for countTracker. samples specifies @@ -76,6 +79,7 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time if interval < 1*time.Second && interval != -1*time.Second { panic("interval too small") } + ctx, cancel := context.WithCancel(context.Background()) rt := &Rates{ timeStamps: NewRingInt64(samples + 1), counts: make(map[string]*RingInt64), @@ -83,6 +87,8 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time samples: samples + 1, interval: interval, timestampLastSampling: timeNow(), + ctx: ctx, + cancel: cancel, } if name != "" { publish(name, rt) @@ -93,12 +99,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time return rt } +func (rt *Rates) Stop() { + rt.cancel() +} + func (rt *Rates) track() { t := time.NewTicker(rt.interval) defer t.Stop() for { - rt.snapshot() - <-t.C + select { + case <-rt.ctx.Done(): + return + case <-t.C: + rt.snapshot() + } } } diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index f32462602be..6d689bc5436 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string { return strs } +func (bps *Stats) Stop() { + bps.Rates.Stop() + bps.VReplicationLagRates.Stop() +} + // NewStats creates a new Stats structure. func NewStats() *Stats { bps := &Stats{} diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 32d1c7019c2..642c699e284 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -122,6 +122,7 @@ func (tenv *testEnv) close() { tenv.mu.Lock() defer tenv.mu.Unlock() tenv.ts.Close() + tenv.mysqld.Close() } //-------------------------------------- diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 21464312c91..2d7d49f2981 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() { ct.cancel = func() {} close(ct.done) + blpStats.Stop() return ct, nil } @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye func (ct *controller) Stop() { ct.cancel() + ct.blpStats.Stop() <-ct.done } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index d838e2c2471..592577d9d9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, err } + stats := binlogplayer.NewStats() + defer stats.Stop() switch plan.opcode { case insertQuery: qr, err := dbClient.ExecuteFetch(plan.query, 1) @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, fmt.Errorf("insert id %v out of range", qr.InsertID) } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) // If we are creating multiple streams, for example in a // merge workflow going from 2 shards to 1 shard, we @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) if err != nil { return nil, err } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { params, err := readRow(dbClient, id) if err != nil { @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return &sqltypes.Result{}, nil } // Stop and delete the current controllers. - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { if ct := vre.controllers[id]; ct != nil { ct.Stop() diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 687ce93db47..d81443777f7 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) { if ft.StartHTTPServer { ft.HTTPListener.Close() } + if ft.RPCServer != nil { + ft.RPCServer.Stop() + } ft.Listener.Close() ft.TM.Stop() ft.TM = nil diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 98e77b38df3..48cca1a0bb8 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -21,12 +21,17 @@ import ( "fmt" "os" "regexp" + "runtime" "strconv" "strings" "sync" "testing" + "time" + + "go.uber.org/goleak" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -55,9 +60,62 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { +// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks. +// At this moment we still have a lot of goroutine leaks in the unit tests in this package. +// So we only use this while debugging and fixing the leaks. Once fixed we will use this +// in TestMain instead of just logging the number of leaked goroutines. +func EnsureNoLeaks(t testing.TB) { + if t.Failed() { + return + } + err := ensureNoGoroutines() + if err != nil { + t.Fatal(err) + } +} + +func ensureNoGoroutines() error { + // These goroutines have been found to stay around. + // Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks. + var leaksToIgnore = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), + goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), + } + + const ( + // give ample time for the goroutines to exit in CI. + waitTime = 100 * time.Millisecond + numIterations = 50 // 5 seconds + ) + var err error + for i := 0; i < numIterations; i++ { + err = goleak.Find(leaksToIgnore...) + if err == nil { + return nil + } + time.Sleep(waitTime) + } + return err +} + +func testMainWrapper(m *testing.M) int { + startingNumGoRoutines := runtime.NumGoroutine() + defer func() { + numGoroutines := runtime.NumGoroutine() + if numGoroutines > startingNumGoRoutines { + log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines) + } + }() _flag.ParseFlagsForTest() - os.Exit(m.Run()) + return m.Run() +} + +func TestMain(m *testing.M) { + os.Exit(testMainWrapper(m)) } func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 874850ef9b1..5ac410904b2 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -906,4 +906,5 @@ func (tme *testMigraterEnv) close(t *testing.T) { tme.ts.Close() tme.wr.tmc.Close() tme.wr = nil + tme.tmeDB.Close() }