Skip to content

Commit

Permalink
Flakes: VReplication unit tests: reduce goroutine leakage (#13824)
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Aug 22, 2023
1 parent e6df5d8 commit b9bdef8
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ require (
github.com/spf13/afero v1.9.3
github.com/spf13/jwalterweatherman v1.1.0
github.com/xlab/treeprint v1.2.0
go.uber.org/goleak v1.1.11
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
golang.org/x/sync v0.1.0
modernc.org/sqlite v1.20.3
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down Expand Up @@ -680,6 +681,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -922,6 +924,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
Expand Down
18 changes: 16 additions & 2 deletions go/stats/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package stats

import (
"context"
"encoding/json"
"math"
"sync"
Expand Down Expand Up @@ -65,6 +66,8 @@ type Rates struct {
// totalRate is the rate of total counts per second seen in the latest
// sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS.
totalRate float64
ctx context.Context
cancel context.CancelFunc
}

// NewRates reports rolling rate information for countTracker. samples specifies
Expand All @@ -76,13 +79,16 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
if interval < 1*time.Second && interval != -1*time.Second {
panic("interval too small")
}
ctx, cancel := context.WithCancel(context.Background())
rt := &Rates{
timeStamps: NewRingInt64(samples + 1),
counts: make(map[string]*RingInt64),
countTracker: countTracker,
samples: samples + 1,
interval: interval,
timestampLastSampling: timeNow(),
ctx: ctx,
cancel: cancel,
}
if name != "" {
publish(name, rt)
Expand All @@ -93,12 +99,20 @@ func NewRates(name string, countTracker CountTracker, samples int, interval time
return rt
}

// Stop invokes the cancel function stored on rt, which ends the context
// that the track loop selects on. It is a no-op when rt is nil or when no
// cancel function has been set (for example a zero-value Rates), so it is
// always safe to call from a defer. If cancel came from context.WithCancel,
// calling Stop more than once is harmless.
func (rt *Rates) Stop() {
	if rt == nil || rt.cancel == nil {
		return
	}
	rt.cancel()
}

// track is the sampling loop: it takes a snapshot on every tick of a ticker
// that fires every rt.interval, and returns as soon as rt.ctx is done.
//
// Every wait happens inside the select, so cancellation is observed
// immediately instead of only after the next tick. The ticker is stopped on
// return so it does not outlive the loop.
//
// NOTE(review): time.NewTicker panics for a non-positive interval and
// NewRates accepts -1s, so presumably track is only started when
// interval > 0 — confirm against the caller.
func (rt *Rates) track() {
	t := time.NewTicker(rt.interval)
	defer t.Stop()
	for {
		select {
		case <-rt.ctx.Done():
			return
		case <-t.C:
			rt.snapshot()
		}
	}
}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (bps *Stats) MessageHistory() []string {
return strs
}

// Stop calls Stop on both rate trackers held by bps: Rates and
// VReplicationLagRates. Calling it on a nil *Stats is a no-op, so it can be
// deferred unconditionally.
func (bps *Stats) Stop() {
	if bps == nil {
		return
	}
	bps.Rates.Stop()
	bps.VReplicationLagRates.Stop()
}

// NewStats creates a new Stats structure.
func NewStats() *Stats {
bps := &Stats{}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (tenv *testEnv) close() {
tenv.mu.Lock()
defer tenv.mu.Unlock()
tenv.ts.Close()
tenv.mysqld.Close()
}

//--------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
if state == binlogdatapb.VReplicationWorkflowState_Stopped.String() || state == binlogdatapb.VReplicationWorkflowState_Error.String() {
ct.cancel = func() {}
close(ct.done)
blpStats.Stop()
return ct, nil
}

Expand Down Expand Up @@ -312,5 +313,6 @@ func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplaye

// Stop shuts the controller down. It invokes the controller's cancel
// function, calls Stop on its binlog player stats, and then blocks until the
// done channel yields a value or is closed.
func (ct *controller) Stop() {
ct.cancel()
ct.blpStats.Stop()
// Wait last: cancel has already been called, so this only blocks until
// done is signalled.
<-ct.done
}
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, err
}

stats := binlogplayer.NewStats()
defer stats.Stop()
switch plan.opcode {
case insertQuery:
qr, err := dbClient.ExecuteFetch(plan.query, 1)
Expand All @@ -396,7 +398,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return nil, fmt.Errorf("insert id %v out of range", qr.InsertID)
}

vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)

// If we are creating multiple streams, for example in a
// merge workflow going from 2 shards to 1 shard, we
Expand Down Expand Up @@ -455,7 +457,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
if err != nil {
return nil, err
}
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
params, err := readRow(dbClient, id)
if err != nil {
Expand All @@ -482,7 +484,7 @@ func (vre *Engine) exec(query string, runAsAdmin bool) (*sqltypes.Result, error)
return &sqltypes.Result{}, nil
}
// Stop and delete the current controllers.
vdbc := newVDBClient(dbClient, binlogplayer.NewStats())
vdbc := newVDBClient(dbClient, stats)
for _, id := range ids {
if ct := vre.controllers[id]; ct != nil {
ct.Stop()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/wrangler/fake_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (ft *fakeTablet) StopActionLoop(t *testing.T) {
if ft.StartHTTPServer {
ft.HTTPListener.Close()
}
if ft.RPCServer != nil {
ft.RPCServer.Stop()
}
ft.Listener.Close()
ft.TM.Stop()
ft.TM = nil
Expand Down
62 changes: 60 additions & 2 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
"fmt"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

"go.uber.org/goleak"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -55,9 +60,62 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
// EnsureNoLeaks is a helper function to fail tests if there are goroutine leaks.
// At this moment we still have a lot of goroutine leaks in the unit tests in this package.
// So we only use this while debugging and fixing the leaks. Once fixed we will use this
// in TestMain instead of just logging the number of leaked goroutines.
//
// If t has already failed the leak check is skipped, so the original failure
// is not buried under a secondary leak report.
func EnsureNoLeaks(t testing.TB) {
	// Attribute any failure to the calling test rather than to this helper.
	t.Helper()
	if t.Failed() {
		return
	}
	if err := ensureNoGoroutines(); err != nil {
		t.Fatal(err)
	}
}

// ensureNoGoroutines polls goleak.Find until it reports no unexpected
// goroutines. It makes up to 50 attempts, sleeping 100ms after every failed
// attempt (about 5 seconds in total), and returns nil on the first clean
// result or the error from the last attempt otherwise.
func ensureNoGoroutines() error {
	const (
		// Generous polling budget so goroutines have time to exit in CI.
		pollInterval = 100 * time.Millisecond
		maxAttempts  = 50 // 5 seconds
	)

	// Goroutines that have been observed to stay around. The Vitess ones
	// still need investigating to decide whether they are unintended leaks.
	ignored := []goleak.Option{
		goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
		goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
		goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
	}

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = goleak.Find(ignored...); lastErr == nil {
			return nil
		}
		time.Sleep(pollInterval)
	}
	return lastErr
}

// testMainWrapper parses the test flags, runs the package's tests and returns
// the exit code from m.Run. On the way out it logs how many more goroutines
// exist than when it started, as a rough leak indicator.
//
// It must return the code rather than call os.Exit itself: os.Exit terminates
// the process without running deferred functions, which would skip the leak
// report below.
func testMainWrapper(m *testing.M) int {
	startingNumGoRoutines := runtime.NumGoroutine()
	defer func() {
		numGoroutines := runtime.NumGoroutine()
		if numGoroutines > startingNumGoRoutines {
			log.Infof("!!!!!!!!!!!! Wrangler unit tests Leaked %d goroutines", numGoroutines-startingNumGoRoutines)
		}
	}()
	_flag.ParseFlagsForTest()
	return m.Run()
}

// TestMain is the package's test entry point. It delegates to
// testMainWrapper and exits the process with the code that function returns.
func TestMain(m *testing.M) {
	exitCode := testMainWrapper(m)
	os.Exit(exitCode)
}

func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,4 +927,5 @@ func (tme *testMigraterEnv) close(t *testing.T) {
tme.ts.Close()
tme.wr.tmc.Close()
tme.wr = nil
tme.tmeDB.Close()
}

0 comments on commit b9bdef8

Please sign in to comment.