Skip to content

Commit

Permalink
hls source: start reading live streams from (end of playlist - starti…
Browse files Browse the repository at this point in the history
…ng point)
  • Loading branch information
aler9 committed Oct 20, 2022
1 parent 7fa9f87 commit 8c002dd
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 57 deletions.
1 change: 1 addition & 0 deletions internal/core/hls_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (ts *testHLSServer) onPlaylist(ctx *gin.Context) {
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:2,
segment.ts
#EXT-X-ENDLIST
`

ctx.Writer.Header().Set("Content-Type", `application/x-mpegURL`)
Expand Down
6 changes: 3 additions & 3 deletions internal/hls/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

const (
clientMinDownloadPause = 5 * time.Second
clientQueueSize = 100
clientMinSegmentsBeforeDownloading = 2
clientMinDownloadPause = 5 * time.Second
clientEntryQueueSize = 100
clientLiveStartingPoint = 3
)

func clientURLAbsolute(base *url.URL, relative string) (*url.URL, error) {
Expand Down
137 changes: 85 additions & 52 deletions internal/hls/client_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)

func segmentsLen(segments []*m3u8.MediaSegment) int {
for i, seg := range segments {
if seg == nil {
return i
}
}
return 0
}

func findStartingSegment(segments []*m3u8.MediaSegment) *m3u8.MediaSegment {
pos := len(segments) - clientLiveStartingPoint
if pos < 0 {
return nil
}

return segments[pos]
}

func findSegmentWithID(seqNo uint64, segments []*m3u8.MediaSegment, id uint64) *m3u8.MediaSegment {
pos := int(int64(id) - int64(seqNo))
if (pos) >= len(segments) {
return nil
}

return segments[pos]
}

type clientDownloader struct {
primaryPlaylistURL *url.URL
segmentQueue *clientSegmentQueue
Expand All @@ -27,11 +54,9 @@ type clientDownloader struct {
onAudioData func(time.Duration, []byte)
rp *clientRoutinePool

streamPlaylistURL *url.URL
downloadedSegmentURIs []string
httpClient *http.Client
lastDownloadTime time.Time
firstPlaylistReceived bool
streamPlaylistURL *url.URL
httpClient *http.Client
curSegmentID *uint64
}

func newClientDownloader(
Expand Down Expand Up @@ -82,48 +107,32 @@ func newClientDownloader(

func (d *clientDownloader) run(ctx context.Context) error {
for {
ok := d.segmentQueue.waitUntilSizeIsBelow(ctx, clientMinSegmentsBeforeDownloading)
ok := d.segmentQueue.waitUntilSizeIsBelow(ctx, 1)
if !ok {
return fmt.Errorf("terminated")
}

_, err := d.fillSegmentQueue(ctx)
err := d.fillSegmentQueue(ctx)
if err != nil {
return err
}
}
}

func (d *clientDownloader) fillSegmentQueue(ctx context.Context) (bool, error) {
minTime := d.lastDownloadTime.Add(clientMinDownloadPause)
now := time.Now()
if now.Before(minTime) {
select {
case <-time.After(minTime.Sub(now)):
case <-ctx.Done():
return false, fmt.Errorf("terminated")
}
}
func (d *clientDownloader) fillSegmentQueue(ctx context.Context) error {
var pl *m3u8.MediaPlaylist

d.lastDownloadTime = now

pl, err := func() (*m3u8.MediaPlaylist, error) {
if d.streamPlaylistURL == nil {
return d.downloadPrimaryPlaylist(ctx)
if d.streamPlaylistURL == nil {
var err error
pl, err = d.downloadPrimaryPlaylist(ctx)
if err != nil {
return err
}
return d.downloadStreamPlaylist(ctx)
}()
if err != nil {
return false, err
}

if !d.firstPlaylistReceived {
d.firstPlaylistReceived = true

if pl.Map != nil && pl.Map.URI != "" {
byts, err := d.downloadSegment(ctx, pl.Map.URI)
if err != nil {
return false, err
return err
}

proc, err := newClientProcessorFMP4(
Expand All @@ -136,7 +145,7 @@ func (d *clientDownloader) fillSegmentQueue(ctx context.Context) (bool, error) {
d.onAudioData,
)
if err != nil {
return false, err
return err
}

d.rp.add(proc)
Expand All @@ -151,37 +160,60 @@ func (d *clientDownloader) fillSegmentQueue(ctx context.Context) (bool, error) {
)
d.rp.add(proc)
}
} else {
var err error
pl, err = d.downloadStreamPlaylist(ctx)
if err != nil {
return err
}
}

added := false
pl.Segments = pl.Segments[:segmentsLen(pl.Segments)]
var seg *m3u8.MediaSegment

for _, seg := range pl.Segments {
if d.curSegmentID == nil {
if !pl.Closed { // live stream: start from clientLiveStartingPoint
seg = findStartingSegment(pl.Segments)
if seg == nil {
return fmt.Errorf("there aren't enough segments to fill the buffer")
}
} else { // VOD stream: start from beginning
if len(pl.Segments) == 0 {
return fmt.Errorf("no segments found")
}
seg = pl.Segments[0]
}
} else {
seg = findSegmentWithID(pl.SeqNo, pl.Segments, *d.curSegmentID+1)
if seg == nil {
break
if !pl.Closed { // live stream
d.logger.Log(logger.Warn, "resetting segment ID")
seg = findStartingSegment(pl.Segments)
if seg == nil {
return fmt.Errorf("there aren't enough segments to fill the buffer")
}
} else { // VOD stream
return fmt.Errorf("following segment not found")
}
}
}

if !d.segmentWasDownloaded(seg.URI) {
d.downloadedSegmentURIs = append(d.downloadedSegmentURIs, seg.URI)
byts, err := d.downloadSegment(ctx, seg.URI)
if err != nil {
return false, err
}
v := seg.SeqId
d.curSegmentID = &v

d.segmentQueue.push(byts)
added = true
}
byts, err := d.downloadSegment(ctx, seg.URI)
if err != nil {
return err
}

return added, nil
}
d.segmentQueue.push(byts)

func (d *clientDownloader) segmentWasDownloaded(ur string) bool {
for _, q := range d.downloadedSegmentURIs {
if q == ur {
return true
}
if pl.Closed && pl.Segments[len(pl.Segments)-1] == seg {
<-ctx.Done()
return fmt.Errorf("stream has ended")
}
return false

return nil
}

func (d *clientDownloader) downloadPrimaryPlaylist(ctx context.Context) (*m3u8.MediaPlaylist, error) {
Expand All @@ -194,6 +226,7 @@ func (d *clientDownloader) downloadPrimaryPlaylist(ctx context.Context) (*m3u8.M

switch plt := pl.(type) {
case *m3u8.MediaPlaylist:
d.logger.Log(logger.Debug, "primary playlist is a stream playlist")
d.streamPlaylistURL = d.primaryPlaylistURL
return plt, nil

Expand Down
2 changes: 1 addition & 1 deletion internal/hls/client_processor_mpegts_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newClientProcessorMPEGTSTrack(
return &clientProcessorMPEGTSTrack{
startRTC: startRTC,
onEntry: onEntry,
queue: make(chan clientProcessorMPEGTSTrackEntry, clientQueueSize),
queue: make(chan clientProcessorMPEGTSTrackEntry, clientEntryQueueSize),
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/hls/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func newTestHLSServer(ca string) (*testHLSServer, error) {
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:2,
` + segment + "\n"
` + segment + `
#EXT-X-ENDLIST
`

ctx.Writer.Header().Set("Content-Type", `application/x-mpegURL`)
io.Copy(ctx.Writer, bytes.NewReader([]byte(cnt)))
Expand Down

0 comments on commit 8c002dd

Please sign in to comment.