Skip to content

Commit

Permalink
Reuse transcoder for stream (#1842)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuben Rodrigues committed Apr 21, 2021
1 parent 3908252 commit f3744b9
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan
// timeout; clean up goroutine here
os.EndSession()
los.EndSession()
delete(n.TranscoderManager.streamSessions, md.AuthToken.SessionId)
glog.V(common.DEBUG).Infof("Segment loop timed out; closing manifestID=%s sessionID=%s", md.ManifestID, md.AuthToken.SessionId)
n.segmentMutex.Lock()
mid := ManifestID(md.AuthToken.SessionId)
Expand Down Expand Up @@ -764,6 +765,8 @@ func NewRemoteTranscoderManager() *RemoteTranscoderManager {

taskMutex: &sync.RWMutex{},
taskChans: make(map[int64]TranscoderChan),

streamSessions: make(map[string]*RemoteTranscoder),
}
}

Expand All @@ -788,6 +791,9 @@ type RemoteTranscoderManager struct {
taskMutex *sync.RWMutex
taskChans map[int64]TranscoderChan
taskCount int64

//Map for keeping track of sessions and their respective transcoders
streamSessions map[string]*RemoteTranscoder
}

// RegisteredTranscodersCount returns number of registered transcoders
Expand Down Expand Up @@ -847,7 +853,7 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco
}
}

func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder {
func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string) *RemoteTranscoder {
rtm.RTmutex.Lock()
defer rtm.RTmutex.Unlock()

Expand All @@ -856,12 +862,18 @@ func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder {
}

for checkTranscoders(rtm) {
last := len(rtm.remoteTranscoders) - 1
currentTranscoder := rtm.remoteTranscoders[last]
if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok {
// transcoder does not exist in table; remove and retry
rtm.remoteTranscoders = rtm.remoteTranscoders[:last]
continue
currentTranscoder, transcoderExists := rtm.streamSessions[sessionId]
if !transcoderExists {
last := len(rtm.remoteTranscoders) - 1
currentTranscoder = rtm.remoteTranscoders[last]

// Assing transcoder to session for future use
rtm.streamSessions[sessionId] = currentTranscoder
if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok {
// transcoder does not exist in table; remove and retry
rtm.remoteTranscoders = rtm.remoteTranscoders[:last]
continue
}
}
if currentTranscoder.load == currentTranscoder.capacity {
// Head of queue is at capacity, so the rest must be too. Exit early
Expand Down Expand Up @@ -899,7 +911,7 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) {

// Transcode does actual transcoding using remote transcoder from the pool
func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) {
currentTranscoder := rtm.selectTranscoder()
currentTranscoder := rtm.selectTranscoder(md.AuthToken.SessionId)
if currentTranscoder == nil {
return nil, errors.New("No transcoders available")
}
Expand Down

0 comments on commit f3744b9

Please sign in to comment.