Skip to content

Commit

Permalink
hls client: skip packets received before the 1st packet of the leading
Browse files Browse the repository at this point in the history
track; make sure that the initial DTS is zero
  • Loading branch information
aler9 committed Oct 26, 2022
1 parent 8cec54c commit 7981522
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 142 deletions.
5 changes: 5 additions & 0 deletions internal/hls/client_processor_mpegts_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,10 @@ func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, pes *asti
return err
}

// silently discard packets prior to the first packet of the leading track
if pts < 0 {
return nil
}

return t.onEntry(pts, pes.Data)
}
15 changes: 7 additions & 8 deletions internal/hls/client_timesync_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,30 @@ package hls
import (
"context"
"fmt"
"sync"
"time"

"github.com/aler9/rtsp-simple-server/internal/hls/mpegts"
"github.com/aler9/rtsp-simple-server/internal/hls/mpegtstimedec"
)

type clientTimeSyncMPEGTS struct {
startRTC time.Time
startDTS int64
td *mpegts.TimeDecoder
td *mpegtstimedec.Decoder
mutex sync.Mutex
}

func newClientTimeSyncMPEGTS(startDTS int64) *clientTimeSyncMPEGTS {
return &clientTimeSyncMPEGTS{
startRTC: time.Now(),
startDTS: startDTS,
td: mpegts.NewTimeDecoder(),
td: mpegtstimedec.New(startDTS),
}
}

func (ts *clientTimeSyncMPEGTS) convertAndSync(ctx context.Context, rawDTS int64, rawPTS int64) (time.Duration, error) {
rawDTS = (rawDTS - ts.startDTS) & 0x1FFFFFFFF
rawPTS = (rawPTS - ts.startDTS) & 0x1FFFFFFFF

ts.mutex.Lock()
dts := ts.td.Decode(rawDTS)
pts := ts.td.Decode(rawPTS)
ts.mutex.Unlock()

elapsed := time.Since(ts.startRTC)
if dts > elapsed {
Expand Down
55 changes: 0 additions & 55 deletions internal/hls/mpegts/timedecoder.go

This file was deleted.

79 changes: 0 additions & 79 deletions internal/hls/mpegts/timedecoder_test.go

This file was deleted.

46 changes: 46 additions & 0 deletions internal/hls/mpegtstimedec/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Package mpegtstimedec contains a MPEG-TS timestamp decoder.
package mpegtstimedec

import (
"time"
)

const (
maximum = 0x1FFFFFFFF // 33 bits
negativeThreshold = 0x1FFFFFFFF / 2
clockRate = 90000
)

// Decoder is a MPEG-TS timestamp decoder.
type Decoder struct {
overall time.Duration
prev int64
}

// New allocates a Decoder.
func New(start int64) *Decoder {
return &Decoder{
prev: start,
}
}

// Decode decodes a MPEG-TS timestamp.
func (d *Decoder) Decode(ts int64) time.Duration {
diff := (ts - d.prev) & maximum

// negative difference
if diff > negativeThreshold {
diff = (d.prev - ts) & maximum
d.prev = ts
d.overall -= time.Duration(diff)
} else {
d.prev = ts
d.overall += time.Duration(diff)
}

// avoid an int64 overflow and preserve resolution by splitting division into two parts:
// first add the integer part, then the decimal part.
secs := d.overall / clockRate
dec := d.overall % clockRate
return secs*time.Second + dec*time.Second/clockRate
}
72 changes: 72 additions & 0 deletions internal/hls/mpegtstimedec/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package mpegtstimedec

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestNegativeDiff(t *testing.T) {
d := New(64523434)

ts := d.Decode(64523434 - 90000)
require.Equal(t, -1*time.Second, ts)

ts = d.Decode(64523434)
require.Equal(t, time.Duration(0), ts)

ts = d.Decode(64523434 + 90000*2)
require.Equal(t, 2*time.Second, ts)

ts = d.Decode(64523434 + 90000)
require.Equal(t, 1*time.Second, ts)
}

func TestOverflow(t *testing.T) {
d := New(0x1FFFFFFFF - 20)

i := int64(0x1FFFFFFFF - 20)
secs := time.Duration(0)
const stride = 150
lim := int64(uint64(0x1FFFFFFFF - (stride * 90000)))

for n := 0; n < 100; n++ {
// overflow
i += 90000 * stride
secs += stride
ts := d.Decode(i)
require.Equal(t, secs*time.Second, ts)

// reach 2^32 slowly
secs += stride
i += 90000 * stride
for ; i < lim; i += 90000 * stride {
ts = d.Decode(i)
require.Equal(t, secs*time.Second, ts)
secs += stride
}
}
}

func TestOverflowAndBack(t *testing.T) {
d := New(0x1FFFFFFFF - 90000 + 1)

ts := d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)

ts = d.Decode(90000)
require.Equal(t, 2*time.Second, ts)

ts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)

ts = d.Decode(0x1FFFFFFFF - 90000*2 + 1)
require.Equal(t, -1*time.Second, ts)

ts = d.Decode(0x1FFFFFFFF - 90000 + 1)
require.Equal(t, time.Duration(0), ts)

ts = d.Decode(90000)
require.Equal(t, 2*time.Second, ts)
}

0 comments on commit 7981522

Please sign in to comment.