Skip to content

Commit

Permalink
Add Nack Interceptors
Browse files Browse the repository at this point in the history
* ResponderInterceptor which responds to NACK Requests
* GeneratorInterceptor which generates NACK Requests
  • Loading branch information
masterada authored and Sean-Der committed Dec 14, 2020
1 parent f6577b2 commit 07afb0f
Show file tree
Hide file tree
Showing 15 changed files with 912 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pion/interceptor
go 1.15

require (
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.2
github.com/stretchr/testify v1.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
Expand Down
12 changes: 0 additions & 12 deletions nack.go

This file was deleted.

6 changes: 6 additions & 0 deletions pkg/nack/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package nack

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")
158 changes: 158 additions & 0 deletions pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package nack

import (
"math/rand"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
size uint16
skipLastN uint16
interval time.Duration
receiveLogs *sync.Map
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger

remoteStreamBuf rtp.Packet
}

// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
r := &GeneratorInterceptor{
size: 8192,
skipLastN: 0,
interval: time.Millisecond * 100,
receiveLogs: &sync.Map{},
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
}

for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}

if _, err := newReceiveLog(r.size); err != nil {
return nil, err
}

return r, nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
n.m.Lock()
defer n.m.Unlock()

if n.isClosed() {
return writer
}

n.wg.Add(1)

go n.loop(writer)

return writer
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
if !streamSupportNack(info) {
return reader
}

// error is already checked in NewGeneratorInterceptor
receiveLog, _ := newReceiveLog(n.size)
n.receiveLogs.Store(info.SSRC, receiveLog)

return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(b, a)
if err != nil {
return 0, nil, err
}

if err = n.remoteStreamBuf.Unmarshal(b[:i]); err != nil {
return 0, nil, err
}
receiveLog.add(n.remoteStreamBuf.Header.SequenceNumber)

return i, attr, nil
})
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.receiveLogs.Delete(info.SSRC)
}

// Close closes the interceptor
func (n *GeneratorInterceptor) Close() error {
defer n.wg.Wait()
n.m.Lock()
defer n.m.Unlock()

if !n.isClosed() {
close(n.close)
}

return nil
}

func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer n.wg.Done()

senderSSRC := rand.Uint32() // #nosec

ticker := time.NewTicker(n.interval)
for {
select {
case <-ticker.C:
n.receiveLogs.Range(func(key, value interface{}) bool {
ssrc := key.(uint32)
receiveLog := value.(*receiveLog)

missing := receiveLog.missingSeqNumbers(n.skipLastN)
if len(missing) == 0 {
return true
}

nack := &rtcp.TransportLayerNack{
SenderSSRC: senderSSRC,
MediaSSRC: ssrc,
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
}

if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending nack: %+v", err)
}

return true
})

case <-n.close:
return
}
}
}

func (n *GeneratorInterceptor) isClosed() bool {
select {
case <-n.close:
return true
default:
return false
}
}
70 changes: 70 additions & 0 deletions pkg/nack/generator_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package nack

import (
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestGeneratorInterceptor(t *testing.T) {
const interval = time.Millisecond * 10
i, err := NewGeneratorInterceptor(
GeneratorSize(64),
GeneratorSkipLastN(2),
GeneratorInterval(interval),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})

select {
case r := <-stream.ReadRTP():
assert.NoError(t, r.Err)
assert.Equal(t, seqNum, r.Packet.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("receiver rtp packet not found")
}
}

time.Sleep(interval * 2) // wait for at least 2 nack packets

select {
case <-stream.WrittenRTCP():
// ignore the first nack, it might only contain the sequence id 13 as missing
default:
}

select {
case pkts := <-stream.WrittenRTCP():
assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected")

p, ok := pkts[0].(*rtcp.TransportLayerNack)
assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0])

assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtcp packet not found")
}
}

func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
_, err := NewGeneratorInterceptor(GeneratorSize(5))
assert.Error(t, err, ErrInvalidSize)
}
44 changes: 44 additions & 0 deletions pkg/nack/generator_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package nack

import (
"time"

"github.com/pion/logging"
)

// GeneratorOption can be used to configure GeneratorInterceptor
type GeneratorOption func(r *GeneratorInterceptor) error

// GeneratorSize sets the size of the interceptor.
// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
func GeneratorSize(size uint16) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.size = size
return nil
}
}

// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating
// nack requests.
func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.skipLastN = skipLastN
return nil
}
}

// GeneratorLog sets a logger for the interceptor
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.log = log
return nil
}
}

// GeneratorInterval sets the nack send interval for the interceptor
func GeneratorInterval(interval time.Duration) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.interval = interval
return nil
}
}
14 changes: 14 additions & 0 deletions pkg/nack/nack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
package nack

import "github.com/pion/interceptor"

func streamSupportNack(info *interceptor.StreamInfo) bool {
for _, fb := range info.RTCPFeedback {
if fb.Type == "nack" && fb.Parameter == "" {
return true
}
}

return false
}
Loading

0 comments on commit 07afb0f

Please sign in to comment.