Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not attempt to unpublish a track from a closed room #150

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/lksdk_output/lksdk_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync/atomic"

"github.com/frostbyte73/core"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"

Expand All @@ -37,6 +38,7 @@ type VideoSampleProvider interface {
type LKSDKOutput struct {
logger logger.Logger
room *lksdk.Room
closed core.Fuse

params *params.Params
}
Expand All @@ -59,6 +61,7 @@ func NewLKSDKOutput(ctx context.Context, p *params.Params) (*LKSDKOutput, error)
room: room,
params: p,
logger: logger.GetLogger().WithValues("ingressID", p.IngressId, "resourceID", p.State.ResourceId, "roomID", room.SID()),
closed: core.NewFuse(),
}

s.logger.Infow("connected to room")
Expand All @@ -85,7 +88,8 @@ func (s *LKSDKOutput) AddAudioTrack(output lksdk.SampleProvider, mimeType string
var pub *lksdk.LocalTrackPublication
onComplete := func() {
s.logger.Debugw("audio track write complete, unpublishing audio track")
if pub != nil {
// don't unpublish if the completion is due to the output closing
if pub != nil && !s.closed.IsBroken() {
if err := s.room.LocalParticipant.UnpublishTrack(pub.SID()); err != nil {
s.logger.Errorw("could not unpublish audio track", err)
}
Expand Down Expand Up @@ -124,7 +128,8 @@ func (s *LKSDKOutput) AddVideoTrack(outputs []VideoSampleProvider, layers []*liv
output := outputs[i]
onComplete := func() {
s.logger.Debugw("video track layer write complete", "layer", layer.Quality.String())
if pub != nil {
// don't unpublish if the completion is due to the output closing
if pub != nil && !s.closed.IsBroken() {
if atomic.AddInt32(&activeLayerCount, -1) == 0 {
s.logger.Debugw("unpublishing video track")
if err := s.room.LocalParticipant.UnpublishTrack(pub.SID()); err != nil {
Expand Down Expand Up @@ -175,5 +180,8 @@ func (s *LKSDKOutput) AddVideoTrack(outputs []VideoSampleProvider, layers []*liv

func (s *LKSDKOutput) Close() {
s.logger.Debugw("disconnecting from room")

s.closed.Break()

s.room.Disconnect()
}