Skip to content

Commit

Permalink
return error in OnDecodeError when there are lost RTP packets
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Oct 31, 2022
1 parent 30e0290 commit e8fde26
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 25 deletions.
26 changes: 25 additions & 1 deletion client_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,7 @@ func TestClientReadDecodeErrors(t *testing.T) {
for _, ca := range []string{
"invalid rtp",
"invalid rtcp",
"packets lost",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
Expand Down Expand Up @@ -2819,7 +2820,7 @@ func TestClientReadDecodeErrors(t *testing.T) {
})
require.NoError(t, err)

switch ca {
switch ca { //nolint:dupl
case "invalid rtp":
l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Expand All @@ -2831,6 +2832,27 @@ func TestClientReadDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[1],
})

case "packets lost":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 30,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})

byts, _ = rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
}

req, err = conn.ReadRequest()
Expand All @@ -2855,6 +2877,8 @@ func TestClientReadDecodeErrors(t *testing.T) {
require.EqualError(t, err, "RTP header size insufficient: 2 < 4")
case "invalid rtcp":
require.EqualError(t, err, "rtcp: packet too short")
case "packets lost":
require.EqualError(t, err, "69 RTP packet(s) lost")
}
close(errorRecv)
},
Expand Down
7 changes: 6 additions & 1 deletion clientudpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gortsplib

import (
"crypto/rand"
"fmt"
"net"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -197,7 +198,11 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
return
}

packets := u.ct.reorderer.Process(pkt)
packets, missing := u.ct.reorderer.Process(pkt)

if missing != 0 {
u.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing))
}

for _, pkt := range packets {
out, err := u.ct.cleaner.Process(pkt)
Expand Down
22 changes: 12 additions & 10 deletions pkg/rtpreorderer/reorderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

const (
bufferSize = 64
bufferSize = 64
negativeThreshold = 0xFFFF / 2
)

// Reorderer filters incoming RTP packets, in order to
Expand All @@ -27,20 +28,21 @@ func New() *Reorderer {
}

// Process processes a RTP packet.
func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
// It returns a sequence of ordered packets and the number of missing packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}
return []*rtp.Packet{pkt}, 0
}

relPos := pkt.SequenceNumber - r.expectedSeqNum

// packet is a duplicate or has been sent
// before the first packet processed by Reorderer.
// discard.
if relPos > 0xFFF {
return nil
if relPos > negativeThreshold {
return nil, 0
}

// there's a missing packet and buffer is full.
Expand Down Expand Up @@ -72,7 +74,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
}

r.expectedSeqNum = pkt.SequenceNumber + 1
return ret
return ret, int(relPos) - n + 1
}

// there's a missing packet
Expand All @@ -81,12 +83,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {

// current packet is a duplicate. discard
if r.buffer[p] != nil {
return nil
return nil, 0
}

// put current packet in buffer
r.buffer[p] = pkt
return nil
return nil, 0
}

// all packets have been received correctly.
Expand All @@ -102,8 +104,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
}

ret := make([]*rtp.Packet, n)
ret[0] = pkt

ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)

Expand All @@ -115,5 +117,5 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {

r.expectedSeqNum = pkt.SequenceNumber + n

return ret
return ret, 0
}
30 changes: 19 additions & 11 deletions pkg/rtpreorderer/reorderer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,52 +161,60 @@ func TestReorder(t *testing.T) {
r.absPos = 40

for _, entry := range sequence {
out := r.Process(entry.in)
out, missing := r.Process(entry.in)
require.Equal(t, entry.out, out)
require.Equal(t, 0, missing)
}
}

func TestBufferIsFull(t *testing.T) {
r := New()
r.absPos = 25
sn := uint16(1564)
toMiss := 34

out := r.Process(&rtp.Packet{
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1,
SequenceNumber: sn,
},
})
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: 1,
SequenceNumber: sn,
},
}}, out)
require.Equal(t, 0, missing)
sn++

var expected []*rtp.Packet

for i := uint16(0); i < 63; i++ {
out := r.Process(&rtp.Packet{
for i := 0; i < 64-toMiss; i++ {
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
SequenceNumber: sn + uint16(toMiss),
},
})
require.Equal(t, []*rtp.Packet(nil), out)
require.Equal(t, 0, missing)

expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
SequenceNumber: sn + uint16(toMiss),
},
})
sn++
}

out = r.Process(&rtp.Packet{
out, missing = r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
SequenceNumber: sn + uint16(toMiss),
},
})
require.Equal(t, toMiss, missing)

expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
SequenceNumber: sn + uint16(toMiss),
},
})

Expand Down
26 changes: 25 additions & 1 deletion server_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,7 @@ func TestServerPublishDecodeErrors(t *testing.T) {
for _, ca := range []string{
"invalid rtp",
"invalid rtcp",
"packets lost",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
Expand Down Expand Up @@ -1506,6 +1507,8 @@ func TestServerPublishDecodeErrors(t *testing.T) {
require.EqualError(t, ctx.Error, "RTP header size insufficient: 2 < 4")
case "invalid rtcp":
require.EqualError(t, ctx.Error, "rtcp: packet too short")
case "packets lost":
require.EqualError(t, ctx.Error, "69 RTP packet(s) lost")
}
close(errorRecv)
},
Expand Down Expand Up @@ -1594,7 +1597,7 @@ func TestServerPublishDecodeErrors(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)

switch ca {
switch ca { //nolint:dupl
case "invalid rtp":
l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Expand All @@ -1606,6 +1609,27 @@ func TestServerPublishDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[1],
})

case "packets lost":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 30,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[0],
})

byts, _ = rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[0],
})
}

<-errorRecv
Expand Down
11 changes: 10 additions & 1 deletion serverudpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,16 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
return
}

packets := clientData.track.reorderer.Process(pkt)
packets, missing := clientData.track.reorderer.Process(pkt)

if missing != 0 {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: fmt.Errorf("%d RTP packet(s) lost", missing),
})
}
}

for _, pkt := range packets {
now := time.Now()
Expand Down

0 comments on commit e8fde26

Please sign in to comment.