From f65990068773eddc0c3c619d469dc17b1214d3fc Mon Sep 17 00:00:00 2001 From: Reuben Rodrigues Date: Wed, 28 Apr 2021 07:56:38 +0000 Subject: [PATCH] Minor improvements - tests, error handling (#1842) --- core/core_test.go | 1 - core/orch_test.go | 50 +++++++++++++++++++++++++++++--------------- core/orchestrator.go | 5 +++-- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/core/core_test.go b/core/core_test.go index 9ccf569dcd..165b4080ac 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -140,7 +140,6 @@ func TestTranscodeLoop_GivenNoSegmentsPastTimeout_CleansSegmentChan(t *testing.T ss := StubSegment() md := &SegTranscodingMetadata{Profiles: videoProfiles, AuthToken: stubAuthToken()} n.Transcoder = NewLocalTranscoder(tmp) - n.TranscoderManager = NewRemoteTranscoderManager() transcodeLoopTimeout = 100 * time.Millisecond assert := assert.New(t) require := require.New(t) diff --git a/core/orch_test.go b/core/orch_test.go index ade451783d..62a89f03f1 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -306,7 +306,8 @@ func TestSelectTranscoder(t *testing.T) { // assert transcoder is returned from selectTranscoder t1 := m.liveTranscoders[strm] t2 := m.liveTranscoders[strm2] - currentTranscoder, _ := m.selectTranscoder(testSessionId) + currentTranscoder, err := m.selectTranscoder(testSessionId) + assert.Nil(err) assert.Equal(t2, currentTranscoder) assert.Equal(1, t2.load) assert.NotNil(m.liveTranscoders[strm]) @@ -314,18 +315,24 @@ func TestSelectTranscoder(t *testing.T) { // assert that same transcoder is selected for same sessionId // and that load stays the same - currentTranscoder, _ = m.selectTranscoder(testSessionId) + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) assert.Equal(t2, currentTranscoder) assert.Equal(1, t2.load) + m.completeStreamSession(testSessionId) // assert that a new transcoder is selected for a new sessionId - currentTranscoder, _ = m.selectTranscoder(testSessionId2) + currentTranscoder, err = m.selectTranscoder(testSessionId2) + assert.Nil(err) assert.Equal(t1, currentTranscoder) assert.Equal(1, t1.load) - // assert no transcoder returned if all at they capacity + // Add some more load and assert no transcoder returned if all at capacity + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) + assert.Equal(t2, currentTranscoder) noTrans, err := m.selectTranscoder(testSessionId3) - assert.Equal(err, errors.New("no transcoders available")) + assert.Equal(err, ErrNoTranscodersAvailable) assert.Nil(noTrans) // assert that load is emtry after ending stream sessions @@ -339,14 +346,16 @@ func TestSelectTranscoder(t *testing.T) { assert.NotNil(m.liveTranscoders[strm]) // assert t1 is selected and t2 drained, but was previously selected - currentTranscoder, _ = m.selectTranscoder(testSessionId) + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) assert.Equal(t1, currentTranscoder) assert.Equal(1, t1.load) assert.NotNil(m.liveTranscoders[strm]) assert.Len(m.remoteTranscoders, 2) // assert transcoder gets added back to remoteTranscoders if no transcoding error - _, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.NotNil(transcodedData) assert.Nil(err) assert.Len(m.remoteTranscoders, 2) assert.Equal(1, t1.load) @@ -356,7 +365,7 @@ func TestSelectTranscoder(t *testing.T) { func TestCompleteStreamSession(t *testing.T) { m := NewRemoteTranscoderManager() - strm := &StubTranscoderServer{manager: m, WithholdResults: false} + strm := &StubTranscoderServer{manager: m} testSessionId := "testID" assert := assert.New(t) @@ -372,7 +381,9 @@ func TestCompleteStreamSession(t *testing.T) { // complete session and assert that it is cleared m.completeStreamSession(testSessionId) - assert.Nil(m.streamSessions[testSessionId]) + transcoder, ok := m.streamSessions[testSessionId] + assert.Nil(transcoder) + assert.False(ok) assert.Equal(0, t1.load) } @@ -387,9 +398,10 @@ func TestTranscoderManagerTranscoding(t *testing.T) { assert.Empty(m.remoteTranscoders) // Attempt to transcode when no transcoders in the set - _, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "no transcoders available") + assert.Equal(err, ErrNoTranscodersAvailable) wg := newWg(1) go func() { m.Manage(s, 5); wg.Done() }() @@ -407,7 +419,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) { // non-fatal error should not remove from list s.TranscodeError = fmt.Errorf("TranscodeError") - _, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.NotNil(transcodedData) assert.Equal(s.TranscodeError, err) assert.Len(m.remoteTranscoders, 1) // sanity assert.Equal(0, m.remoteTranscoders[0].load) // sanity @@ -417,13 +430,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) { // fatal error should retry and remove from list s.SendError = fmt.Errorf("SendError") - _, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) assert.True(wgWait(wg)) // should disconnect manager + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "no transcoders available") - _, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders + assert.Equal(err, ErrNoTranscodersAvailable) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "no transcoders available") + assert.Equal(err, ErrNoTranscodersAvailable) assert.Len(m.liveTranscoders, 0) assert.Len(m.remoteTranscoders, 0) // retries drain the list s.SendError = nil @@ -439,7 +454,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) { oldTimeout := common.HTTPTimeout common.HTTPTimeout = 1 * time.Millisecond defer func() { common.HTTPTimeout = oldTimeout }() - _, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.Nil(transcodedData) _, fatal := err.(RemoteTranscoderFatalError) wg.Wait() assert.True(fatal) diff --git a/core/orchestrator.go b/core/orchestrator.go index cdf7f1546a..93aab85150 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -628,6 +628,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan // timeout; clean up goroutine here os.EndSession() los.EndSession() + // check to avoid nil pointer caused by garbage collection while this go routine is still running if n.TranscoderManager != nil { n.TranscoderManager.completeStreamSession(md.AuthToken.SessionId) } @@ -926,8 +927,8 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) { // Transcode does actual transcoding using remote transcoder from the pool func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) { currentTranscoder, err := rtm.selectTranscoder(md.AuthToken.SessionId) - if err == ErrNoTranscodersAvailable { - return nil, ErrNoTranscodersAvailable + if err != nil { + return nil, err } res, err := currentTranscoder.Transcode(md) if err != nil {