Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flakes: VReplication unit tests: reduce goroutine leakage #13824

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ require (
github.com/spf13/afero v1.9.3
github.com/spf13/jwalterweatherman v1.1.0
github.com/xlab/treeprint v1.2.0
go.uber.org/goleak v1.1.11
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
golang.org/x/sync v0.1.0
modernc.org/sqlite v1.20.3
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down Expand Up @@ -680,6 +681,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -922,6 +924,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
Expand Down
18 changes: 16 additions & 2 deletions go/stats/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package stats

import (
"context"
"encoding/json"
"math"
"sync"
Expand Down Expand Up @@ -65,6 +66,8 @@ type Rates struct {
// totalRate is the rate of total counts per second seen in the latest
// sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS.
totalRate float64
ctx context.Context
cancel context.CancelFunc
}

// NewRates reports rolling rate information for countTracker. samples specifies
Expand All @@ -76,13 +79,16 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
if interval < 1*time.Second && interval != -1*time.Second {
panic("interval too small")
}
ctx, cancel := context.WithCancel(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think design wise it's better to pass in the context explicitly as a more idiomatic Go design. But that might mean many more changes, so dunno if that's desired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but it will require other areas to be refactored too: some of the functions that call NewRates themselves don't have contexts passed in and we might need to store some of the contexts around so that the cancellation code paths have access to them.

Hence thought I would take this quicker approach now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's kinda what I suspected as well yeah.

rt := &Rates{
timeStamps: NewRingInt64(samples + 1),
counts: make(map[string]*RingInt64),
countTracker: countTracker,
samples: samples + 1,
interval: interval,
timestampLastSampling: timeNow(),
ctx: ctx,
cancel: cancel,
}
if name != "" {
publish(name, rt)
Expand All @@ -93,12 +99,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
return rt
}

func (rt *Rates) Stop() {
rt.cancel()
}

func (rt *Rates) track() {
t := time.NewTicker(rt.interval)
defer t.Stop()
for {
rt.snapshot()
<-t.C
select {
case <-rt.ctx.Done():
return
case <-t.C:
rt.snapshot()
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string {
return strs
}

func (bps *Stats) Stop() {
bps.Rates.Stop()
bps.VReplicationLagRates.Stop()
}

// NewStats creates a new Stats structure.
func NewStats() *Stats {
bps := &Stats{}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (tenv *testEnv) close() {
tenv.mu.Lock()
defer tenv.mu.Unlock()
tenv.ts.Close()
tenv.mysqld.Close()
}

//--------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() {
ct.cancel = func() {}
close(ct.done)
blpStats.Stop()
return ct, nil
}

Expand Down Expand Up @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye

func (ct *controller) Stop() {
ct.cancel()
ct.blpStats.Stop()
<-ct.done
}
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}

stats := binlogplayer.NewStats()
defer stats.Stop()
switch plan.opcode {
case insertQuery:
qr, err := dbClient.ExecuteFetch(plan.query, 1)
Expand All @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, fmt.Errorf("insert id %v out of range", qr.InsertID)
}

vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)

// If we are creating multiple streams, for example in a
// merge workflow going from 2 shards to 1 shard, we
Expand Down Expand Up @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
if err != nil {
return nil, err
}
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
params, err := readRow(dbClient, id)
if err != nil {
Expand All @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return &sqltypes.Result{}, nil
}
// Stop and delete the current controllers.
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
if ct := vre.controllers[id]; ct != nil {
ct.Stop()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/wrangler/fake_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) {
if ft.StartHTTPServer {
ft.HTTPListener.Close()
}
if ft.RPCServer != nil {
ft.RPCServer.Stop()
}
ft.Listener.Close()
ft.TM.Stop()
ft.TM = nil
Expand Down
62 changes: 60 additions & 2 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
"fmt"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

"go.uber.org/goleak"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -55,9 +60,62 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks.
// At this moment we still have a lot of goroutine leaks in the unit tests in this package.
// So we only use this while debugging and fixing the leaks. Once fixed we will use this
// in TestMain instead of just logging the number of leaked goroutines.
func EnsureNoLeaks(t testing.TB) {
if t.Failed() {
return
}
err := ensureNoGoroutines()
if err != nil {
t.Fatal(err)
}
}

func ensureNoGoroutines() error {
// These goroutines have been found to stay around.
// Need to investigate and fix the Vitess ones at some point, if we indeed find out that they are unintended leaks.
var leaksToIgnore = []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
}

const (
// give ample time for the goroutines to exit in CI.
waitTime = 100 * time.Millisecond
numIterations = 50 // 5 seconds
)
var err error
for i := 0; i < numIterations; i++ {
err = goleak.Find(leaksToIgnore...)
if err == nil {
return nil
}
time.Sleep(waitTime)
}
return err
}

func testMainWrapper(m *testing.M) int {
startingNumGoRoutines := runtime.NumGoroutine()
defer func() {
numGoroutines := runtime.NumGoroutine()
if numGoroutines > startingNumGoRoutines {
log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines)
}
}()
_flag.ParseFlagsForTest()
os.Exit(m.Run())
return m.Run()
}

func TestMain(m *testing.M) {
os.Exit(testMainWrapper(m))
}

func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,4 +906,5 @@ func (tme *testMigraterEnv) close(t *testing.T) {
tme.ts.Close()
tme.wr.tmc.Close()
tme.wr = nil
tme.tmeDB.Close()
}
Loading