Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: reuse remote transcoder for stream #1849

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

- \#1845 Staking actions with hints (@kyriediculous)
- \#1873 Increase TicketParams expiration to 10 blocks (@kyriediculous)
- \#1849 Re-use remote transcoders for a stream sessions (@reubenr0d)

#### Transcoder

Expand Down
145 changes: 117 additions & 28 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
"math/rand"
"os"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -290,7 +291,7 @@ func TestSelectTranscoder(t *testing.T) {

// register transcoders, which adds transcoder to liveTranscoders and remoteTranscoders
wg := newWg(1)
go func() { m.Manage(strm, 2) }()
go func() { m.Manage(strm, 1) }()
reubenr0d marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, 1); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register
Expand All @@ -299,30 +300,44 @@ func TestSelectTranscoder(t *testing.T) {
assert.NotNil(m.liveTranscoders[strm2])
assert.Len(m.remoteTranscoders, 2)

testSessionId := "testID"
testSessionId2 := "testID2"
testSessionId3 := "testID3"

// assert transcoder is returned from selectTranscoder
t1 := m.liveTranscoders[strm]
t2 := m.liveTranscoders[strm2]
currentTranscoder := m.selectTranscoder()
currentTranscoder, err := m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)

// assert transcoder with less load selected
currentTranscoder2 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder2)
assert.Equal(1, t1.load)
// assert that same transcoder is selected for same sessionId
// and that load stays the same
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
m.completeStreamSession(testSessionId)

currentTranscoder3 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder3)
assert.Equal(2, t1.load)
// assert that a new transcoder is selected for a new sessionId
currentTranscoder, err = m.selectTranscoder(testSessionId2)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)

// assert no transcoder returned if all at they capacity
noTrans := m.selectTranscoder()
// Add some more load and assert no transcoder returned if all at capacity
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
reubenr0d marked this conversation as resolved.
Show resolved Hide resolved
noTrans, err := m.selectTranscoder(testSessionId3)
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Nil(noTrans)

m.completeTranscoders(t1)
m.completeTranscoders(t1)
// assert that load is empty after ending stream sessions
m.completeStreamSession(testSessionId2)
assert.Equal(0, t1.load)

// unregister transcoder
Expand All @@ -331,35 +346,105 @@ func TestSelectTranscoder(t *testing.T) {
assert.Nil(m.liveTranscoders[strm2])
assert.NotNil(m.liveTranscoders[strm])

// assert t1 is selected and t2 drained
currentTranscoder = m.selectTranscoder()
// assert t1 is selected and t2 drained, but was previously selected
currentTranscoder, err = m.selectTranscoder(testSessionId)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)
assert.Len(m.remoteTranscoders, 1)

// assert transcoder gets added back to remoteTranscoders if no transcoding error
_, err := m.Transcode(&SegTranscodingMetadata{})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Nil(err)
assert.Len(m.remoteTranscoders, 2)
assert.Len(m.remoteTranscoders, 1)
assert.Equal(1, t1.load)
m.completeTranscoders(t1)
m.completeStreamSession(testSessionId)
assert.Equal(0, t1.load)
}

func TestCompleteStreamSession(t *testing.T) {
m := NewRemoteTranscoderManager()
strm := &StubTranscoderServer{manager: m}
testSessionId := "testID"
assert := assert.New(t)

// register transcoders
go func() { m.Manage(strm, 1) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
t1 := m.liveTranscoders[strm]

// selectTranscoder and assert that session is added
m.selectTranscoder(testSessionId)
assert.Equal(t1, m.streamSessions[testSessionId])
assert.Equal(1, t1.load)

// complete session and assert that it is cleared
m.completeStreamSession(testSessionId)
transcoder, ok := m.streamSessions[testSessionId]
assert.Nil(transcoder)
assert.False(ok)
assert.Equal(0, t1.load)
}

func TestRemoveFromRemoteTranscoders(t *testing.T) {
remoteTranscoderList := []*RemoteTranscoder{}
assert := assert.New(t)

// Create 4 tanscoders
tr := make([]*RemoteTranscoder, 4)
for i := 0; i < 4; i++ {
tr[i] = &RemoteTranscoder{addr: "testAddress" + strconv.Itoa(i)}
}

// Add to list
remoteTranscoderList = append(remoteTranscoderList, tr...)
assert.Len(remoteTranscoderList, 4)

// Remove transcoder froms head of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[0], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Equal(remoteTranscoderList[1], tr[2])
assert.Equal(remoteTranscoderList[2], tr[3])
assert.Len(remoteTranscoderList, 3)

// Remove transcoder from the middle of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[3], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Equal(remoteTranscoderList[1], tr[2])
assert.Len(remoteTranscoderList, 2)

// Remove transcoder from the end of the list
remoteTranscoderList = removeFromRemoteTranscoders(tr[2], remoteTranscoderList)
assert.Equal(remoteTranscoderList[0], tr[1])
assert.Len(remoteTranscoderList, 1)

// Remove the last transcoder
remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList)
assert.Len(remoteTranscoderList, 0)

// Remove a transcoder when list is empty
remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList)
emptyTList := []*RemoteTranscoder{}
assert.Equal(remoteTranscoderList, emptyTList)
}

func TestTranscoderManagerTranscoding(t *testing.T) {
m := NewRemoteTranscoderManager()
s := &StubTranscoderServer{manager: m}
testSessionId := "testID"

// sanity checks
assert := assert.New(t)
assert.Empty(m.liveTranscoders)
assert.Empty(m.remoteTranscoders)

// Attempt to transcode when no transcoders in the set
_, err := m.Transcode(&SegTranscodingMetadata{})
transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)

wg := newWg(1)
go func() { m.Manage(s, 5); wg.Done() }()
Expand All @@ -370,14 +455,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
assert.NotNil(m.liveTranscoders[s])

// happy path
res, err := m.Transcode(&SegTranscodingMetadata{})
res, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(err)
assert.Len(res.Segments, 1)
assert.Equal(string(res.Segments[0].Data), "asdf")

// non-fatal error should not remove from list
s.TranscodeError = fmt.Errorf("TranscodeError")
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.NotNil(transcodedData)
assert.Equal(s.TranscodeError, err)
assert.Len(m.remoteTranscoders, 1) // sanity
assert.Equal(0, m.remoteTranscoders[0].load) // sanity
Expand All @@ -387,13 +473,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) {

// fatal error should retry and remove from list
s.SendError = fmt.Errorf("SendError")
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.True(wgWait(wg)) // should disconnect manager
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
_, err = m.Transcode(&SegTranscodingMetadata{}) // need second try to remove from remoteTranscoders
assert.Equal(err, ErrNoTranscodersAvailable)
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders
assert.Nil(transcodedData)
assert.NotNil(err)
assert.Equal(err.Error(), "No transcoders available")
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Len(m.liveTranscoders, 0)
assert.Len(m.remoteTranscoders, 0) // retries drain the list
s.SendError = nil
Expand All @@ -409,7 +497,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
oldTimeout := common.HTTPTimeout
common.HTTPTimeout = 1 * time.Millisecond
defer func() { common.HTTPTimeout = oldTimeout }()
_, err = m.Transcode(&SegTranscodingMetadata{})
transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}})
assert.Nil(transcodedData)
_, fatal := err.(RemoteTranscoderFatalError)
wg.Wait()
assert.True(fatal)
Expand Down
Loading