Skip to content

Commit

Permalink
Rewrite Receiver/Sender classes
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jun 5, 2024
1 parent e0b1a50 commit 31e4ba2
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 160 deletions.
5 changes: 5 additions & 0 deletions pkg/core/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"strings"
Expand All @@ -18,6 +19,10 @@ type Codec struct {
PayloadType uint8
}

func (c *Codec) MarshalJSON() ([]byte, error) {
return json.Marshal(c.String())
}

func (c *Codec) String() string {
s := fmt.Sprintf("%d %s", c.PayloadType, c.Name)
if c.ClockRate != 0 && c.ClockRate != 90000 {
Expand Down
120 changes: 120 additions & 0 deletions pkg/core/core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package core

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

type producer struct {
Medias []*Media
Receivers []*Receiver

id byte
}

func (p *producer) GetMedias() []*Media {
return p.Medias
}

func (p *producer) GetTrack(_ *Media, codec *Codec) (*Receiver, error) {
for _, receiver := range p.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := NewReceiver(nil, codec)
p.Receivers = append(p.Receivers, receiver)
return receiver, nil
}

func (p *producer) Start() error {
pkt := &Packet{Payload: []byte{p.id}}
p.Receivers[0].Input(pkt)
return nil
}

func (p *producer) Stop() error {
for _, receiver := range p.Receivers {
receiver.Close()
}
return nil
}

type consumer struct {
Medias []*Media
Senders []*Sender

cache chan byte
}

func (c *consumer) GetMedias() []*Media {
return c.Medias
}

func (c *consumer) AddTrack(_ *Media, _ *Codec, track *Receiver) error {
c.cache = make(chan byte, 1)
sender := NewSender(nil, track.Codec)
sender.Output = func(packet *Packet) {
c.cache <- packet.Payload[0]
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}

func (c *consumer) Stop() error {
for _, sender := range c.Senders {
sender.Close()
}
return nil
}

func (c *consumer) read() byte {
return <-c.cache
}

func TestName(t *testing.T) {
GetProducer := func(b byte) Producer {
return &producer{
Medias: []*Media{
{
Kind: KindVideo,
Direction: DirectionRecvonly,
Codecs: []*Codec{
{Name: CodecH264},
},
},
},
id: b,
}
}

// stage1
prod1 := GetProducer(1)
cons2 := &consumer{}

media1 := prod1.GetMedias()[0]
track1, _ := prod1.GetTrack(media1, media1.Codecs[0])

_ = cons2.AddTrack(nil, nil, track1)

_ = prod1.Start()
require.Equal(t, byte(1), cons2.read())

// stage2
prod2 := GetProducer(2)
media2 := prod2.GetMedias()[0]
require.NotEqual(t, fmt.Sprintf("%p", media1), fmt.Sprintf("%p", media2))
track2, _ := prod2.GetTrack(media2, media2.Codecs[0])
track1.Replace(track2)

_ = prod1.Stop()

_ = prod2.Start()
require.Equal(t, byte(2), cons2.read())

// stage3
_ = prod2.Stop()
}
87 changes: 87 additions & 0 deletions pkg/core/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package core

import (
"sync"

"github.com/pion/rtp"
)

//type Packet struct {
// Payload []byte
// Timestamp uint32 // PTS if DTS == 0 else DTS
// Composition uint32 // CTS = PTS-DTS (for support B-frames)
// Sequence uint16
//}

type Packet = rtp.Packet

// HandlerFunc - process input packets (just like http.HandlerFunc)
type HandlerFunc func(packet *Packet)

// Filter - a decorator for any HandlerFunc
type Filter func(handler HandlerFunc) HandlerFunc

// Node - Receiver or Sender or Filter (transform)
type Node struct {
Codec *Codec `json:"codec"`
Input HandlerFunc `json:"-"`
Output HandlerFunc `json:"-"`

childs []*Node
parent *Node

mu sync.Mutex
}

func (n *Node) WithParent(parent *Node) *Node {
parent.AppendChild(n)
return n
}

func (n *Node) AppendChild(child *Node) {
n.mu.Lock()
n.childs = append(n.childs, child)
n.mu.Unlock()

child.parent = n
}

func (n *Node) RemoveChild(child *Node) {
n.mu.Lock()
for i, ch := range n.childs {
if ch == child {
n.childs = append(n.childs[:i], n.childs[i+1:]...)
break
}
}
n.mu.Unlock()
}

func (n *Node) Close() {
if parent := n.parent; parent != nil {
parent.RemoveChild(n)

if len(parent.childs) == 0 {
parent.Close()
}
} else {
for _, childs := range n.childs {
childs.Close()
}
}
}

func MoveNode(dst, src *Node) {
src.mu.Lock()
childs := src.childs
src.childs = nil
src.mu.Unlock()

dst.mu.Lock()
dst.childs = childs
dst.mu.Unlock()

for _, child := range childs {
child.parent = dst
}
}
Loading

0 comments on commit 31e4ba2

Please sign in to comment.