-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Provide API so that handling around RTP can be easily defined by the user. See the design doc here[0] [0] pion/webrtc-v3-design#34
- Loading branch information
Showing
21 changed files
with
914 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package webrtc | ||
|
||
import ( | ||
"github.com/pion/webrtc/v3/pkg/interceptor/movetopionrtp" | ||
) | ||
|
||
func convertRTPParameters(in RTPParameters) movetopionrtp.RTPParameters { | ||
return movetopionrtp.RTPParameters{ | ||
HeaderExtensions: convertHeaderExtensions(in.HeaderExtensions), | ||
Codecs: convertRTPCodecParameters(in.Codecs), | ||
} | ||
} | ||
|
||
func convertHeaderExtensions(in []RTPHeaderExtensionParameter) []movetopionrtp.RTPHeaderExtensionParameter { | ||
result := make([]movetopionrtp.RTPHeaderExtensionParameter, 0, len(in)) | ||
for _, v := range in { | ||
result = append(result, convertHeaderExtension(v)) | ||
} | ||
|
||
return result | ||
} | ||
|
||
func convertHeaderExtension(in RTPHeaderExtensionParameter) movetopionrtp.RTPHeaderExtensionParameter { | ||
return movetopionrtp.RTPHeaderExtensionParameter{ | ||
URI: in.URI, | ||
ID: in.ID, | ||
} | ||
} | ||
|
||
func convertRTPCodecParameters(in []RTPCodecParameters) []movetopionrtp.RTPCodecParameters { | ||
result := make([]movetopionrtp.RTPCodecParameters, 0, len(in)) | ||
for _, v := range in { | ||
result = append(result, convertRTPCodecParameter(v)) | ||
} | ||
|
||
return result | ||
} | ||
|
||
func convertRTPCodecParameter(in RTPCodecParameters) movetopionrtp.RTPCodecParameters { | ||
return movetopionrtp.RTPCodecParameters{ | ||
RTPCodecCapability: movetopionrtp.RTPCodecCapability{ | ||
MimeType: in.MimeType, | ||
ClockRate: in.ClockRate, | ||
Channels: in.Channels, | ||
SDPFmtpLine: in.SDPFmtpLine, | ||
RTCPFeedback: convertRTCPFeedbacks(in.RTCPFeedback), | ||
}, | ||
PayloadType: convertPayloadType(in.PayloadType), | ||
} | ||
} | ||
|
||
func convertRTCPFeedbacks(in []RTCPFeedback) []movetopionrtp.RTCPFeedback { | ||
result := make([]movetopionrtp.RTCPFeedback, 0, len(in)) | ||
for _, v := range in { | ||
result = append(result, convertRTCPFeedback(v)) | ||
} | ||
|
||
return result | ||
} | ||
|
||
func convertRTCPFeedback(in RTCPFeedback) movetopionrtp.RTCPFeedback { | ||
return movetopionrtp.RTCPFeedback{ | ||
Type: in.Type, | ||
Parameter: in.Parameter, | ||
} | ||
} | ||
|
||
func convertPayloadType(in PayloadType) movetopionrtp.PayloadType { | ||
return movetopionrtp.PayloadType(in) | ||
} | ||
|
||
func convertSSRC(in SSRC) movetopionrtp.SSRC { | ||
return movetopionrtp.SSRC(in) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// +build !js | ||
|
||
package webrtc | ||
|
||
import ( | ||
"github.com/pion/webrtc/v3/pkg/interceptor" | ||
) | ||
|
||
// InterceptorRegistry is a collector for interceptors. | ||
type InterceptorRegistry struct { | ||
interceptors []interceptor.Interceptor | ||
} | ||
|
||
// Add adds a new Interceptor to the registry. | ||
func (i *InterceptorRegistry) Add(icpr interceptor.Interceptor) { | ||
i.interceptors = append(i.interceptors, icpr) | ||
} | ||
|
||
func (i *InterceptorRegistry) build() interceptor.Interceptor { | ||
if len(i.interceptors) == 0 { | ||
return &interceptor.NoOp{} | ||
} | ||
|
||
return interceptor.NewChain(i.interceptors) | ||
} | ||
|
||
// RegisterDefaultInterceptors will register some useful interceptors. If you want to customize which interceptors are loaded, | ||
// you should copy the code from this method and remove unwanted interceptors. | ||
func RegisterDefaultInterceptors(mediaEngine *MediaEngine, interceptorRegistry *InterceptorRegistry) error { | ||
err := ConfigureNack(mediaEngine, interceptorRegistry) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ConfigureNack will setup everything necessary for handling generating/responding to nack messages. | ||
func ConfigureNack(mediaEngine *MediaEngine, interceptorRegistry *InterceptorRegistry) error { | ||
mediaEngine.RegisterFeedback(RTCPFeedback{Type: "nack"}, RTPCodecTypeVideo) | ||
mediaEngine.RegisterFeedback(RTCPFeedback{Type: "nack", Parameter: "pli"}, RTPCodecTypeVideo) | ||
interceptorRegistry.Add(&interceptor.NACK{}) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
// +build !js | ||
|
||
package webrtc | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/pion/rtcp" | ||
"github.com/pion/rtp" | ||
"github.com/pion/transport/test" | ||
"github.com/pion/webrtc/v3/pkg/interceptor" | ||
"github.com/pion/webrtc/v3/pkg/media" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
type testInterceptor struct { | ||
t *testing.T | ||
extensionID uint8 | ||
rtcpWriter atomic.Value | ||
lastRTCP atomic.Value | ||
interceptor.NoOp | ||
} | ||
|
||
func (t *testInterceptor) BindLocalTrack(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { | ||
return interceptor.RTPWriterFunc(func(p *rtp.Packet, attributes interceptor.Attributes) (int, error) { | ||
// set extension on outgoing packet | ||
p.Header.Extension = true | ||
p.Header.ExtensionProfile = 0xBEDE | ||
assert.NoError(t.t, p.Header.SetExtension(t.extensionID, []byte("write"))) | ||
|
||
return writer.Write(p, attributes) | ||
}) | ||
} | ||
|
||
func (t *testInterceptor) BindRemoteTrack(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { | ||
return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) { | ||
p, attributes, err := reader.Read() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
// set extension on incoming packet | ||
p.Header.Extension = true | ||
p.Header.ExtensionProfile = 0xBEDE | ||
assert.NoError(t.t, p.Header.SetExtension(t.extensionID, []byte("read"))) | ||
|
||
// write back a pli | ||
rtcpWriter := t.rtcpWriter.Load().(interceptor.RTCPWriter) | ||
pli := &rtcp.PictureLossIndication{SenderSSRC: uint32(info.SSRC), MediaSSRC: uint32(info.SSRC)} | ||
_, err = rtcpWriter.Write([]rtcp.Packet{pli}, make(interceptor.Attributes)) | ||
assert.NoError(t.t, err) | ||
|
||
return p, attributes, nil | ||
}) | ||
} | ||
|
||
func (t *testInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { | ||
return interceptor.RTCPReaderFunc(func() ([]rtcp.Packet, interceptor.Attributes, error) { | ||
pkts, attributes, err := reader.Read() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
t.lastRTCP.Store(pkts[0]) | ||
|
||
return pkts, attributes, nil | ||
}) | ||
} | ||
|
||
func (t *testInterceptor) lastReadRTCP() rtcp.Packet { | ||
p, _ := t.lastRTCP.Load().(rtcp.Packet) | ||
return p | ||
} | ||
|
||
func (t *testInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { | ||
t.rtcpWriter.Store(writer) | ||
return writer | ||
} | ||
|
||
func TestPeerConnection_Interceptor(t *testing.T) { | ||
to := test.TimeOut(time.Second * 20) | ||
defer to.Stop() | ||
|
||
report := test.CheckRoutines(t) | ||
defer report() | ||
|
||
createPC := func(interceptor interceptor.Interceptor) *PeerConnection { | ||
m := &MediaEngine{} | ||
err := m.RegisterDefaultCodecs() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
ir := &InterceptorRegistry{} | ||
ir.Add(interceptor) | ||
pc, err := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
return pc | ||
} | ||
|
||
sendInterceptor := &testInterceptor{t: t, extensionID: 1} | ||
senderPC := createPC(sendInterceptor) | ||
receiverPC := createPC(&testInterceptor{t: t, extensionID: 2}) | ||
|
||
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: "video/vp8"}, "video", "pion") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
sender, err := senderPC.AddTrack(track) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
pending := new(int32) | ||
wg := &sync.WaitGroup{} | ||
|
||
wg.Add(1) | ||
*pending++ | ||
receiverPC.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) { | ||
p, readErr := track.ReadRTP() | ||
if readErr != nil { | ||
t.Fatal(readErr) | ||
} | ||
assert.Equal(t, p.Extension, true) | ||
assert.Equal(t, "write", string(p.GetExtension(1))) | ||
assert.Equal(t, "read", string(p.GetExtension(2))) | ||
atomic.AddInt32(pending, -1) | ||
wg.Done() | ||
|
||
for { | ||
_, readErr = track.ReadRTP() | ||
if readErr != nil { | ||
return | ||
} | ||
} | ||
}) | ||
|
||
wg.Add(1) | ||
*pending++ | ||
go func() { | ||
_, readErr := sender.ReadRTCP() | ||
assert.NoError(t, readErr) | ||
atomic.AddInt32(pending, -1) | ||
wg.Done() | ||
|
||
for { | ||
_, readErr = sender.ReadRTCP() | ||
if readErr != nil { | ||
return | ||
} | ||
} | ||
}() | ||
|
||
err = signalPair(senderPC, receiverPC) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
for { | ||
time.Sleep(time.Millisecond * 100) | ||
if routineErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil { | ||
t.Error(routineErr) | ||
return | ||
} | ||
|
||
if atomic.LoadInt32(pending) == 0 { | ||
return | ||
} | ||
} | ||
}() | ||
|
||
wg.Wait() | ||
assert.NoError(t, senderPC.Close()) | ||
assert.NoError(t, receiverPC.Close()) | ||
|
||
pli, _ := sendInterceptor.lastReadRTCP().(*rtcp.PictureLossIndication) | ||
if pli == nil || pli.SenderSSRC == 0 { | ||
t.Errorf("pli not found by send interceptor") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
// +build !js | ||
|
||
package webrtc | ||
|
||
import ( | ||
"sync/atomic" | ||
|
||
"github.com/pion/rtp" | ||
"github.com/pion/webrtc/v3/pkg/interceptor" | ||
) | ||
|
||
type interceptorTrackLocalWriter struct { | ||
TrackLocalWriter | ||
rtpWriter atomic.Value | ||
} | ||
|
||
func (i *interceptorTrackLocalWriter) setRTPWriter(writer interceptor.RTPWriter) { | ||
i.rtpWriter.Store(writer) | ||
} | ||
|
||
func (i *interceptorTrackLocalWriter) WriteRTP(header *rtp.Header, payload []byte) (int, error) { | ||
writer := i.rtpWriter.Load().(interceptor.RTPWriter) | ||
|
||
if writer == nil { | ||
return 0, nil | ||
} | ||
|
||
return writer.Write(&rtp.Packet{Header: *header, Payload: payload}, make(interceptor.Attributes)) | ||
} |
Oops, something went wrong.