Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solve the the issue of non-DVB transport stream(TS) which using the PID, 0x10~0x1F, as the PIDs of the video/audio streams of program #54

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package astits
import (
"encoding/binary"
"fmt"

"github.com/asticode/go-astikit"
)

Expand Down Expand Up @@ -35,7 +36,7 @@ type MuxerData struct {
}

// parseData parses a payload spanning over multiple packets and returns a set of data
func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerData, err error) {
func parseData(ps []*Packet, prs PacketsParser, pm *programMap, esm *elementaryStreamMap) (ds []*DemuxerData, err error) {
// Use custom parser first
if prs != nil {
var skip bool
Expand Down Expand Up @@ -79,7 +80,7 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa
if pid == PIDCAT {
// Information in a CAT payload is private and dependent on the CA system. Use the PacketsParser
// to parse this type of payload
} else if isPSIPayload(pid, pm) {
} else if isPSIPayload(pid, pm, esm) {
// Parse PSI data
var psiData *PSIData
if psiData, err = parsePSIData(i); err != nil {
Expand Down Expand Up @@ -110,10 +111,11 @@ func parseData(ps []*Packet, prs PacketsParser, pm *programMap) (ds []*DemuxerDa
}

// isPSIPayload checks whether the payload is a PSI one
func isPSIPayload(pid uint16, pm *programMap) bool {
func isPSIPayload(pid uint16, pm *programMap, esm *elementaryStreamMap) bool {
return pid == PIDPAT || // PAT
pm.existsUnlocked(pid) || // PMT
((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) //DVB
(((pid >= 0x10 && pid <= 0x14) || (pid >= 0x1e && pid <= 0x1f)) && //DVB
!esm.existsLocked(pid)) // for non-DVB
}

// isPESPayload checks whether the payload is a PES one
Expand Down
14 changes: 8 additions & 6 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func TestParseData(t *testing.T) {
// Init
pm := newProgramMap()
esm := newElementaryStreamMap()
ps := []*Packet{}

// Custom parser
Expand All @@ -20,13 +21,13 @@ func TestParseData(t *testing.T) {
skip = true
return
}
ds, err := parseData(ps, c, pm)
ds, err := parseData(ps, c, pm, esm)
assert.NoError(t, err)
assert.Equal(t, cds, ds)

// Do nothing for CAT
ps = []*Packet{{Header: PacketHeader{PID: PIDCAT}}}
ds, err = parseData(ps, nil, pm)
ds, err = parseData(ps, nil, pm, esm)
assert.NoError(t, err)
assert.Empty(t, ds)

Expand All @@ -42,7 +43,7 @@ func TestParseData(t *testing.T) {
Payload: p[33:],
},
}
ds, err = parseData(ps, nil, pm)
ds, err = parseData(ps, nil, pm, esm)
assert.NoError(t, err)
assert.Equal(t, []*DemuxerData{
{
Expand All @@ -64,7 +65,7 @@ func TestParseData(t *testing.T) {
Payload: p[33:],
},
}
ds, err = parseData(ps, nil, pm)
ds, err = parseData(ps, nil, pm, esm)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test using esm.setUnlocked() like it was done with pm.setUnlocked(uint16(256), uint16(1)) above?

assert.NoError(t, err)
assert.Equal(t, psi.toData(
&Packet{Header: ps[0].Header, AdaptationField: ps[0].AdaptationField},
Expand All @@ -74,15 +75,16 @@ func TestParseData(t *testing.T) {

func TestIsPSIPayload(t *testing.T) {
pm := newProgramMap()
esm := newElementaryStreamMap()
var pids []int
for i := 0; i <= 255; i++ {
if isPSIPayload(uint16(i), pm) {
if isPSIPayload(uint16(i), pm, esm) {
pids = append(pids, i)
}
}
assert.Equal(t, []int{0, 16, 17, 18, 19, 20, 30, 31}, pids)
pm.setUnlocked(uint16(1), uint16(0))
assert.True(t, isPSIPayload(uint16(1), pm))
assert.True(t, isPSIPayload(uint16(1), pm, esm))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test using esm.setUnlocked() like it was done with pm.setUnlocked(uint16(1), uint16(0)) above?

}

func TestIsPESPayload(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions demuxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Demuxer struct {
packetBuffer *packetBuffer
packetPool *packetPool
programMap *programMap
streamMap *elementaryStreamMap
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you order fields alphabetically?

r io.Reader
}

Expand All @@ -52,6 +53,7 @@ func NewDemuxer(ctx context.Context, r io.Reader, opts ...func(*Demuxer)) (d *De
ctx: ctx,
l: astikit.AdaptStdLogger(nil),
programMap: newProgramMap(),
streamMap: newElementaryStreamMap(),
r: r,
}
d.packetPool = newPacketPool(d.programMap)
Expand Down Expand Up @@ -145,7 +147,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) {

// Parse data
var errParseData error
if ds, errParseData = parseData(ps, dmx.optPacketsParser, dmx.programMap); errParseData != nil {
if ds, errParseData = parseData(ps, dmx.optPacketsParser, dmx.programMap, dmx.streamMap); errParseData != nil {
// Log error as there may be some incomplete data here
// We still want to try to parse all packets, in case final data is complete
dmx.l.Error(fmt.Errorf("astits: parsing data failed: %w", errParseData))
Expand All @@ -170,7 +172,7 @@ func (dmx *Demuxer) NextData() (d *DemuxerData, err error) {
}

// Parse data
if ds, err = parseData(ps, dmx.optPacketsParser, dmx.programMap); err != nil {
if ds, err = parseData(ps, dmx.optPacketsParser, dmx.programMap, dmx.streamMap); err != nil {
err = fmt.Errorf("astits: building new data failed: %w", err)
return
}
Expand Down Expand Up @@ -199,6 +201,11 @@ func (dmx *Demuxer) updateData(ds []*DemuxerData) (d *DemuxerData) {
}
}
}
if v.PMT != nil {
for _, es := range v.PMT.ElementaryStreams {
dmx.streamMap.setLocked(es.ElementaryPID, v.PMT.ProgramNumber)
}
}
}
}
return
Expand Down
29 changes: 29 additions & 0 deletions stream_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package astits

// elementaryStreamMap represents an elementary stream ids map
type elementaryStreamMap struct {
// We use map[uint32] instead map[uint16] as go runtime provide optimized hash functions for (u)int32/64 keys
es map[uint32]uint16 // map[StreamID]ProgramNumber
}

// newElementaryStreamMap creates a new elementary stream ids map
func newElementaryStreamMap() *elementaryStreamMap {
return &elementaryStreamMap{
es: make(map[uint32]uint16),
}
}

// setLocked sets a new stream id to the elementary stream
func (m elementaryStreamMap) setLocked(pid, number uint16) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename Locked to Unlocked in all 3 functions?

The idea is to let the developer know the function is not locking any mutex while manipulating the map which is really unsafe.

m.es[uint32(pid)] = number
}

// existsLocked checks whether the stream with this pid exists
func (m elementaryStreamMap) existsLocked(pid uint16) (ok bool) {
_, ok = m.es[uint32(pid)]
return
}

func (m elementaryStreamMap) unsetLocked(pid uint16) {
delete(m.es, uint32(pid))
}
16 changes: 16 additions & 0 deletions stream_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package astits

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestElementaryStreamMap(t *testing.T) {
esm := newElementaryStreamMap()
assert.False(t, esm.existsLocked(0x16))
esm.setLocked(0x16, 1)
assert.True(t, esm.existsLocked(0x16))
esm.unsetLocked(0x16)
assert.False(t, esm.existsLocked(0x16))
}