Skip to content

Commit

Permalink
add media statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 23, 2024
1 parent 6101fb5 commit 1362ec8
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 64 deletions.
33 changes: 33 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,5 +2035,38 @@ func (c *Client) Stats() *ClientStats { //nolint:dupl
}
return v
}(),
Medias: func() map[*description.Media]ClientStatsMedia { //nolint:dupl
ret := make(map[*description.Media]ClientStatsMedia, len(c.setuppedMedias))

for med, sm := range c.setuppedMedias {
ret[med] = ClientStatsMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsReceived: atomic.LoadUint64(sm.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(sm.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(sm.rtpPacketsLost),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTPJitter: func() float64 {
v := float64(0)
n := float64(0)
for _, fo := range sm.formats {
if fo.rtcpReceiver != nil {
stats := fo.rtcpReceiver.Stats()
if stats != nil {
v += stats.Jitter
n++
}
}
}
return v / n
}(),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
}
}

return ret
}(),
}
}
6 changes: 2 additions & 4 deletions client_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func (cf *clientFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Ti
func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
atomic.AddUint64(cf.cm.rtpPacketsLost, uint64(lost))
cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
cf.cm.onRTPPacketLost(lost)
// do not return
}

Expand All @@ -111,8 +110,7 @@ func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
func (cf *clientFormat) readRTPTCP(pkt *rtp.Packet) {
lost := cf.tcpLossDetector.Process(pkt)
if lost != 0 {
atomic.AddUint64(cf.cm.rtpPacketsLost, uint64(lost))
cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
cf.cm.onRTPPacketLost(lost)
// do not return
}

Expand Down
37 changes: 30 additions & 7 deletions client_media.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type clientMedia struct {
tcpBuffer []byte
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
bytesReceived *uint64
bytesSent *uint64
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
Expand All @@ -38,6 +40,8 @@ type clientMedia struct {

func (cm *clientMedia) initialize() {
cm.onPacketRTCP = func(rtcp.Packet) {}
cm.bytesReceived = new(uint64)
cm.bytesSent = new(uint64)
cm.rtpPacketsReceived = new(uint64)
cm.rtpPacketsSent = new(uint64)
cm.rtpPacketsLost = new(uint64)
Expand Down Expand Up @@ -173,6 +177,7 @@ func (cm *clientMedia) writePacketRTPInQueueUDP(payload []byte) error {
}

Check warning on line 177 in client_media.go

View check run for this annotation

Codecov / codecov/patch

client_media.go#L176-L177

Added lines #L176 - L177 were not covered by tests

atomic.AddUint64(cm.c.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtpPacketsSent, 1)
return nil
}
Expand All @@ -184,6 +189,7 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
}

Check warning on line 189 in client_media.go

View check run for this annotation

Codecov / codecov/patch

client_media.go#L188-L189

Added lines #L188 - L189 were not covered by tests

atomic.AddUint64(cm.c.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
Expand All @@ -196,6 +202,7 @@ func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error {
return err
}

Check warning on line 203 in client_media.go

View check run for this annotation

Codecov / codecov/patch

client_media.go#L202-L203

Added lines #L202 - L203 were not covered by tests

atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtpPacketsSent, 1)
return nil
}
Expand All @@ -208,6 +215,7 @@ func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
return err
}

Check warning on line 216 in client_media.go

View check run for this annotation

Codecov / codecov/patch

client_media.go#L215-L216

Added lines #L215 - L216 were not covered by tests

atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
Expand All @@ -224,6 +232,8 @@ func (cm *clientMedia) writePacketRTCP(byts []byte) error {
}

func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())

Expand All @@ -246,6 +256,8 @@ func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
}

func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())

Expand Down Expand Up @@ -281,6 +293,8 @@ func (cm *clientMedia) readRTPTCPRecord(_ []byte) bool {
}

func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

if len(payload) > udpMaxPayloadSize {
cm.onRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
Expand All @@ -302,9 +316,10 @@ func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool {
}

func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool {
plen := len(payload)
atomic.AddUint64(cm.c.bytesReceived, uint64(len(payload)))
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

if plen == (udpMaxPayloadSize + 1) {
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onRTPDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{})
return false
}
Expand All @@ -328,10 +343,10 @@ func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool {
}

func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool {
now := cm.c.timeNow()
plen := len(payload)
atomic.AddUint64(cm.c.bytesReceived, uint64(len(payload)))
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

if plen == (udpMaxPayloadSize + 1) {
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return false
}
Expand All @@ -342,6 +357,8 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool {
return false
}

now := cm.c.timeNow()

atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))

for _, pkt := range packets {
Expand All @@ -363,9 +380,10 @@ func (cm *clientMedia) readRTPUDPRecord(_ []byte) bool {
}

func (cm *clientMedia) readRTCPUDPRecord(payload []byte) bool {
plen := len(payload)
atomic.AddUint64(cm.c.bytesReceived, uint64(len(payload)))
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))

if plen == (udpMaxPayloadSize + 1) {
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return false
}
Expand Down Expand Up @@ -394,3 +412,8 @@ func (cm *clientMedia) onRTCPDecodeError(err error) {
atomic.AddUint64(cm.rtcpPacketsInError, 1)
cm.c.OnDecodeError(err)
}

func (cm *clientMedia) onRTPPacketLost(lost uint) {
atomic.AddUint64(cm.rtpPacketsLost, uint64(lost))
cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
}
29 changes: 29 additions & 0 deletions client_stats.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
package gortsplib

import "github.com/bluenviron/gortsplib/v4/pkg/description"

