Skip to content

Commit

Permalink
Minor improvements - tests, error handling (#1842)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuben Rodrigues committed Apr 28, 2021
1 parent f6c6322 commit f659900
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
1 change: 0 additions & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func TestTranscodeLoop_GivenNoSegmentsPastTimeout_CleansSegmentChan(t *testing.T
ss := StubSegment()
md := &SegTranscodingMetadata{Profiles: videoProfiles, AuthToken: stubAuthToken()}
n.Transcoder = NewLocalTranscoder(tmp)
n.TranscoderManager = NewRemoteTranscoderManager()
transcodeLoopTimeout = 100 * time.Millisecond
assert := assert.New(t)
require := require.New(t)
Expand Down
50 changes: 33 additions & 17 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,26 +306,33 @@ func TestSelectTranscoder(t *testing.T) {
// assert transcoder is returned from selectTranscoder
t1 := m.liveTranscoders[strm]
t2 := m.liveTranscoders[strm2]
currentTranscoder, _ := m.selectTranscoder(testSessionId)
currentTranscoder, err := m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)

// assert that same transcoder is selected for same sessionId
// and that load stays the same
currentTranscoder, _ = m.selectTranscoder(testSessionId)
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
m.completeStreamSession(testSessionId)

// assert that a new transcoder is selected for a new sessionId
currentTranscoder, _ = m.selectTranscoder(testSessionId2)
currentTranscoder, err = m.selectTranscoder(testSessionId2)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)

// assert no transcoder returned if all at they capacity
// Add some more load and assert no transcoder returned if all at capacity
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
noTrans, err := m.selectTranscoder(testSessionId3)
assert.Equal(err, errors.New("no transcoders available"))
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Nil(noTrans)

// assert that load is emtry after ending stream sessions
Expand All @@ -339,14 +346,16 @@ func TestSelectTranscoder(t *testing.T) {
assert.NotNil(m.liveTranscoders[strm])

// assert t1 is selected and t2 drained, but was previously selected
currentTranscoder, _ = m.selectTranscoder(testSessionId)
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)

// assert transcoder gets added back to remoteTranscoders if no transcoding error
_, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Nil(err)
assert.Len(m.remoteTranscoders, 2)
assert.Equal(1, t1.load)
Expand All @@ -356,7 +365,7 @@ func TestSelectTranscoder(t *testing.T) {

func TestCompleteStreamSession(t *testing.T) {
m := NewRemoteTranscoderManager()
strm := &StubTranscoderServer{manager: m, WithholdResults: false}
strm := &StubTranscoderServer{manager: m}
testSessionId := "testID"
assert := assert.New(t)

Expand All @@ -372,7 +381,9 @@ func TestCompleteStreamSession(t *testing.T) {

// complete session and assert that it is cleared
m.completeStreamSession(testSessionId)
assert.Nil(m.streamSessions[testSessionId])
transcoder, ok := m.streamSessions[testSessionId]
assert.Nil(transcoder)
assert.False(ok)
assert.Equal(0, t1.load)
}

Expand All @@ -387,9 +398,10 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
assert.Empty(m.remoteTranscoders)

// Attempt to transcode when no transcoders in the set
_, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "no transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)

wg := newWg(1)
go func() { m.Manage(s, 5); wg.Done() }()
Expand All @@ -407,7 +419,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) {

// non-fatal error should not remove from list
s.TranscodeError = fmt.Errorf("TranscodeError")
_, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Equal(s.TranscodeError, err)
assert.Len(m.remoteTranscoders, 1) // sanity
assert.Equal(0, m.remoteTranscoders[0].load) // sanity
Expand All @@ -417,13 +430,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) {

// fatal error should retry and remove from list
s.SendError = fmt.Errorf("SendError")
_, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.True(wgWait(wg)) // should disconnect manager
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "no transcoders available")
_, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders
assert.Equal(err, ErrNoTranscodersAvailable)
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "no transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Len(m.liveTranscoders, 0)
assert.Len(m.remoteTranscoders, 0) // retries drain the list
s.SendError = nil
Expand All @@ -439,7 +454,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
oldTimeout := common.HTTPTimeout
common.HTTPTimeout = 1 * time.Millisecond
defer func() { common.HTTPTimeout = oldTimeout }()
_, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
_, fatal := err.(RemoteTranscoderFatalError)
wg.Wait()
assert.True(fatal)
Expand Down
5 changes: 3 additions & 2 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan
// timeout; clean up goroutine here
os.EndSession()
los.EndSession()
// check to avoid nil pointer caused by garbage collection while this go routine is still running
if n.TranscoderManager != nil {
n.TranscoderManager.completeStreamSession(md.AuthToken.SessionId)
}
Expand Down Expand Up @@ -926,8 +927,8 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) {
// Transcode does actual transcoding using remote transcoder from the pool
func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) {
currentTranscoder, err := rtm.selectTranscoder(md.AuthToken.SessionId)
if err == ErrNoTranscodersAvailable {
return nil, ErrNoTranscodersAvailable
if err != nil {
return nil, err
}
res, err := currentTranscoder.Transcode(md)
if err != nil {
Expand Down

0 comments on commit f659900

Please sign in to comment.