Skip to content

Commit

Permalink
interceptor POCv2
Browse files Browse the repository at this point in the history
  • Loading branch information
masterada committed Nov 18, 2020
1 parent 471e014 commit ca967fd
Show file tree
Hide file tree
Showing 17 changed files with 341 additions and 267 deletions.
18 changes: 18 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type API struct {
settingEngine *SettingEngine
mediaEngine *MediaEngine
interceptors []Interceptor
interceptor Interceptor
}

// NewAPI Creates a new API object for keeping semi-global settings to WebRTC objects
Expand All @@ -35,6 +37,10 @@ func NewAPI(options ...func(*API)) *API {
a.mediaEngine = &MediaEngine{}
}

if len(a.interceptors) > 0 {
a.interceptor = newInterceptorChain(a.interceptors, a.settingEngine.LoggerFactory.NewLogger("interceptor"))
}

return a
}

Expand All @@ -53,3 +59,15 @@ func WithSettingEngine(s SettingEngine) func(a *API) {
a.settingEngine = &s
}
}

func WithInterceptor(interceptor Interceptor) func(a *API) {
return func(a *API) {
a.interceptors = append(a.interceptors, interceptor)
}
}

func ClearInterceptors() func(a *API) {
return func(a *API) {
a.interceptors = nil
}
}
121 changes: 25 additions & 96 deletions interceptor.go
Original file line number Diff line number Diff line change
@@ -1,123 +1,52 @@
package webrtc

import (
"context"
"errors"
"io"

"github.com/pion/rtcp"
"github.com/pion/rtp"
)

type Interceptor interface {
Intercept(*PeerConnection, ReadWriter) ReadWriter
}

// Reader is an interface to handle incoming RTP stream.
type ReadWriter interface {
ReadRTP(context.Context) (*rtp.Packet, map[interface{}]interface{}, error)
WriteRTP(context.Context, *rtp.Packet, map[interface{}]interface{}) error
ReadRTCP(context.Context) ([]rtcp.Packet, error)
WriteRTCP(context.Context, []rtcp.Packet) error
io.Closer
}

type contextReadWriter struct{}
type WriteRTP func(p *rtp.Packet, attributes map[interface{}]interface{}) (int, error)
type ReadRTP func() (*rtp.Packet, map[interface{}]interface{}, error)
type WriteRTCP func(pkts []rtcp.Packet, attributes map[interface{}]interface{}) (int, error)
type ReadRTCP func() ([]rtcp.Packet, map[interface{}]interface{}, error)

type interceptorChain struct {
readWriter ReadWriter
}

type keyReadRTP struct{}
type keyReadRTCP struct{}
type keyWriteRTP struct{}
type keyWriteRTCP struct{}

type writeRTP func(packet *rtp.Packet)
type writeRTCP func(packets []rtcp.Packet)

