Skip to content

Commit

Permalink
use a single TCP outgoing buffer each client / session (#665)
Browse files Browse the repository at this point in the history
this saves memory.
  • Loading branch information
aler9 authored Dec 24, 2024
1 parent 6750427 commit 5506eb2
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 87 deletions.
31 changes: 28 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ type Client struct {
reader *clientReader
timeDecoder *rtptime.GlobalDecoder2
mustClose bool
tcpFrame *base.InterleavedFrame
tcpBuffer []byte

// in
chOptions chan optionsReq
Expand Down Expand Up @@ -856,6 +858,11 @@ func (c *Client) startTransportRoutines() {
cm.start()
}

if *c.effectiveTransport == TransportTCP {
c.tcpFrame = &base.InterleavedFrame{}
c.tcpBuffer = make([]byte, c.MaxPacketSize+4)
}

if c.state == clientStatePlay && c.stdChannelSetupped {
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)

Expand Down Expand Up @@ -1902,8 +1909,18 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet,
}

cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.writePacketRTP(byts, pkt, ntp)
cf := cm.formats[pkt.PayloadType]

cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))

ok := c.writer.push(func() error {
return cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
}

// WritePacketRTCP writes a RTCP packet to the server.
Expand All @@ -1920,7 +1937,15 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
}

cm := c.medias[medi]
return cm.writePacketRTCP(byts)

ok := c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
}

// PacketPTS returns the PTS of an incoming RTP packet.
Expand Down
15 changes: 0 additions & 15 deletions client_format.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package gortsplib

import (
"time"

"github.com/pion/rtcp"
"github.com/pion/rtp"

Expand Down Expand Up @@ -71,19 +69,6 @@ func (cf *clientFormat) stop() {
}
}

func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))

ok := cf.cm.c.writer.push(func() error {
return cf.cm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
}

func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
Expand Down
29 changes: 6 additions & 23 deletions client_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"

"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
Expand All @@ -22,9 +21,6 @@ type clientMedia struct {
tcpChannel int
udpRTPListener *clientUDPListener
udpRTCPListener *clientUDPListener
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
}
Expand Down Expand Up @@ -115,10 +111,6 @@ func (cm *clientMedia) start() {
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay
}

cm.tcpRTPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel}
cm.tcpRTCPFrame = &base.InterleavedFrame{Channel: cm.tcpChannel + 1}
cm.tcpBuffer = make([]byte, cm.c.MaxPacketSize+4)
}

for _, ct := range cm.formats {
Expand Down Expand Up @@ -161,26 +153,17 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
}

func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error {
cm.tcpRTPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTPFrame, cm.tcpBuffer)
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}

func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.tcpRTCPFrame.Payload = payload
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.tcpRTCPFrame, cm.tcpBuffer)
}

func (cm *clientMedia) writePacketRTCP(byts []byte) error {
ok := cm.c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}

return nil
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
}

func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
Expand Down
8 changes: 4 additions & 4 deletions server_multicast_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (h *serverMulticastWriter) ip() net.IP {
return h.rtpl.ip()
}

func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtpl.write(payload, h.rtpAddr)
return h.rtpl.write(byts, h.rtpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
Expand All @@ -78,9 +78,9 @@ func (h *serverMulticastWriter) writePacketRTP(payload []byte) error {
return nil
}

func (h *serverMulticastWriter) writePacketRTCP(payload []byte) error {
func (h *serverMulticastWriter) writePacketRTCP(byts []byte) error {
ok := h.writer.push(func() error {
return h.rtcpl.write(payload, h.rtcpAddr)
return h.rtcpl.write(byts, h.rtcpAddr)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
Expand Down
32 changes: 30 additions & 2 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ type ServerSession struct {
udpCheckStreamTimer *time.Timer
writer *asyncProcessor
timeDecoder *rtptime.GlobalDecoder2
tcpFrame *base.InterleavedFrame
tcpBuffer []byte

// in
chHandleRequest chan sessionRequestReq
Expand Down Expand Up @@ -978,6 +980,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
sm.start()
}

if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}

switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
Expand Down Expand Up @@ -1067,6 +1074,11 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
sm.start()
}

if *ss.setuppedTransport == TransportTCP {
ss.tcpFrame = &base.InterleavedFrame{}
ss.tcpBuffer = make([]byte, ss.s.MaxPacketSize+4)
}

switch *ss.setuppedTransport {
case TransportUDP:
ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod)
Expand Down Expand Up @@ -1254,7 +1266,15 @@ func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFu

func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTP(byts)

ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
}

// WritePacketRTP writes a RTP packet to the session.
Expand All @@ -1271,7 +1291,15 @@ func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet

func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
return sm.writePacketRTCP(byts)

ok := ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
}

// WritePacketRTCP writes a RTCP packet to the session.
Expand Down
40 changes: 6 additions & 34 deletions server_session_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"

"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
Expand All @@ -23,9 +22,6 @@ type serverSessionMedia struct {
udpRTPWriteAddr *net.UDPAddr
udpRTCPReadPort int
udpRTCPWriteAddr *net.UDPAddr
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
tcpBuffer []byte
formats map[uint8]*serverSessionFormat // record only
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
Expand Down Expand Up @@ -87,10 +83,6 @@ func (sm *serverSessionMedia) start() {
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord
}

sm.tcpRTPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel}
sm.tcpRTCPFrame = &base.InterleavedFrame{Channel: sm.tcpChannel + 1}
sm.tcpBuffer = make([]byte, sm.ss.s.MaxPacketSize+4)
}
}

Expand Down Expand Up @@ -127,38 +119,18 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {

func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTPFrame, sm.tcpBuffer)
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}

func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.tcpRTCPFrame.Payload = payload
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.tcpRTCPFrame, sm.tcpBuffer)
}

func (sm *serverSessionMedia) writePacketRTP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
}

func (sm *serverSessionMedia) writePacketRTCP(payload []byte) error {
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}

return nil
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
}

func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {
Expand Down
5 changes: 2 additions & 3 deletions server_stream_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t

// send unicast
for r := range sf.sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sf.sm.media]
if ok {
err := sm.writePacketRTP(byts)
if _, ok := r.setuppedMedias[sf.sm.media]; ok {
err := r.writePacketRTP(sf.sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
} else {
Expand Down
5 changes: 2 additions & 3 deletions server_stream_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func (sm *serverStreamMedia) close() {
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
// send unicast
for r := range sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sm.media]
if ok {
err := sm.writePacketRTCP(byts)
if _, ok := r.setuppedMedias[sm.media]; ok {
err := r.writePacketRTCP(sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
}
Expand Down

0 comments on commit 5506eb2

Please sign in to comment.