From c3154b3efe2581dfafe5fece417f3a33f2fed0c5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 18 Aug 2023 23:18:31 +0200 Subject: [PATCH 1/4] Cleanup most of the leaked goroutines during the run of TestTableMigrateMainflow. Let's check if other CI tests run with these changes before we test/extend this approach for all tests Signed-off-by: Rohit Nayak --- go.mod | 1 + go.sum | 3 ++ go/mysql/fakesqldb/server.go | 1 + go/stats/rates.go | 26 +++++++++++- go/vt/binlog/binlogplayer/binlog_player.go | 5 +++ .../vttablet/tabletmanager/framework_test.go | 1 + .../tabletmanager/vreplication/controller.go | 2 + .../tabletmanager/vreplication/engine.go | 8 ++-- go/vt/wrangler/fake_tablet_test.go | 3 ++ go/vt/wrangler/traffic_switcher_env_test.go | 1 + go/vt/wrangler/traffic_switcher_test.go | 40 +++++++++++++++++++ 11 files changed, 86 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index af6767ba7bc..bd5b0c1080c 100644 --- a/go.mod +++ b/go.mod @@ -106,6 +106,7 @@ require ( github.com/spf13/afero v1.9.3 github.com/spf13/jwalterweatherman v1.1.0 github.com/xlab/treeprint v1.2.0 + go.uber.org/goleak v1.1.11 golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 golang.org/x/sync v0.1.0 modernc.org/sqlite v1.20.3 diff --git a/go.sum b/go.sum index 39777e9b54a..1a690dd3d2c 100644 --- a/go.sum +++ b/go.sum @@ -622,6 +622,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU= go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -680,6 +681,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -922,6 +924,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index cb3d20ae04b..e9585bf413d 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -237,6 +237,7 @@ func (db *DB) Close() { tmpDir := path.Dir(db.socketFile) os.RemoveAll(tmpDir) + } // CloseAllConnections can be used to provoke MySQL client errors for open diff --git a/go/stats/rates.go b/go/stats/rates.go index cc57c45910e..6d6cfb214d8 100644 --- a/go/stats/rates.go +++ b/go/stats/rates.go @@ -17,10 +17,15 @@ limitations under the License. package stats import ( + "context" "encoding/json" "math" + "runtime/debug" + "strings" "sync" "time" + + "vitess.io/vitess/go/vt/log" ) var timeNow = time.Now @@ -65,6 +70,8 @@ type Rates struct { // totalRate is the rate of total counts per second seen in the latest // sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS. totalRate float64 + ctx context.Context + cancel context.CancelFunc } // NewRates reports rolling rate information for countTracker. samples specifies @@ -73,9 +80,14 @@ type Rates struct { // If passing the special value of -1s as interval, we don't snapshot. // (use this for tests). func NewRates(name string, countTracker CountTracker, samples int, interval time.Duration) *Rates { + stack := debug.Stack() + if !strings.Contains(string(stack), "engine.go:385") && !strings.Contains(string(stack), "controller.go:77") { + log.Infof("NewRates(%v, %v, %v, %v, %s)", name, countTracker, samples, interval, debug.Stack()) + } if interval < 1*time.Second && interval != -1*time.Second { panic("interval too small") } + ctx, cancel := context.WithCancel(context.Background()) rt := &Rates{ timeStamps: NewRingInt64(samples + 1), counts: make(map[string]*RingInt64), @@ -83,6 +95,8 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time samples: samples + 1, interval: interval, timestampLastSampling: timeNow(), + ctx: ctx, + cancel: cancel, } if name != "" { publish(name, rt) @@ -93,12 +107,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time return rt } +func (rt *Rates) Stop() { + rt.cancel() +} + func (rt *Rates) track() { t := time.NewTicker(rt.interval) defer t.Stop() for { - rt.snapshot() - <-t.C + select { + case <-rt.ctx.Done(): + return + case <-t.C: + rt.snapshot() + } } } diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index f32462602be..6d689bc5436 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string { return strs } +func (bps *Stats) Stop() { + bps.Rates.Stop() + bps.VReplicationLagRates.Stop() +} + // NewStats creates a new Stats structure. func NewStats() *Stats { bps := &Stats{} diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 32d1c7019c2..642c699e284 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -122,6 +122,7 @@ func (tenv *testEnv) close() { tenv.mu.Lock() defer tenv.mu.Unlock() tenv.ts.Close() + tenv.mysqld.Close() } //-------------------------------------- diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 21464312c91..2d7d49f2981 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() { ct.cancel = func() {} close(ct.done) + blpStats.Stop() return ct, nil } @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye func (ct *controller) Stop() { ct.cancel() + ct.blpStats.Stop() <-ct.done } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index d838e2c2471..592577d9d9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, err } + stats := binlogplayer.NewStats() + defer stats.Stop() switch plan.opcode { case insertQuery: qr, err := dbClient.ExecuteFetch(plan.query, 1) @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return nil, fmt.Errorf("insert id %v out of range", qr.InsertID) } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) // If we are creating multiple streams, for example in a // merge workflow going from 2 shards to 1 shard, we @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) if err != nil { return nil, err } - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { params, err := readRow(dbClient, id) if err != nil { @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error) return &sqltypes.Result{}, nil } // Stop and delete the current controllers. - vdbc := newVDBClient(dbClient, binlogplayer.NewStats()) + vdbc := newVDBClient(dbClient, stats) for _, id := range ids { if ct := vre.controllers[id]; ct != nil { ct.Stop() diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 687ce93db47..d81443777f7 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) { if ft.StartHTTPServer { ft.HTTPListener.Close() } + if ft.RPCServer != nil { + ft.RPCServer.Stop() + } ft.Listener.Close() ft.TM.Stop() ft.TM = nil diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 874850ef9b1..5ac410904b2 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -906,4 +906,5 @@ func (tme *testMigraterEnv) close(t *testing.T) { tme.ts.Close() tme.wr.tmc.Close() tme.wr = nil + tme.tmeDB.Close() } diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index ef7495d13a0..8c21c9a8967 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -21,10 +21,15 @@ import ( "errors" "fmt" "reflect" + "runtime" "strings" "testing" "time" + "go.uber.org/goleak" + + "vitess.io/vitess/go/vt/log" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -71,9 +76,44 @@ const ( tsCheckJournals = "select val from _vt.resharding_journal where id=.*" ) +func ensureNoGoroutines(t testing.TB) { + var ignored = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), + goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), + } + + var err error + for i := 0; i < 5; i++ { + err = goleak.Find(ignored...) + if err == nil { + return + } + time.Sleep(100 * time.Millisecond) + } + t.Fatal(err) +} + // TestTableMigrate tests table mode migrations. // This has to be kept in sync with TestShardMigrate. func TestTableMigrateMainflow(t *testing.T) { + t.Cleanup(func() { + if t.Failed() { + return + } + ensureNoGoroutines(t) + }) + //defer goleak.VerifyNone(t) + + procs := runtime.NumGoroutine() + defer func() { + if procs != runtime.NumGoroutine() { + log.Errorf("TestTableMigrate: goroutine leak: %d, want %d", runtime.NumGoroutine(), procs) + } + }() ctx := context.Background() tme := newTestTableMigrater(ctx, t) defer tme.close(t) From 6fef1828b410227e01345bf3b420f84e76d46c6b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 19 Aug 2023 00:09:27 +0200 Subject: [PATCH 2/4] Don't fail on leaked goroutines, since there are clearly a whole lot of them from previous tests as well to fix. Let's just check if the specific changes have broken any tests. Signed-off-by: Rohit Nayak --- go/vt/wrangler/traffic_switcher_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 8c21c9a8967..930c360ee78 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -100,12 +100,12 @@ func ensureNoGoroutines(t testing.TB) { // TestTableMigrate tests table mode migrations. // This has to be kept in sync with TestShardMigrate. func TestTableMigrateMainflow(t *testing.T) { - t.Cleanup(func() { - if t.Failed() { - return - } - ensureNoGoroutines(t) - }) + //t.Cleanup(func() { + // if t.Failed() { + // return + // } + // ensureNoGoroutines(t) + //}) //defer goleak.VerifyNone(t) procs := runtime.NumGoroutine() From 7abfcf7052150929e05dc406fa1c5f74164e1c3d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 21 Aug 2023 10:49:10 +0200 Subject: [PATCH 3/4] Log number of leaked goroutines Signed-off-by: Rohit Nayak --- go/vt/wrangler/materializer_env_test.go | 62 ++++++++++++++++++++++++- go/vt/wrangler/traffic_switcher_test.go | 40 ---------------- 2 files changed, 60 insertions(+), 42 deletions(-) diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 98e77b38df3..48cca1a0bb8 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -21,12 +21,17 @@ import ( "fmt" "os" "regexp" + "runtime" "strconv" "strings" "sync" "testing" + "time" + + "go.uber.org/goleak" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -55,9 +60,62 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { +// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks. +// At this moment we still have a lot of goroutine leaks in the unit tests in this package. +// So we only use this while debugging and fixing the leaks. Once fixed we will use this +// in TestMain instead of just logging the number of leaked goroutines. +func EnsureNoLeaks(t testing.TB) { + if t.Failed() { + return + } + err := ensureNoGoroutines() + if err != nil { + t.Fatal(err) + } +} + +func ensureNoGoroutines() error { + // These goroutines have been found to stay around. + // Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks. + var leaksToIgnore = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), + goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), + } + + const ( + // give ample time for the goroutines to exit in CI. + waitTime = 100 * time.Millisecond + numIterations = 50 // 5 seconds + ) + var err error + for i := 0; i < numIterations; i++ { + err = goleak.Find(leaksToIgnore...) + if err == nil { + return nil + } + time.Sleep(waitTime) + } + return err +} + +func testMainWrapper(m *testing.M) int { + startingNumGoRoutines := runtime.NumGoroutine() + defer func() { + numGoroutines := runtime.NumGoroutine() + if numGoroutines > startingNumGoRoutines { + log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines) + } + }() _flag.ParseFlagsForTest() - os.Exit(m.Run()) + return m.Run() +} + +func TestMain(m *testing.M) { + os.Exit(testMainWrapper(m)) } func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 930c360ee78..ef7495d13a0 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -21,15 +21,10 @@ import ( "errors" "fmt" "reflect" - "runtime" "strings" "testing" "time" - "go.uber.org/goleak" - - "vitess.io/vitess/go/vt/log" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -76,44 +71,9 @@ const ( tsCheckJournals = "select val from _vt.resharding_journal where id=.*" ) -func ensureNoGoroutines(t testing.TB) { - var ignored = []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), - goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"), - goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"), - goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"), - } - - var err error - for i := 0; i < 5; i++ { - err = goleak.Find(ignored...) - if err == nil { - return - } - time.Sleep(100 * time.Millisecond) - } - t.Fatal(err) -} - // TestTableMigrate tests table mode migrations. // This has to be kept in sync with TestShardMigrate. func TestTableMigrateMainflow(t *testing.T) { - //t.Cleanup(func() { - // if t.Failed() { - // return - // } - // ensureNoGoroutines(t) - //}) - //defer goleak.VerifyNone(t) - - procs := runtime.NumGoroutine() - defer func() { - if procs != runtime.NumGoroutine() { - log.Errorf("TestTableMigrate: goroutine leak: %d, want %d", runtime.NumGoroutine(), procs) - } - }() ctx := context.Background() tme := newTestTableMigrater(ctx, t) defer tme.close(t) From 5c41a2a4d59cb677e71ce7d89fb4dc92bee2a309 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 21 Aug 2023 15:51:38 +0200 Subject: [PATCH 4/4] Minor refactor Signed-off-by: Rohit Nayak --- go/mysql/fakesqldb/server.go | 1 - go/stats/rates.go | 8 -------- 2 files changed, 9 deletions(-) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index e9585bf413d..cb3d20ae04b 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -237,7 +237,6 @@ func (db *DB) Close() { tmpDir := path.Dir(db.socketFile) os.RemoveAll(tmpDir) - } // CloseAllConnections can be used to provoke MySQL client errors for open diff --git a/go/stats/rates.go b/go/stats/rates.go index 6d6cfb214d8..48864585225 100644 --- a/go/stats/rates.go +++ b/go/stats/rates.go @@ -20,12 +20,8 @@ import ( "context" "encoding/json" "math" - "runtime/debug" - "strings" "sync" "time" - - "vitess.io/vitess/go/vt/log" ) var timeNow = time.Now @@ -80,10 +76,6 @@ type Rates struct { // If passing the special value of -1s as interval, we don't snapshot. // (use this for tests). func NewRates(name string, countTracker CountTracker, samples int, interval time.Duration) *Rates { - stack := debug.Stack() - if !strings.Contains(string(stack), "engine.go:385") && !strings.Contains(string(stack), "controller.go:77") { - log.Infof("NewRates(%v, %v, %v, %v, %s)", name, countTracker, samples, interval, debug.Stack()) - } if interval < 1*time.Second && interval != -1*time.Second { panic("interval too small") }