func (c *contextReadWriter) ReadRTP(ctx context.Context) (*rtp.Packet, map[interface{}]interface{}, error) {
p, ok := ctx.Value(keyReadRTP{}).(*rtp.Packet)
if !ok {
return nil, nil, errors.New("packet not found in context")
}

return p, make(map[interface{}]interface{}), nil
}
type Interceptor interface {
BindReadRTCP(read ReadRTCP) ReadRTCP // TODO: call this
BindWriteRTCP(write WriteRTCP) WriteRTCP // TODO: call this

func (c *contextReadWriter) WriteRTP(ctx context.Context, packet *rtp.Packet, _ map[interface{}]interface{}) error {
writeRTP, ok := ctx.Value(keyWriteRTP{}).(writeRTP)
if !ok {
return errors.New("callback not found in context")
}
writeRTP(packet)
BindLocalTrack(ctx *TrackLocalContext, write WriteRTP) WriteRTP
UnbindLocalTrack(ctx *TrackLocalContext)

return nil
}
BindRemoteTrack(ctx *TrackRemoteContext, read ReadRTP) ReadRTP
UnbindRemoteTrack(ctx *TrackRemoteContext)

func (c *contextReadWriter) ReadRTCP(ctx context.Context) ([]rtcp.Packet, error) {
p, ok := ctx.Value(keyReadRTCP{}).([]rtcp.Packet)
if !ok {
return nil, errors.New("packets not found in context")
}
return p, nil
io.Closer
}

func (c *contextReadWriter) WriteRTCP(ctx context.Context, packets []rtcp.Packet) error {
writeRTCP, ok := ctx.Value(keyWriteRTCP{}).(writeRTCP)
if !ok {
return errors.New("callback not found in context")
}
writeRTCP(packets)
type DummyInterceptor struct{}

return nil
func (d *DummyInterceptor) BindReadRTCP(read ReadRTCP) ReadRTCP {
return read
}

func (c *contextReadWriter) Close() error {
return nil
func (d *DummyInterceptor) BindWriteRTCP(write WriteRTCP) WriteRTCP {
return write
}

func newInterceptorChain(pc *PeerConnection, interceptors []Interceptor) *interceptorChain {
var readWriter ReadWriter = &contextReadWriter{}
for _, interceptor := range interceptors {
readWriter = interceptor.Intercept(pc, readWriter)
}
return &interceptorChain{readWriter: readWriter}
func (d *DummyInterceptor) BindLocalTrack(_ *TrackLocalContext, write WriteRTP) WriteRTP {
return write
}

func (i *interceptorChain) wrapReadRTP(packet *rtp.Packet) (*rtp.Packet, error) {
ctx := context.WithValue(context.Background(), keyReadRTP{}, packet)
p, _, err := i.readWriter.ReadRTP(ctx)
return p, err
}

func (i *interceptorChain) wrapWriteRTP(packet *rtp.Packet) (*rtp.Packet, error) {
var p *rtp.Packet
ctx := context.WithValue(context.Background(), keyWriteRTP{}, func(p2 *rtp.Packet) {
p = p2
})
err := i.readWriter.WriteRTP(ctx, packet, make(map[interface{}]interface{}))
if err != nil {
return nil, err
}
func (d *DummyInterceptor) UnbindLocalTrack(_ *TrackLocalContext) {}

return p, nil
func (d *DummyInterceptor) BindRemoteTrack(_ *TrackRemoteContext, read ReadRTP) ReadRTP {
return read
}

func (i *interceptorChain) wrapReadRTCP(packets []rtcp.Packet) ([]rtcp.Packet, error) {
ctx := context.WithValue(context.Background(), keyReadRTCP{}, packets)
return i.readWriter.ReadRTCP(ctx)
}
func (d *DummyInterceptor) UnbindRemoteTrack(_ *TrackRemoteContext) {}

func (i *interceptorChain) wrapWriteRTCP(packet *rtp.Packet) (*rtp.Packet, error) {
var p *rtp.Packet
ctx := context.WithValue(context.Background(), keyWriteRTP{}, func(p2 *rtp.Packet) {
p = p2
})
err := i.readWriter.WriteRTP(ctx, packet, make(map[interface{}]interface{}))
if err != nil {
return nil, err
}

return p, nil
func (d *DummyInterceptor) Close() error {
return nil
}
73 changes: 73 additions & 0 deletions interceptor_chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package webrtc

import (
"github.com/pion/logging"
)

type interceptorChain struct {
interceptors []Interceptor
log logging.LeveledLogger
}

func newInterceptorChain(interceptors []Interceptor, log logging.LeveledLogger) Interceptor {
return &interceptorChain{interceptors: interceptors, log: log}
}

func (i *interceptorChain) BindReadRTCP(read ReadRTCP) ReadRTCP {
for _, interceptor := range i.interceptors {
read = interceptor.BindReadRTCP(read)
}

return read
}

func (i *interceptorChain) BindWriteRTCP(write WriteRTCP) WriteRTCP {
for _, interceptor := range i.interceptors {
write = interceptor.BindWriteRTCP(write)
}

return write
}

func (i *interceptorChain) BindLocalTrack(ctx *TrackLocalContext, write WriteRTP) WriteRTP {
for _, interceptor := range i.interceptors {
write = interceptor.BindLocalTrack(ctx, write)
}

return write
}

func (i *interceptorChain) UnbindLocalTrack(ctx *TrackLocalContext) {
for _, interceptor := range i.interceptors {
interceptor.UnbindLocalTrack(ctx)
}
}

func (i *interceptorChain) BindRemoteTrack(ctx *TrackRemoteContext, read ReadRTP) ReadRTP {
for _, interceptor := range i.interceptors {
read = interceptor.BindRemoteTrack(ctx, read)
}

return read
}

func (i *interceptorChain) UnbindRemoteTrack(ctx *TrackRemoteContext) {
for _, interceptor := range i.interceptors {
interceptor.UnbindRemoteTrack(ctx)
}
}

func (i *interceptorChain) Close() error {
var err error
for _, interceptor := range i.interceptors {
if err2 := interceptor.Close(); err2 != nil {
if err == nil {
err = err2
} else {
i.log.Warnf("failed closing interceptor: %+v", err2)
}
}
}

return err
}
22 changes: 22 additions & 0 deletions interceptor_peerconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package webrtc

import (
"github.com/pion/rtcp"
)

func WrapPeerConnection(pc *PeerConnection, interceptor Interceptor) *PeerConnection {
if interceptor == nil {
return pc
}

writeRTCP := interceptor.BindWriteRTCP(func(pkts []rtcp.Packet, attributes map[interface{}]interface{}) (int, error) {
return pc.writeRTCP(pkts)
})

pc.interceptorWriteRTCP = func(pkts []rtcp.Packet) error {
_, err := writeRTCP(pkts, make(map[interface{}]interface{}))
return err
}

return pc
}
Loading

0 comments on commit ca967fd

Please sign in to comment.