From ee6d77b216c2a6665b56310a0d4f407843f0b66a Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Fri, 23 Oct 2020 00:04:14 +0300 Subject: [PATCH 1/2] Fix memory leak --- common/testutil.go | 20 ++++++++++++++++++++ discovery/db_discovery.go | 2 +- discovery/discovery_test.go | 11 ++++++++--- server/push_test.go | 23 +++-------------------- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/common/testutil.go b/common/testutil.go index 85f1d57931..e95ee4629c 100644 --- a/common/testutil.go +++ b/common/testutil.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/livepeer/go-livepeer/net" + "go.uber.org/goleak" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" ) @@ -76,3 +77,22 @@ func (s *StubServerStream) RecvMsg(m interface{}) error { func (s *StubServerStream) Send(n *net.NotifySegment) error { return nil } + +// IgnoreRoutines goroutines to ignore in tests +func IgnoreRoutines() []goleak.Option { + // goleak works by making list of all running goroutines and reporting error if it finds any + // this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines + funcs2ignore := []string{"github.com/golang/glog.(*loggingT).flushDaemon", "go.opencensus.io/stats/view.(*worker).start", + "github.com/rjeczalik/notify.(*recursiveTree).dispatch", "github.com/rjeczalik/notify._Cfunc_CFRunLoopRun", "github.com/ethereum/go-ethereum/metrics.(*meterArbiter).tick", + "github.com/ethereum/go-ethereum/consensus/ethash.(*Ethash).remote", "github.com/ethereum/go-ethereum/core.(*txSenderCacher).cache", + "internal/poll.runtime_pollWait", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage", "github.com/livepeer/lpms/core.(*LPMS).Start", + "github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1", + "github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch", + "github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1"} + + res := make([]goleak.Option, 0, len(funcs2ignore)) + for _, f := range funcs2ignore { + res = append(res, goleak.IgnoreTopFunction(f)) + } + return res +} diff --git a/discovery/db_discovery.go b/discovery/db_discovery.go index 5c1fd9ea62..5b8d1e59c8 100644 --- a/discovery/db_discovery.go +++ b/discovery/db_discovery.go @@ -240,7 +240,7 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error { return fmt.Errorf("could not retrieve orchestrators from DB: %v", err) } - resc, errc := make(chan *common.DBOrch), make(chan error) + resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs)) ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop) defer cancel() diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index e51f6cf4aa..6011850172 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) { @@ -144,8 +145,6 @@ func TestPoolSize(t *testing.T) { func TestDBOrchestratorPoolCacheSize(t *testing.T) { assert := assert.New(t) dbh, dbraw, err := common.TempDB(t) - defer dbh.Close() - defer dbraw.Close() require := require.New(t) require.Nil(err) @@ -156,7 +155,12 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) { Sender: sender, } ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + defer func() { + cancel() + dbh.Close() + dbraw.Close() + goleak.VerifyNone(t, common.IgnoreRoutines()...) + }() emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}) require.NoError(err) @@ -1330,6 +1334,7 @@ func TestOrchestratorPool_ShuffleGetOrchestrators(t *testing.T) { } func TestOrchestratorPool_GetOrchestratorTimeout(t *testing.T) { + defer goleak.VerifyNone(t, common.IgnoreRoutines()...) assert := assert.New(t) addresses := stringsToURIs([]string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}) diff --git a/server/push_test.go b/server/push_test.go index 0b15e098b1..40f9df5585 100644 --- a/server/push_test.go +++ b/server/push_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "github.com/golang/protobuf/proto" + "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/drivers" "github.com/livepeer/go-livepeer/net" @@ -548,7 +549,7 @@ func TestPush_SetVideoProfileFormats(t *testing.T) { } func TestPush_ShouldRemoveSessionAfterTimeoutIfInternalMIDIsUsed(t *testing.T) { - defer goleak.VerifyNone(t, ignoreRoutines()...) + defer goleak.VerifyNone(t, common.IgnoreRoutines()...) oldRI := httpPushTimeout httpPushTimeout = 2 * time.Millisecond @@ -593,26 +594,8 @@ func TestPush_ShouldRemoveSessionAfterTimeoutIfInternalMIDIsUsed(t *testing.T) { assert.False(extEx) } -func ignoreRoutines() []goleak.Option { - // goleak works by making list of all running goroutines and reporting error if it finds any - // this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines - funcs2ignore := []string{"github.com/golang/glog.(*loggingT).flushDaemon", "go.opencensus.io/stats/view.(*worker).start", - "github.com/rjeczalik/notify.(*recursiveTree).dispatch", "github.com/rjeczalik/notify._Cfunc_CFRunLoopRun", "github.com/ethereum/go-ethereum/metrics.(*meterArbiter).tick", - "github.com/ethereum/go-ethereum/consensus/ethash.(*Ethash).remote", "github.com/ethereum/go-ethereum/core.(*txSenderCacher).cache", - "internal/poll.runtime_pollWait", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage", "github.com/livepeer/lpms/core.(*LPMS).Start", - "github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1", - "github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch", - "github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1"} - - res := make([]goleak.Option, 0, len(funcs2ignore)) - for _, f := range funcs2ignore { - res = append(res, goleak.IgnoreTopFunction(f)) - } - return res -} - func TestPush_ShouldRemoveSessionAfterTimeout(t *testing.T) { - defer goleak.VerifyNone(t, ignoreRoutines()...) + defer goleak.VerifyNone(t, common.IgnoreRoutines()...) oldRI := httpPushTimeout httpPushTimeout = 2 * time.Millisecond From 662ceebf9ebf4286548d0be5a47617506f86592c Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Sat, 24 Oct 2020 22:40:16 +0300 Subject: [PATCH 2/2] Quit function on timeout --- discovery/db_discovery.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/discovery/db_discovery.go b/discovery/db_discovery.go index 5b8d1e59c8..dc7ddeb3c9 100644 --- a/discovery/db_discovery.go +++ b/discovery/db_discovery.go @@ -164,7 +164,7 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error { return fmt.Errorf("could not retrieve orchestrators from DB: %v", err) } - resc, errc := make(chan *common.DBOrch), make(chan error) + resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs)) ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop) defer cancel() @@ -201,7 +201,7 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error { glog.Errorln(err) case <-ctx.Done(): glog.Info("Done fetching stake for orchestrators, context timeout") - break + return nil } } @@ -270,7 +270,6 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error { } numOrchs++ go getOrchInfo(orch) - } for i := 0; i < numOrchs; i++ { @@ -283,7 +282,7 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error { glog.Errorln(err) case <-ctx.Done(): glog.Info("Done fetching orch info for orchestrators, context timeout") - break + return nil } }