// ClientStatsMedia are client media statistics.
type ClientStatsMedia struct {
// received bytes
BytesReceived uint64
// sent bytes
BytesSent uint64
// number of RTP packets correctly received and processed
RTPPacketsReceived uint64
// number of sent RTP packets
RTPPacketsSent uint64
// number of lost RTP packets
RTPPacketsLost uint64
// number of RTP packets that could not be processed
RTPPacketsInError uint64
// mean jitter of received RTP packets
RTPJitter float64
// number of RTCP packets correctly received and processed
RTCPPacketsReceived uint64
// number of sent RTCP packets
RTCPPacketsSent uint64
// number of RTCP packets that could not be processed
RTCPPacketsInError uint64
}

// ClientStats are Client statistics.
type ClientStats struct {
// received bytes
Expand All @@ -22,4 +48,7 @@ type ClientStats struct {
RTCPPacketsSent uint64
// number of RTCP packets that could not be processed
RTCPPacketsInError uint64

// media statistics
Medias map[*description.Media]ClientStatsMedia
}
2 changes: 0 additions & 2 deletions client_udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ func (u *clientUDPListener) run() {
now := u.c.timeNow()
atomic.StoreInt64(u.lastPacketTime, now.Unix())

atomic.AddUint64(u.c.bytesReceived, uint64(n))

if u.readFunc(buf[:n]) {
createNewBuffer()
}
Expand Down
3 changes: 0 additions & 3 deletions server_conn_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gortsplib
import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/base"
Expand Down Expand Up @@ -131,8 +130,6 @@ func (cr *serverConnReader) readFuncTCP() error {
return liberrors.ErrServerUnexpectedResponse{}

case *base.InterleavedFrame:
atomic.AddUint64(cr.sc.session.bytesReceived, uint64(len(what.Payload)))

if cb, ok := cr.sc.session.tcpCallbackByChannel[what.Channel]; ok {
cb(what.Payload)
}
Expand Down
4 changes: 3 additions & 1 deletion server_conn_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package gortsplib

// ServerConnStats are ServerConn statistics.
type ServerConnStats struct {
// received bytes
BytesReceived uint64
BytesSent uint64
// sent bytes
BytesSent uint64
}
74 changes: 56 additions & 18 deletions server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,6 @@ type ServerSession struct {
udpCheckStreamTimer *time.Timer
writer *asyncProcessor
timeDecoder *rtptime.GlobalDecoder2
bytesReceived *uint64
bytesSent *uint64

// in
chHandleRequest chan sessionRequestReq
Expand All @@ -273,8 +271,6 @@ func (ss *ServerSession) initialize() {
ss.conns = make(map[*ServerConn]struct{})
ss.lastRequestTime = ss.s.timeNow()
ss.udpCheckStreamTimer = emptyTimer()
ss.bytesReceived = new(uint64)
ss.bytesSent = new(uint64)

ss.chHandleRequest = make(chan sessionRequestReq)
ss.chRemoveConn = make(chan *ServerConn)
Expand All @@ -293,14 +289,22 @@ func (ss *ServerSession) Close() {
//
// Deprecated: replaced by Stats()
func (ss *ServerSession) BytesReceived() uint64 {
return atomic.LoadUint64(ss.bytesReceived)
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesReceived)
}
return v

Check warning on line 296 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L292-L296

Added lines #L292 - L296 were not covered by tests
}

// BytesSent returns the number of written bytes.
//
// Deprecated: replaced by Stats()
func (ss *ServerSession) BytesSent() uint64 {
return atomic.LoadUint64(ss.bytesSent)
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesSent)
}
return v

Check warning on line 307 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L303-L307

Added lines #L303 - L307 were not covered by tests
}

// State returns the state of the session.
Expand Down Expand Up @@ -355,8 +359,20 @@ func (ss *ServerSession) UserData() interface{} {
// Stats returns server session statistics.
func (ss *ServerSession) Stats() *ServerSessionStats { //nolint:dupl
return &ServerSessionStats{
BytesReceived: atomic.LoadUint64(ss.bytesReceived),
BytesSent: atomic.LoadUint64(ss.bytesSent),
BytesReceived: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesReceived)
}
return v
}(),
BytesSent: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesSent)
}
return v
}(),
RTPPacketsReceived: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
Expand Down Expand Up @@ -422,17 +438,39 @@ func (ss *ServerSession) Stats() *ServerSessionStats { //nolint:dupl
}
return v
}(),
}
}
Medias: func() map[*description.Media]ServerSessionStatsMedia { //nolint:dupl
ret := make(map[*description.Media]ServerSessionStatsMedia, len(ss.setuppedMedias))

for med, sm := range ss.setuppedMedias {
ret[med] = ServerSessionStatsMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsReceived: atomic.LoadUint64(sm.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(sm.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(sm.rtpPacketsLost),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTPJitter: func() float64 {
v := float64(0)
n := float64(0)
for _, fo := range sm.formats {
if fo.rtcpReceiver != nil {
stats := fo.rtcpReceiver.Stats()
if stats != nil {
v += stats.Jitter
n++
}

Check warning on line 461 in server_session.go

View check run for this annotation

Codecov / codecov/patch

server_session.go#L457-L461

Added lines #L457 - L461 were not covered by tests
}
}
return v / n
}(),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
}
}

func (ss *ServerSession) onPacketLost(err error) {
if h, ok := ss.s.Handler.(ServerHandlerOnPacketLost); ok {
h.OnPacketLost(&ServerHandlerOnPacketLostCtx{
Session: ss,
Error: err,
})
} else {
log.Println(err.Error())
return ret
}(),
}
}

Expand Down
Loading

0 comments on commit 1362ec8

Please sign in to comment.