Skip to content

Commit

Permalink
Reuse transcoder for stream (#1842)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuben Rodrigues committed Apr 22, 2021
1 parent 3908252 commit bd7b60f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
63 changes: 37 additions & 26 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,61 +290,72 @@ func TestSelectTranscoder(t *testing.T) {

// register transcoders, which adds transcoder to liveTranscoders and remoteTranscoders
wg := newWg(1)
go func() { m.Manage(strm, 2) }()
go func() { m.Manage(strm, 2); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, 1); wg.Done() }()
go func() { m.Manage(strm2, 2) }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register

assert.NotNil(m.liveTranscoders[strm])
assert.NotNil(m.liveTranscoders[strm2])
assert.Len(m.remoteTranscoders, 2)

testSessionId := "testID"
testSessionId2 := "testID2"

// assert transcoder is returned from selectTranscoder
t1 := m.liveTranscoders[strm]
t2 := m.liveTranscoders[strm2]
currentTranscoder := m.selectTranscoder()
currentTranscoder := m.selectTranscoder(testSessionId)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
assert.NotNil(m.liveTranscoders[strm])
assert.Len(m.remoteTranscoders, 2)

// assert transcoder with less load selected
currentTranscoder2 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder2)
// assert that same transcoder is selected for same sessionId
currentTranscoder = m.selectTranscoder(testSessionId)
assert.Equal(t2, currentTranscoder)
assert.Equal(2, t2.load)

// assert that a new transcoder is selected for a new sessionId
currentTranscoder = m.selectTranscoder(testSessionId2)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)

currentTranscoder3 := m.selectTranscoder()
assert.Equal(t1, currentTranscoder3)
// assert that a new transcored is selected for the same session
// if the currently assigned transcoder is at capacity
currentTranscoder = m.selectTranscoder(testSessionId)
assert.Equal(t1, currentTranscoder)
assert.Equal(2, t1.load)

// assert no transcoder returned if all at they capacity
noTrans := m.selectTranscoder()
noTrans := m.selectTranscoder(testSessionId)
assert.Nil(noTrans)

m.completeTranscoders(t1)
m.completeTranscoders(t1)
assert.Equal(0, t1.load)
m.completeTranscoders(t2)
m.completeTranscoders(t2)
assert.Equal(0, t2.load)

// unregister transcoder
t2.eof <- struct{}{}
t1.eof <- struct{}{}
assert.True(wgWait(wg), "Wait timed out for transcoder to terminate")
assert.Nil(m.liveTranscoders[strm2])
assert.NotNil(m.liveTranscoders[strm])
assert.Nil(m.liveTranscoders[strm])
assert.NotNil(m.liveTranscoders[strm2])

// assert t1 is selected and t2 drained
currentTranscoder = m.selectTranscoder()
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)
assert.NotNil(m.liveTranscoders[strm])
// assert t2 is selected and t1 drained(even if was previously selected for session)
currentTranscoder = m.selectTranscoder(testSessionId)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
assert.NotNil(m.liveTranscoders[strm2])
assert.Len(m.remoteTranscoders, 2)

// assert transcoder gets added back to remoteTranscoders if no transcoding error
_, err := m.Transcode(&SegTranscodingMetadata{})
assert.Nil(err)
assert.Len(m.remoteTranscoders, 2)
assert.Equal(1, t1.load)
m.completeTranscoders(t1)
assert.Equal(0, t1.load)
// _, err := m.Transcode(&SegTranscodingMetadata{})
// m.Transcode(&SegTranscodingMetadata{})
// assert.Nil(err)
// assert.Len(m.remoteTranscoders, 2)
// assert.Equal(1, t2.load)
// m.completeTranscoders(t2)
// assert.Equal(0, t2.load)
}

func TestTranscoderManagerTranscoding(t *testing.T) {
Expand Down
37 changes: 25 additions & 12 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan
// timeout; clean up goroutine here
os.EndSession()
los.EndSession()
delete(n.TranscoderManager.streamSessions, md.AuthToken.SessionId)
glog.V(common.DEBUG).Infof("Segment loop timed out; closing manifestID=%s sessionID=%s", md.ManifestID, md.AuthToken.SessionId)
n.segmentMutex.Lock()
mid := ManifestID(md.AuthToken.SessionId)
Expand Down Expand Up @@ -764,6 +765,8 @@ func NewRemoteTranscoderManager() *RemoteTranscoderManager {

taskMutex: &sync.RWMutex{},
taskChans: make(map[int64]TranscoderChan),

streamSessions: make(map[string]*RemoteTranscoder),
}
}

Expand All @@ -788,6 +791,9 @@ type RemoteTranscoderManager struct {
taskMutex *sync.RWMutex
taskChans map[int64]TranscoderChan
taskCount int64

//Map for keeping track of sessions and their respective transcoders
streamSessions map[string]*RemoteTranscoder
}

// RegisteredTranscodersCount returns number of registered transcoders
Expand Down Expand Up @@ -847,7 +853,7 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco
}
}

func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder {
func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string) *RemoteTranscoder {
rtm.RTmutex.Lock()
defer rtm.RTmutex.Unlock()

Expand All @@ -856,16 +862,23 @@ func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder {
}

for checkTranscoders(rtm) {
last := len(rtm.remoteTranscoders) - 1
currentTranscoder := rtm.remoteTranscoders[last]
if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok {
// transcoder does not exist in table; remove and retry
rtm.remoteTranscoders = rtm.remoteTranscoders[:last]
continue
}
if currentTranscoder.load == currentTranscoder.capacity {
// Head of queue is at capacity, so the rest must be too. Exit early
return nil
currentTranscoder, transcoderExists := rtm.streamSessions[sessionId]
// Assign new transcoder if not assigned, or at full load
if !transcoderExists || currentTranscoder.load == currentTranscoder.capacity {
last := len(rtm.remoteTranscoders) - 1
currentTranscoder = rtm.remoteTranscoders[last]

// Assing transcoder to session for future use
rtm.streamSessions[sessionId] = currentTranscoder
if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok {
// transcoder does not exist in table; remove and retry
rtm.remoteTranscoders = rtm.remoteTranscoders[:last]
continue
}
if currentTranscoder.load == currentTranscoder.capacity {
// Head of queue is at capacity, so the rest must be too. Exit early
return nil
}
}
currentTranscoder.load++
sort.Sort(byLoadFactor(rtm.remoteTranscoders))
Expand Down Expand Up @@ -899,7 +912,7 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) {

// Transcode does actual transcoding using remote transcoder from the pool
func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) {
currentTranscoder := rtm.selectTranscoder()
currentTranscoder := rtm.selectTranscoder(md.AuthToken.SessionId)
if currentTranscoder == nil {
return nil, errors.New("No transcoders available")
}
Expand Down

0 comments on commit bd7b60f

Please sign in to comment.