Skip to content

Commit

Permalink
muxer: prevent creating empty parts before switching segments (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Sep 29, 2024
1 parent a6d4897 commit a7b1616
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 227 deletions.
62 changes: 10 additions & 52 deletions muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,42 +72,6 @@ func fmp4TimeScale(c codecs.Codec) uint32 {
return 90000
}

func partDurationIsCompatible(partDuration time.Duration, sampleDuration time.Duration) bool {
if sampleDuration > partDuration {
return false
}

f := (partDuration / sampleDuration)
if (partDuration % sampleDuration) != 0 {
f++
}
f *= sampleDuration

return partDuration > ((f * 85) / 100)
}

func partDurationIsCompatibleWithAll(partDuration time.Duration, sampleDurations map[time.Duration]struct{}) bool {
for sd := range sampleDurations {
if !partDurationIsCompatible(partDuration, sd) {
return false
}
}
return true
}

func findCompatiblePartDuration(
minPartDuration time.Duration,
sampleDurations map[time.Duration]struct{},
) time.Duration {
i := minPartDuration
for ; i < 5*time.Second; i += 5 * time.Millisecond {
if partDurationIsCompatibleWithAll(i, sampleDurations) {
break
}
}
return i
}

type fmp4AugmentedSample struct {
fmp4.PartSample
dts time.Duration
Expand Down Expand Up @@ -168,7 +132,6 @@ type Muxer struct {
nextSegmentID uint64
nextPartID uint64 // low-latency only
segmentDeleteCount int
nextPartHasSamples bool
}

// Start initializes the muxer.
Expand Down Expand Up @@ -406,19 +369,12 @@ func (m *Muxer) rotateParts(nextDTS time.Duration) error {
}

func (m *Muxer) rotatePartsInner(nextDTS time.Duration, createNew bool) error {
if !m.nextPartHasSamples {
for _, stream := range m.streams {
stream.nextPart = nil
}
} else {
m.nextPartID++
m.nextPartHasSamples = false
m.nextPartID++

for _, stream := range m.streams {
err := stream.rotateParts(nextDTS, createNew)
if err != nil {
return err
}
for _, stream := range m.streams {
err := stream.rotateParts(nextDTS, createNew)
if err != nil {
return err
}
}

Expand Down Expand Up @@ -448,9 +404,11 @@ func (m *Muxer) rotateSegmentsInner(
nextNTP time.Time,
force bool,
) error {
err := m.rotatePartsInner(nextDTS, false)
if err != nil {
return err
if m.Variant != MuxerVariantMPEGTS {
err := m.rotatePartsInner(nextDTS, false)
if err != nil {
return err
}
}

m.nextSegmentID++
Expand Down
15 changes: 6 additions & 9 deletions muxer_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
)

type muxerPart struct {
stream *muxerStream
segment *muxerSegmentFMP4
startDTS time.Duration
prefix string
id uint64
storage storage.Part
setNextPartHasSamples func()
stream *muxerStream
segment *muxerSegmentFMP4
startDTS time.Duration
prefix string
id uint64
storage storage.Part

path string
isIndependent bool
Expand Down Expand Up @@ -73,6 +72,4 @@ func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample)
}

track.fmp4Samples = append(track.fmp4Samples, &sample.PartSample)

p.setNextPartHasSamples()
}
46 changes: 16 additions & 30 deletions muxer_segment_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import (
)

type muxerSegmentFMP4 struct {
variant MuxerVariant
segmentMaxSize uint64
prefix string
nextPartID uint64
storageFactory storage.Factory
rotateParts func(time.Duration) error
setNextPartHasSamples func()
stream *muxerStream
id uint64
startNTP time.Time
startDTS time.Duration
fromForcedRotation bool
variant MuxerVariant
segmentMaxSize uint64
prefix string
nextPartID uint64
storageFactory storage.Factory
stream *muxerStream
id uint64
startNTP time.Time
startDTS time.Duration
fromForcedRotation bool

path string
storage storage.File
Expand All @@ -39,13 +37,12 @@ func (s *muxerSegmentFMP4) initialize() error {
}

s.stream.nextPart = &muxerPart{
stream: s.stream,
segment: s,
startDTS: s.startDTS,
prefix: s.prefix,
id: s.nextPartID,
storage: s.storage.NewPart(),
setNextPartHasSamples: s.setNextPartHasSamples,
stream: s.stream,
segment: s,
startDTS: s.startDTS,
prefix: s.prefix,
id: s.nextPartID,
storage: s.storage.NewPart(),
}
s.stream.nextPart.initialize()

Expand Down Expand Up @@ -87,8 +84,6 @@ func (s *muxerSegmentFMP4) finalize(nextDTS time.Duration) error {
func (s *muxerSegmentFMP4) writeSample(
track *muxerTrack,
sample *fmp4AugmentedSample,
nextDTS time.Duration,
adjustedPartDuration time.Duration,
) error {
size := uint64(len(sample.Payload))
if (s.size + size) > s.segmentMaxSize {
Expand All @@ -98,14 +93,5 @@ func (s *muxerSegmentFMP4) writeSample(

s.stream.nextPart.writeSample(track, sample)

// switch part
if (s.variant == MuxerVariantLowLatency) && track.isLeading &&
s.stream.nextPart.computeDuration(nextDTS) >= adjustedPartDuration {
err := s.rotateParts(nextDTS)
if err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit a7b1616

Please sign in to comment.