Skip to content

Commit

Permalink
core: reuse remote transcoder for stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuben Rodrigues authored and kyriediculous committed May 12, 2021
1 parent 81ab1cb commit bf5397d
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

- \#1845 Staking actions with hints (@kyriediculous)
- \#1873 Increase TicketParams expiration to 10 blocks (@kyriediculous)
- \#1849 Re-use remote transcoders for a stream sessions (@reubenr0d)

#### Transcoder

Expand Down
145 changes: 117 additions & 28 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"math/rand"
"os"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -290,7 +291,7 @@ func TestSelectTranscoder(t *testing.T) {

// register transcoders, which adds transcoder to liveTranscoders and remoteTranscoders
wg := newWg(1)
go func() { m.Manage(strm, 2) }()
go func() { m.Manage(strm, 1) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, 1); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register
Expand All @@ -299,30 +300,44 @@ func TestSelectTranscoder(t *testing.T) {
assert.NotNil(m.liveTranscoders[strm2])
assert.Len(m.remoteTranscoders, 2)

testSessionId := "testID"
testSessionId2 := "testID2"
testSessionId3 := "testID3"

// assert transcoder is returned from selectTranscoder
t1 := m.liveTranscoders[strm]
t2 := m.liveTranscoders[strm2]
currentTranscoder := m.selectTranscoder()
currentTranscoder, err := m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)

// assert transcoder with less load selected
currentTranscoder2 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder2)
assert.Equal(1, t1.load)
// assert that same transcoder is selected for same sessionId
// and that load stays the same
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
m.completeStreamSession(testSessionId)

currentTranscoder3 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder3)
assert.Equal(2, t1.load)
// assert that a new transcoder is selected for a new sessionId
currentTranscoder, err = m.selectTranscoder(testSessionId2)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)

// assert no transcoder returned if all at they capacity
noTrans := m.selectTranscoder()
// Add some more load and assert no transcoder returned if all at capacity
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
noTrans, err := m.selectTranscoder(testSessionId3)
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Nil(noTrans)

m.completeTranscoders(t1)
m.completeTranscoders(t1)
// assert that load is empty after ending stream sessions
m.completeStreamSession(testSessionId2)
assert.Equal(0, t1.load)

// unregister transcoder
Expand All @@ -331,35 +346,105 @@ func TestSelectTranscoder(t *testing.T) {
assert.Nil(m.liveTranscoders[strm2])
assert.NotNil(m.liveTranscoders[strm])

// assert t1 is selected and t2 drained
currentTranscoder = m.selectTranscoder()
// assert t1 is selected and t2 drained, but was previously selected
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)
assert.Len(m.remoteTranscoders, 1)

// assert transcoder gets added back to remoteTranscoders if no transcoding error
_, err := m.Transcode(&SegTranscodingMetadata{})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Nil(err)
assert.Len(m.remoteTranscoders, 2)
assert.Len(m.remoteTranscoders, 1)
assert.Equal(1, t1.load)
m.completeTranscoders(t1)
m.completeStreamSession(testSessionId)
assert.Equal(0, t1.load)
}

func TestCompleteStreamSession(t *testing.T) {
m := NewRemoteTranscoderManager()
strm := &StubTranscoderServer{manager: m}
testSessionId := "testID"
assert := assert.New(t)

// register transcoders
go func() { m.Manage(strm, 1) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
t1 := m.liveTranscoders[strm]

// selectTranscoder and assert that session is added
m.selectTranscoder(testSessionId)
assert.Equal(t1, m.streamSessions[testSessionId])
assert.Equal(1, t1.load)

// complete session and assert that it is cleared
m.completeStreamSession(testSessionId)
transcoder, ok := m.streamSessions[testSessionId]
assert.Nil(transcoder)
assert.False(ok)
assert.Equal(0, t1.load)
}

func TestRemoveFromRemoteTranscoders(t *testing.T) {
remoteTranscoderList := []*RemoteTranscoder{}
assert := assert.New(t)

// Create 4 tanscoders
tr := make([]*RemoteTranscoder, 4)
for i := 0; i < 4; i++ {
tr[i] = &RemoteTranscoder{addr: "testAddress" + strconv.Itoa(i)}
}

// Add to list
remoteTranscoderList = append(remoteTranscoderList, tr...)
assert.Len(remoteTranscoderList, 4)

// Remove transcoder froms head of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[0], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Equal(remoteTranscoderList[1], tr[2])
assert.Equal(remoteTranscoderList[2], tr[3])
assert.Len(remoteTranscoderList, 3)

// Remove transcoder from the middle of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[3], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Equal(remoteTranscoderList[1], tr[2])
assert.Len(remoteTranscoderList, 2)

// Remove transcoder from the end of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[2], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Len(remoteTranscoderList, 1)

// Remove the last transcoder
remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList)
assert.Len(remoteTranscoderList, 0)

// Remove a transcoder when list is empty
remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList)
emptyTList := []*RemoteTranscoder{}
assert.Equal(remoteTranscoderList, emptyTList)
}

func TestTranscoderManagerTranscoding(t *testing.T) {
m := NewRemoteTranscoderManager()
s := &StubTranscoderServer{manager: m}
testSessionId := "testID"

// sanity checks
assert := assert.New(t)
assert.Empty(m.liveTranscoders)
assert.Empty(m.remoteTranscoders)

// Attempt to transcode when no transcoders in the set
_, err := m.Transcode(&SegTranscodingMetadata{})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)

wg := newWg(1)
go func() { m.Manage(s, 5); wg.Done() }()
Expand All @@ -370,14 +455,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
assert.NotNil(m.liveTranscoders[s])

// happy path
res, err := m.Transcode(&SegTranscodingMetadata{})
res, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(err)
assert.Len(res.Segments, 1)
assert.Equal(string(res.Segments[0].Data), "asdf")

// non-fatal error should not remove from list
s.TranscodeError = fmt.Errorf("TranscodeError")
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Equal(s.TranscodeError, err)
assert.Len(m.remoteTranscoders, 1) // sanity
assert.Equal(0, m.remoteTranscoders[0].load) // sanity
Expand All @@ -387,13 +473,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) {

// fatal error should retry and remove from list
s.SendError = fmt.Errorf("SendError")
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.True(wgWait(wg)) // should disconnect manager
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
_, err = m.Transcode(&SegTranscodingMetadata{}) // need second try to remove from remoteTranscoders
assert.Equal(err, ErrNoTranscodersAvailable)
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Len(m.liveTranscoders, 0)
assert.Len(m.remoteTranscoders, 0) // retries drain the list
s.SendError = nil
Expand All @@ -409,7 +497,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
oldTimeout := common.HTTPTimeout
common.HTTPTimeout = 1 * time.Millisecond
defer func() { common.HTTPTimeout = oldTimeout }()
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
_, fatal := err.(RemoteTranscoderFatalError)
wg.Wait()
assert.True(fatal)
Expand Down
Loading

0 comments on commit bf5397d

Please sign in to comment.