From bf5397d210624b19c643bfbc506f40d8ae1cb325 Mon Sep 17 00:00:00 2001 From: Reuben Rodrigues Date: Sat, 1 May 2021 06:53:01 +0000 Subject: [PATCH] core: reuse remote transcoder for stream --- CHANGELOG_PENDING.md | 1 + core/orch_test.go | 145 ++++++++++++++++++++++++++++++++++--------- core/orchestrator.go | 97 ++++++++++++++++++++++------- 3 files changed, 194 insertions(+), 49 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index da2b6e29b4..47697e1844 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,6 +17,7 @@ - \#1845 Staking actions with hints (@kyriediculous) - \#1873 Increase TicketParams expiration to 10 blocks (@kyriediculous) +- \#1849 Re-use remote transcoders for a stream sessions (@reubenr0d) #### Transcoder diff --git a/core/orch_test.go b/core/orch_test.go index b9b60a3715..67fe910c9e 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -9,6 +9,7 @@ import ( "math/big" "math/rand" "os" + "strconv" "sync" "testing" "time" @@ -290,7 +291,7 @@ func TestSelectTranscoder(t *testing.T) { // register transcoders, which adds transcoder to liveTranscoders and remoteTranscoders wg := newWg(1) - go func() { m.Manage(strm, 2) }() + go func() { m.Manage(strm, 1) }() time.Sleep(1 * time.Millisecond) // allow time for first stream to register go func() { m.Manage(strm2, 1); wg.Done() }() time.Sleep(1 * time.Millisecond) // allow time for second stream to register @@ -299,30 +300,44 @@ func TestSelectTranscoder(t *testing.T) { assert.NotNil(m.liveTranscoders[strm2]) assert.Len(m.remoteTranscoders, 2) + testSessionId := "testID" + testSessionId2 := "testID2" + testSessionId3 := "testID3" + // assert transcoder is returned from selectTranscoder t1 := m.liveTranscoders[strm] t2 := m.liveTranscoders[strm2] - currentTranscoder := m.selectTranscoder() + currentTranscoder, err := m.selectTranscoder(testSessionId) + assert.Nil(err) assert.Equal(t2, currentTranscoder) assert.Equal(1, t2.load) assert.NotNil(m.liveTranscoders[strm]) assert.Len(m.remoteTranscoders, 2) - // assert transcoder with less load selected - currentTranscoder2 := m.selectTranscoder() - assert.Equal(t1, currentTranscoder2) - assert.Equal(1, t1.load) + // assert that same transcoder is selected for same sessionId + // and that load stays the same + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) + assert.Equal(t2, currentTranscoder) + assert.Equal(1, t2.load) + m.completeStreamSession(testSessionId) - currentTranscoder3 := m.selectTranscoder() - assert.Equal(t1, currentTranscoder3) - assert.Equal(2, t1.load) + // assert that a new transcoder is selected for a new sessionId + currentTranscoder, err = m.selectTranscoder(testSessionId2) + assert.Nil(err) + assert.Equal(t1, currentTranscoder) + assert.Equal(1, t1.load) - // assert no transcoder returned if all at they capacity - noTrans := m.selectTranscoder() + // Add some more load and assert no transcoder returned if all at capacity + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) + assert.Equal(t2, currentTranscoder) + noTrans, err := m.selectTranscoder(testSessionId3) + assert.Equal(err, ErrNoTranscodersAvailable) assert.Nil(noTrans) - m.completeTranscoders(t1) - m.completeTranscoders(t1) + // assert that load is empty after ending stream sessions + m.completeStreamSession(testSessionId2) assert.Equal(0, t1.load) // unregister transcoder @@ -331,25 +346,94 @@ func TestSelectTranscoder(t *testing.T) { assert.Nil(m.liveTranscoders[strm2]) assert.NotNil(m.liveTranscoders[strm]) - // assert t1 is selected and t2 drained - currentTranscoder = m.selectTranscoder() + // assert t1 is selected and t2 drained, but was previously selected + currentTranscoder, err = m.selectTranscoder(testSessionId) + assert.Nil(err) assert.Equal(t1, currentTranscoder) assert.Equal(1, t1.load) assert.NotNil(m.liveTranscoders[strm]) - assert.Len(m.remoteTranscoders, 2) + assert.Len(m.remoteTranscoders, 1) // assert transcoder gets added back to remoteTranscoders if no transcoding error - _, err := m.Transcode(&SegTranscodingMetadata{}) + transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.NotNil(transcodedData) assert.Nil(err) - assert.Len(m.remoteTranscoders, 2) + assert.Len(m.remoteTranscoders, 1) assert.Equal(1, t1.load) - m.completeTranscoders(t1) + m.completeStreamSession(testSessionId) assert.Equal(0, t1.load) } +func TestCompleteStreamSession(t *testing.T) { + m := NewRemoteTranscoderManager() + strm := &StubTranscoderServer{manager: m} + testSessionId := "testID" + assert := assert.New(t) + + // register transcoders + go func() { m.Manage(strm, 1) }() + time.Sleep(1 * time.Millisecond) // allow time for first stream to register + t1 := m.liveTranscoders[strm] + + // selectTranscoder and assert that session is added + m.selectTranscoder(testSessionId) + assert.Equal(t1, m.streamSessions[testSessionId]) + assert.Equal(1, t1.load) + + // complete session and assert that it is cleared + m.completeStreamSession(testSessionId) + transcoder, ok := m.streamSessions[testSessionId] + assert.Nil(transcoder) + assert.False(ok) + assert.Equal(0, t1.load) +} + +func TestRemoveFromRemoteTranscoders(t *testing.T) { + remoteTranscoderList := []*RemoteTranscoder{} + assert := assert.New(t) + + // Create 4 tanscoders + tr := make([]*RemoteTranscoder, 4) + for i := 0; i < 4; i++ { + tr[i] = &RemoteTranscoder{addr: "testAddress" + strconv.Itoa(i)} + } + + // Add to list + remoteTranscoderList = append(remoteTranscoderList, tr...) + assert.Len(remoteTranscoderList, 4) + + // Remove transcoder froms head of the list + remoteTranscoderList = removeFromRemoteTranscoders(tr[0], remoteTranscoderList) + assert.Equal(remoteTranscoderList[0], tr[1]) + assert.Equal(remoteTranscoderList[1], tr[2]) + assert.Equal(remoteTranscoderList[2], tr[3]) + assert.Len(remoteTranscoderList, 3) + + // Remove transcoder from the middle of the list + remoteTranscoderList = removeFromRemoteTranscoders(tr[3], remoteTranscoderList) + assert.Equal(remoteTranscoderList[0], tr[1]) + assert.Equal(remoteTranscoderList[1], tr[2]) + assert.Len(remoteTranscoderList, 2) + + // Remove transcoder from the end of the list + remoteTranscoderList = removeFromRemoteTranscoders(tr[2], remoteTranscoderList) + assert.Equal(remoteTranscoderList[0], tr[1]) + assert.Len(remoteTranscoderList, 1) + + // Remove the last transcoder + remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList) + assert.Len(remoteTranscoderList, 0) + + // Remove a transcoder when list is empty + remoteTranscoderList = removeFromRemoteTranscoders(tr[1], remoteTranscoderList) + emptyTList := []*RemoteTranscoder{} + assert.Equal(remoteTranscoderList, emptyTList) +} + func TestTranscoderManagerTranscoding(t *testing.T) { m := NewRemoteTranscoderManager() s := &StubTranscoderServer{manager: m} + testSessionId := "testID" // sanity checks assert := assert.New(t) @@ -357,9 +441,10 @@ func TestTranscoderManagerTranscoding(t *testing.T) { assert.Empty(m.remoteTranscoders) // Attempt to transcode when no transcoders in the set - _, err := m.Transcode(&SegTranscodingMetadata{}) + transcodedData, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "No transcoders available") + assert.Equal(err, ErrNoTranscodersAvailable) wg := newWg(1) go func() { m.Manage(s, 5); wg.Done() }() @@ -370,14 +455,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) { assert.NotNil(m.liveTranscoders[s]) // happy path - res, err := m.Transcode(&SegTranscodingMetadata{}) + res, err := m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) assert.Nil(err) assert.Len(res.Segments, 1) assert.Equal(string(res.Segments[0].Data), "asdf") // non-fatal error should not remove from list s.TranscodeError = fmt.Errorf("TranscodeError") - _, err = m.Transcode(&SegTranscodingMetadata{}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.NotNil(transcodedData) assert.Equal(s.TranscodeError, err) assert.Len(m.remoteTranscoders, 1) // sanity assert.Equal(0, m.remoteTranscoders[0].load) // sanity @@ -387,13 +473,15 @@ func TestTranscoderManagerTranscoding(t *testing.T) { // fatal error should retry and remove from list s.SendError = fmt.Errorf("SendError") - _, err = m.Transcode(&SegTranscodingMetadata{}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) assert.True(wgWait(wg)) // should disconnect manager + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "No transcoders available") - _, err = m.Transcode(&SegTranscodingMetadata{}) // need second try to remove from remoteTranscoders + assert.Equal(err, ErrNoTranscodersAvailable) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) // need second try to remove from remoteTranscoders + assert.Nil(transcodedData) assert.NotNil(err) - assert.Equal(err.Error(), "No transcoders available") + assert.Equal(err, ErrNoTranscodersAvailable) assert.Len(m.liveTranscoders, 0) assert.Len(m.remoteTranscoders, 0) // retries drain the list s.SendError = nil @@ -409,7 +497,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) { oldTimeout := common.HTTPTimeout common.HTTPTimeout = 1 * time.Millisecond defer func() { common.HTTPTimeout = oldTimeout }() - _, err = m.Transcode(&SegTranscodingMetadata{}) + transcodedData, err = m.Transcode(&SegTranscodingMetadata{AuthToken: &net.AuthToken{SessionId: testSessionId}}) + assert.Nil(transcodedData) _, fatal := err.(RemoteTranscoderFatalError) wg.Wait() assert.True(fatal) diff --git a/core/orchestrator.go b/core/orchestrator.go index 3fa677f2aa..ad475e4811 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -628,6 +628,12 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan // timeout; clean up goroutine here os.EndSession() los.EndSession() + // check to avoid nil pointer caused by garbage collection while this go routine is still running + if n.TranscoderManager != nil { + n.TranscoderManager.RTmutex.Lock() + n.TranscoderManager.completeStreamSession(md.AuthToken.SessionId) + n.TranscoderManager.RTmutex.Unlock() + } glog.V(common.DEBUG).Infof("Segment loop timed out; closing manifestID=%s sessionID=%s", md.ManifestID, md.AuthToken.SessionId) n.segmentMutex.Lock() mid := ManifestID(md.AuthToken.SessionId) @@ -684,6 +690,7 @@ func NewRemoteTranscoderFatalError(err error) error { } var ErrRemoteTranscoderTimeout = errors.New("Remote transcoder took too long") +var ErrNoTranscodersAvailable = errors.New("no transcoders available") func (rt *RemoteTranscoder) done() { // select so we don't block indefinitely if there's no listener @@ -760,10 +767,12 @@ func NewRemoteTranscoderManager() *RemoteTranscoderManager { return &RemoteTranscoderManager{ remoteTranscoders: []*RemoteTranscoder{}, liveTranscoders: map[net.Transcoder_RegisterTranscoderServer]*RemoteTranscoder{}, - RTmutex: &sync.Mutex{}, + RTmutex: sync.Mutex{}, taskMutex: &sync.RWMutex{}, taskChans: make(map[int64]TranscoderChan), + + streamSessions: make(map[string]*RemoteTranscoder), } } @@ -782,12 +791,15 @@ func (r byLoadFactor) Less(i, j int) bool { type RemoteTranscoderManager struct { remoteTranscoders []*RemoteTranscoder liveTranscoders map[net.Transcoder_RegisterTranscoderServer]*RemoteTranscoder - RTmutex *sync.Mutex + RTmutex sync.Mutex // For tracking tasks assigned to remote transcoders taskMutex *sync.RWMutex taskChans map[int64]TranscoderChan taskCount int64 + + //Map for keeping track of sessions and their respective transcoders + streamSessions map[string]*RemoteTranscoder } // RegisteredTranscodersCount returns number of registered transcoders @@ -847,7 +859,33 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco } } -func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder { +func removeFromRemoteTranscoders(rt *RemoteTranscoder, remoteTranscoders []*RemoteTranscoder) []*RemoteTranscoder { + if len(remoteTranscoders) == 0 { + // No transocerds to remove, return + return remoteTranscoders + } + + lastIndex := len(remoteTranscoders) - 1 + last := remoteTranscoders[lastIndex] + if rt == last { + return remoteTranscoders[:lastIndex] + } + + newRemoteTs := make([]*RemoteTranscoder, 0) + for i, t := range remoteTranscoders { + if t == rt { + if i == 0 { + return remoteTranscoders[1:] + } + newRemoteTs = remoteTranscoders[i-1 : i] + newRemoteTs = append(newRemoteTs, remoteTranscoders[i+1:]...) + break + } + } + return newRemoteTs +} + +func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string) (*RemoteTranscoder, error) { rtm.RTmutex.Lock() defer rtm.RTmutex.Unlock() @@ -856,35 +894,48 @@ func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder { } for checkTranscoders(rtm) { + currentTranscoder, sessionExists := rtm.streamSessions[sessionId] last := len(rtm.remoteTranscoders) - 1 - currentTranscoder := rtm.remoteTranscoders[last] + if !sessionExists { + currentTranscoder = rtm.remoteTranscoders[last] + } + if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok { + // Remove the stream session because the transcoder is no longer live + if sessionExists { + rtm.completeStreamSession(sessionId) + } // transcoder does not exist in table; remove and retry - rtm.remoteTranscoders = rtm.remoteTranscoders[:last] + rtm.remoteTranscoders = removeFromRemoteTranscoders(currentTranscoder, rtm.remoteTranscoders) continue } - if currentTranscoder.load == currentTranscoder.capacity { - // Head of queue is at capacity, so the rest must be too. Exit early - return nil + if !sessionExists { + if currentTranscoder.load == currentTranscoder.capacity { + // Head of queue is at capacity, so the rest must be too. Exit early + return nil, ErrNoTranscodersAvailable + } + + // Assinging transcoder to session for future use + rtm.streamSessions[sessionId] = currentTranscoder + currentTranscoder.load++ + sort.Sort(byLoadFactor(rtm.remoteTranscoders)) } - currentTranscoder.load++ - sort.Sort(byLoadFactor(rtm.remoteTranscoders)) - return currentTranscoder + return currentTranscoder, nil } - return nil + return nil, ErrNoTranscodersAvailable } -func (rtm *RemoteTranscoderManager) completeTranscoders(trans *RemoteTranscoder) { - rtm.RTmutex.Lock() - defer rtm.RTmutex.Unlock() - - t, ok := rtm.liveTranscoders[trans.stream] +// compleStreamSessions end a stream session for a remote transcoder and decrements its laod +// caller should hold the mutex lock +func (rtm *RemoteTranscoderManager) completeStreamSession(sessionId string) { + t, ok := rtm.streamSessions[sessionId] if !ok { return } t.load-- sort.Sort(byLoadFactor(rtm.remoteTranscoders)) + delete(rtm.streamSessions, sessionId) } // Caller of this function should hold RTmutex lock @@ -899,11 +950,16 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) { // Transcode does actual transcoding using remote transcoder from the pool func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) { - currentTranscoder := rtm.selectTranscoder() - if currentTranscoder == nil { - return nil, errors.New("No transcoders available") + currentTranscoder, err := rtm.selectTranscoder(md.AuthToken.SessionId) + if err != nil { + return nil, err } res, err := currentTranscoder.Transcode(md) + if err != nil { + rtm.RTmutex.Lock() + rtm.completeStreamSession(md.AuthToken.SessionId) + rtm.RTmutex.Unlock() + } _, fatal := err.(RemoteTranscoderFatalError) if fatal { // Don't retry if we've timed out; broadcaster likely to have moved on @@ -913,6 +969,5 @@ func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*Tran } return rtm.Transcode(md) } - rtm.completeTranscoders(currentTranscoder) return res, err }