diff --git a/core/orchestrator.go b/core/orchestrator.go index 3fa677f2aa..6e45ba81a6 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -628,6 +628,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(md *SegTranscodingMetadata, segChan // timeout; clean up goroutine here os.EndSession() los.EndSession() + delete(n.TranscoderManager.streamSessions, md.AuthToken.SessionId) glog.V(common.DEBUG).Infof("Segment loop timed out; closing manifestID=%s sessionID=%s", md.ManifestID, md.AuthToken.SessionId) n.segmentMutex.Lock() mid := ManifestID(md.AuthToken.SessionId) @@ -764,6 +765,8 @@ func NewRemoteTranscoderManager() *RemoteTranscoderManager { taskMutex: &sync.RWMutex{}, taskChans: make(map[int64]TranscoderChan), + + streamSessions: make(map[string]*RemoteTranscoder), } } @@ -788,6 +791,9 @@ type RemoteTranscoderManager struct { taskMutex *sync.RWMutex taskChans map[int64]TranscoderChan taskCount int64 + + //Map for keeping track of sessions and their respective transcoders + streamSessions map[string]*RemoteTranscoder } // RegisteredTranscodersCount returns number of registered transcoders @@ -847,7 +853,7 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco } } -func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder { +func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string) *RemoteTranscoder { rtm.RTmutex.Lock() defer rtm.RTmutex.Unlock() @@ -856,12 +862,18 @@ func (rtm *RemoteTranscoderManager) selectTranscoder() *RemoteTranscoder { } for checkTranscoders(rtm) { - last := len(rtm.remoteTranscoders) - 1 - currentTranscoder := rtm.remoteTranscoders[last] - if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok { - // transcoder does not exist in table; remove and retry - rtm.remoteTranscoders = rtm.remoteTranscoders[:last] - continue + currentTranscoder, transcoderExists := rtm.streamSessions[sessionId] + if !transcoderExists { + last := len(rtm.remoteTranscoders) - 1 + currentTranscoder = rtm.remoteTranscoders[last] + + // Assing transcoder to session for future use + rtm.streamSessions[sessionId] = currentTranscoder + if _, ok := rtm.liveTranscoders[currentTranscoder.stream]; !ok { + // transcoder does not exist in table; remove and retry + rtm.remoteTranscoders = rtm.remoteTranscoders[:last] + continue + } } if currentTranscoder.load == currentTranscoder.capacity { // Head of queue is at capacity, so the rest must be too. Exit early @@ -899,7 +911,7 @@ func (rtm *RemoteTranscoderManager) totalLoadAndCapacity() (int, int, int) { // Transcode does actual transcoding using remote transcoder from the pool func (rtm *RemoteTranscoderManager) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) { - currentTranscoder := rtm.selectTranscoder() + currentTranscoder := rtm.selectTranscoder(md.AuthToken.SessionId) if currentTranscoder == nil { return nil, errors.New("No transcoders available") }