From f0174cce5855358eff092d9f7ea70998f8249286 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 12 Dec 2024 22:29:16 +0100 Subject: [PATCH] add statistics to Client, ServerSession, ServerConn, ServerStream (#556) --- client.go | 279 +++++++++++++++--- client_format.go | 83 ++++-- client_media.go | 182 +++++++----- client_play_test.go | 9 +- client_record_test.go | 10 +- client_stats.go | 7 + client_udp_listener.go | 4 - internal/rtcpreceiver/rtcpreceiver.go | 64 ++-- internal/rtcpreceiver/rtcpreceiver_test.go | 52 ++-- internal/rtcpsender/rtcpsender.go | 43 ++- internal/rtcpsender/rtcpsender_test.go | 6 +- internal/rtplossdetector/lossdetector.go | 4 +- internal/rtplossdetector/lossdetector_test.go | 6 +- internal/rtpreorderer/reorderer.go | 4 +- internal/rtpreorderer/reorderer_test.go | 14 +- pkg/liberrors/client.go | 2 +- pkg/rtcpreceiver/rtcpreceiver.go | 10 +- pkg/rtcpsender/rtcpsender.go | 16 +- pkg/rtplossdetector/lossdetector.go | 2 +- pkg/rtplossdetector/lossdetector_test.go | 6 +- pkg/rtpreorderer/reorderer.go | 2 +- server_conn.go | 12 + server_conn_reader.go | 3 - server_play_test.go | 24 +- server_record_test.go | 18 +- server_session.go | 247 +++++++++++++--- server_session_format.go | 90 +++++- server_session_media.go | 185 +++++++----- server_stream.go | 110 +++++-- server_stream_format.go | 19 +- server_stream_media.go | 16 + server_stream_stats.go | 36 +++ stats_conn.go | 9 + stats_session.go | 76 +++++ 34 files changed, 1246 insertions(+), 404 deletions(-) create mode 100644 client_stats.go create mode 100644 server_stream_stats.go create mode 100644 stats_conn.go create mode 100644 stats_session.go diff --git a/client.go b/client.go index d1be2db2..ce61cd83 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,8 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/bytecounter" @@ -269,8 +271,10 @@ type Client struct { // explicitly request back channels to the server. RequestBackChannels bool // pointer to a variable that stores received bytes. + // Deprecated: use Client.Stats() BytesReceived *uint64 // pointer to a variable that stores sent bytes. + // Deprecated: use Client.Stats() BytesSent *uint64 // @@ -326,7 +330,7 @@ type Client struct { effectiveTransport *Transport backChannelSetupped bool stdChannelSetupped bool - medias map[*description.Media]*clientMedia + setuppedMedias map[*description.Media]*clientMedia tcpCallbackByChannel map[int]readFunc lastRange *headers.Range checkTimeoutTimer *time.Timer @@ -341,6 +345,8 @@ type Client struct { mustClose bool tcpFrame *base.InterleavedFrame tcpBuffer []byte + bytesReceived *uint64 + bytesSent *uint64 // in chOptions chan optionsReq @@ -380,12 +386,6 @@ func (c *Client) Start(scheme string, host string) error { if c.UserAgent == "" { c.UserAgent = "gortsplib" } - if c.BytesReceived == nil { - c.BytesReceived = new(uint64) - } - if c.BytesSent == nil { - c.BytesSent = new(uint64) - } // system functions if c.DialContext == nil { @@ -454,6 +454,18 @@ func (c *Client) Start(scheme string, host string) error { c.checkTimeoutTimer = emptyTimer() c.keepalivePeriod = 30 * time.Second c.keepaliveTimer = emptyTimer() + + if c.BytesReceived != nil { + c.bytesReceived = c.BytesReceived + } else { + c.bytesReceived = new(uint64) + } + if c.BytesSent != nil { + c.bytesSent = c.BytesSent + } else { + c.bytesSent = new(uint64) + } + c.chOptions = make(chan optionsReq) c.chDescribe = make(chan describeReq) c.chAnnounce = make(chan announceReq) @@ -739,7 +751,7 @@ func (c *Client) doClose() { c.conn = nil } - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { cm.close() } } @@ -757,7 +769,7 @@ func (c *Client) reset() { c.effectiveTransport = nil c.backChannelSetupped = false c.stdChannelSetupped = false - c.medias = nil + c.setuppedMedias = nil c.tcpCallbackByChannel = nil } @@ -781,7 +793,7 @@ func (c *Client) trySwitchingProtocol() error { prevConnURL := c.connURL prevBaseURL := c.baseURL - prevMedias := c.medias + prevMedias := c.setuppedMedias c.reset() @@ -801,9 +813,9 @@ func (c *Client) trySwitchingProtocol() error { return err } - c.medias[i].onPacketRTCP = cm.onPacketRTCP + c.setuppedMedias[i].onPacketRTCP = cm.onPacketRTCP for j, tr := range cm.formats { - c.medias[i].formats[j].onPacketRTP = tr.onPacketRTP + c.setuppedMedias[i].formats[j].onPacketRTP = tr.onPacketRTP } } @@ -854,7 +866,7 @@ func (c *Client) startTransportRoutines() { c.timeDecoder = rtptime.NewGlobalDecoder2() - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { cm.start() } @@ -894,7 +906,7 @@ func (c *Client) stopTransportRoutines() { c.checkTimeoutTimer = emptyTimer() c.keepaliveTimer = emptyTimer() - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { cm.stop() } @@ -935,7 +947,7 @@ func (c *Client) connOpen() error { } c.nconn = nconn - bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent) + bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent) c.conn = conn.NewConn(bc) c.reader = &clientReader{ c: c, @@ -1021,7 +1033,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error } func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool { - for _, ct := range c.medias { + for _, ct := range c.setuppedMedias { lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime) if lft != 0 { return true @@ -1037,7 +1049,7 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool { func (c *Client) isInUDPTimeout() bool { now := c.timeNow() - for _, ct := range c.medias { + for _, ct := range c.setuppedMedias { lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) if now.Sub(lft) < c.ReadTimeout { return false @@ -1347,9 +1359,10 @@ func (c *Client) doSetup( } cm := &clientMedia{ - c: c, - onPacketRTCP: func(rtcp.Packet) {}, + c: c, + media: medi, } + cm.initialize() if c.effectiveTransport == nil { if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted @@ -1583,12 +1596,11 @@ func (c *Client) doSetup( cm.tcpChannel = thRes.InterleavedIDs[0] } - if c.medias == nil { - c.medias = make(map[*description.Media]*clientMedia) + if c.setuppedMedias == nil { + c.setuppedMedias = make(map[*description.Media]*clientMedia) } - c.medias[medi] = cm - cm.setMedia(medi) + c.setuppedMedias[medi] = cm c.baseURL = baseURL c.effectiveTransport = &desiredTransport @@ -1607,7 +1619,7 @@ func (c *Client) doSetup( } func (c *Client) isChannelPairInUse(channel int) bool { - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { if (cm.tcpChannel+1) == channel || cm.tcpChannel == channel || cm.tcpChannel == (channel+1) { return true } @@ -1712,7 +1724,7 @@ func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) { // don't do this with multicast, otherwise the RTP packet is going to be broadcasted // to all listeners, including us, messing up the stream. if *c.effectiveTransport == TransportUDP { - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() cm.udpRTPListener.write(byts) //nolint:errcheck @@ -1852,9 +1864,9 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) { return c.Play(ra) } -// OnPacketRTPAny sets the callback that is called when a RTP packet is read from any setupped media. +// OnPacketRTPAny sets a callback that is called when a RTP packet is read from any setupped media. func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) { - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { cmedia := cm.media for _, forma := range cm.media.Formats { c.OnPacketRTP(cm.media, forma, func(pkt *rtp.Packet) { @@ -1864,9 +1876,9 @@ func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) { } } -// OnPacketRTCPAny sets the callback that is called when a RTCP packet is read from any setupped media. +// OnPacketRTCPAny sets a callback that is called when a RTCP packet is read from any setupped media. func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { - for _, cm := range c.medias { + for _, cm := range c.setuppedMedias { cmedia := cm.media c.OnPacketRTCP(cm.media, func(pkt rtcp.Packet) { cb(cmedia, pkt) @@ -1874,16 +1886,16 @@ func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { } } -// OnPacketRTP sets the callback that is called when a RTP packet is read. +// OnPacketRTP sets a callback that is called when a RTP packet is read. func (c *Client) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) { - cm := c.medias[medi] + cm := c.setuppedMedias[medi] ct := cm.formats[forma.PayloadType()] ct.onPacketRTP = cb } -// OnPacketRTCP sets the callback that is called when a RTCP packet is read. +// OnPacketRTCP sets a callback that is called when a RTCP packet is read. func (c *Client) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) { - cm := c.medias[medi] + cm := c.setuppedMedias[medi] cm.onPacketRTCP = cb } @@ -1908,13 +1920,13 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, default: } - cm := c.medias[medi] + cm := c.setuppedMedias[medi] cf := cm.formats[pkt.PayloadType] - cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) + cf.rtcpSender.ProcessPacketRTP(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) ok := c.writer.push(func() error { - return cm.writePacketRTPInQueue(byts) + return cf.writePacketRTPInQueue(byts) }) if !ok { return liberrors.ErrClientWriteQueueFull{} @@ -1936,7 +1948,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error default: } - cm := c.medias[medi] + cm := c.setuppedMedias[medi] ok := c.writer.push(func() error { return cm.writePacketRTCPInQueue(byts) @@ -1953,7 +1965,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error // // Deprecated: replaced by PacketPTS2. func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { - cm := c.medias[medi] + cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] v, ok := c.timeDecoder.Decode(ct.format, pkt) @@ -1967,7 +1979,7 @@ func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Durat // PacketPTS2 returns the PTS of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) { - cm := c.medias[medi] + cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] return c.timeDecoder.Decode(ct.format, pkt) } @@ -1975,7 +1987,194 @@ func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bo // PacketNTP returns the NTP timestamp of an incoming RTP packet. // The NTP timestamp is computed from RTCP sender reports. func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { - cm := c.medias[medi] + cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] return ct.rtcpReceiver.PacketNTP(pkt.Timestamp) } + +// Stats returns client statistics. +func (c *Client) Stats() *ClientStats { + return &ClientStats{ + Conn: StatsConn{ + BytesReceived: atomic.LoadUint64(c.bytesReceived), + BytesSent: atomic.LoadUint64(c.bytesSent), + }, + Session: StatsSession{ + BytesReceived: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.bytesReceived) + } + return v + }(), + BytesSent: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.bytesSent) + } + return v + }(), + RTPPacketsReceived: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsReceived) + } + } + return v + }(), + RTPPacketsSent: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsSent) + } + } + return v + }(), + RTPPacketsLost: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsLost) + } + } + return v + }(), + RTPPacketsInError: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.rtpPacketsInError) + } + return v + }(), + RTPJitter: func() float64 { + v := float64(0) + n := float64(0) + for _, sm := range c.setuppedMedias { + for _, fo := range sm.formats { + if fo.rtcpReceiver != nil { + stats := fo.rtcpReceiver.Stats() + if stats != nil { + v += stats.Jitter + n++ + } + } + } + } + return v / n + }(), + RTCPPacketsReceived: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsReceived) + } + return v + }(), + RTCPPacketsSent: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsSent) + } + return v + }(), + RTCPPacketsInError: func() uint64 { + v := uint64(0) + for _, sm := range c.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsInError) + } + return v + }(), + Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl + ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias)) + + for med, sm := range c.setuppedMedias { + ret[med] = StatsSessionMedia{ + BytesReceived: atomic.LoadUint64(sm.bytesReceived), + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), + Formats: func() map[format.Format]StatsSessionFormat { + ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + + for _, fo := range sm.formats { + recvStats := func() *rtcpreceiver.Stats { + if fo.rtcpReceiver != nil { + return fo.rtcpReceiver.Stats() + } + return nil + }() + sentStats := func() *rtcpsender.Stats { + if fo.rtcpSender != nil { + return fo.rtcpSender.Stats() + } + return nil + }() + + ret[fo.format] = StatsSessionFormat{ //nolint:dupl + RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), + LocalSSRC: func() uint32 { + if fo.rtcpReceiver != nil { + return *fo.rtcpReceiver.LocalSSRC + } + if sentStats != nil { + return sentStats.LocalSSRC + } + return 0 + }(), + RemoteSSRC: func() uint32 { + if recvStats != nil { + return recvStats.RemoteSSRC + } + return 0 + }(), + RTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + if sentStats != nil { + return sentStats.LastSequenceNumber + } + return 0 + }(), + RTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + RTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), + RTPJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + } + } + + return ret + }(), + } + } + + return ret + }(), + }, + } +} diff --git a/client_format.go b/client_format.go index 26263af5..ceb7013d 100644 --- a/client_format.go +++ b/client_format.go @@ -1,6 +1,9 @@ package gortsplib import ( + "sync/atomic" + "time" + "github.com/pion/rtcp" "github.com/pion/rtp" @@ -17,13 +20,29 @@ type clientFormat struct { format format.Format onPacketRTP OnPacketRTPFunc - udpReorderer *rtpreorderer.Reorderer // play - tcpLossDetector *rtplossdetector.LossDetector // play - rtcpReceiver *rtcpreceiver.RTCPReceiver // play - rtcpSender *rtcpsender.RTCPSender // record or back channel + udpReorderer *rtpreorderer.Reorderer // play + tcpLossDetector *rtplossdetector.LossDetector // play + rtcpReceiver *rtcpreceiver.RTCPReceiver // play + rtcpSender *rtcpsender.RTCPSender // record or back channel + writePacketRTPInQueue func([]byte) error + rtpPacketsReceived *uint64 + rtpPacketsSent *uint64 + rtpPacketsLost *uint64 +} + +func (cf *clientFormat) initialize() { + cf.rtpPacketsReceived = new(uint64) + cf.rtpPacketsSent = new(uint64) + cf.rtpPacketsLost = new(uint64) } func (cf *clientFormat) start() { + if cf.cm.udpRTPListener != nil { + cf.writePacketRTPInQueue = cf.writePacketRTPInQueueUDP + } else { + cf.writePacketRTPInQueue = cf.writePacketRTPInQueueTCP + } + if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel { cf.rtcpSender = &rtcpsender.RTCPSender{ ClockRate: cf.format.ClockRate(), @@ -72,40 +91,70 @@ func (cf *clientFormat) stop() { } } -func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) { +func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) { packets, lost := cf.udpReorderer.Process(pkt) if lost != 0 { - cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) + cf.onPacketRTPLost(lost) // do not return } now := cf.cm.c.timeNow() for _, pkt := range packets { - err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) - if err != nil { - cf.cm.c.OnDecodeError(err) - continue - } - - cf.onPacketRTP(pkt) + cf.handlePacketRTP(pkt, now) } } -func (cf *clientFormat) readRTPTCP(pkt *rtp.Packet) { +func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) { lost := cf.tcpLossDetector.Process(pkt) if lost != 0 { - cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) + cf.onPacketRTPLost(lost) // do not return } now := cf.cm.c.timeNow() - err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) + cf.handlePacketRTP(pkt, now) +} + +func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { + err := cf.rtcpReceiver.ProcessPacketRTP(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { - cf.cm.c.OnDecodeError(err) + cf.cm.onPacketRTPDecodeError(err) return } + atomic.AddUint64(cf.rtpPacketsReceived, 1) + cf.onPacketRTP(pkt) } + +func (cf *clientFormat) onPacketRTPLost(lost uint) { + atomic.AddUint64(cf.rtpPacketsLost, uint64(lost)) + cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) +} + +func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error { + err := cf.cm.udpRTPListener.write(payload) + if err != nil { + return err + } + + atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cf.rtpPacketsSent, 1) + return nil +} + +func (cf *clientFormat) writePacketRTPInQueueTCP(payload []byte) error { + cf.cm.c.tcpFrame.Channel = cf.cm.tcpChannel + cf.cm.c.tcpFrame.Payload = payload + cf.cm.c.nconn.SetWriteDeadline(time.Now().Add(cf.cm.c.WriteTimeout)) + err := cf.cm.c.conn.WriteInterleavedFrame(cf.cm.c.tcpFrame, cf.cm.c.tcpBuffer) + if err != nil { + return err + } + + atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cf.rtpPacketsSent, 1) + return nil +} diff --git a/client_media.go b/client_media.go index 6aff9b27..a108497b 100644 --- a/client_media.go +++ b/client_media.go @@ -13,16 +13,43 @@ import ( ) type clientMedia struct { - c *Client - onPacketRTCP OnPacketRTCPFunc + c *Client + media *description.Media - media *description.Media + onPacketRTCP OnPacketRTCPFunc formats map[uint8]*clientFormat tcpChannel int udpRTPListener *clientUDPListener udpRTCPListener *clientUDPListener - writePacketRTPInQueue func([]byte) error writePacketRTCPInQueue func([]byte) error + bytesReceived *uint64 + bytesSent *uint64 + rtpPacketsInError *uint64 + rtcpPacketsReceived *uint64 + rtcpPacketsSent *uint64 + rtcpPacketsInError *uint64 +} + +func (cm *clientMedia) initialize() { + cm.onPacketRTCP = func(rtcp.Packet) {} + cm.bytesReceived = new(uint64) + cm.bytesSent = new(uint64) + cm.rtpPacketsInError = new(uint64) + cm.rtcpPacketsReceived = new(uint64) + cm.rtcpPacketsSent = new(uint64) + cm.rtcpPacketsInError = new(uint64) + + cm.formats = make(map[uint8]*clientFormat) + + for _, forma := range cm.media.Formats { + f := &clientFormat{ + cm: cm, + format: forma, + onPacketRTP: func(*rtp.Packet) {}, + } + f.initialize() + cm.formats[forma.PayloadType()] = f + } } func (cm *clientMedia) close() { @@ -71,33 +98,18 @@ func (cm *clientMedia) allocateUDPListeners( return err } -func (cm *clientMedia) setMedia(medi *description.Media) { - cm.media = medi - - cm.formats = make(map[uint8]*clientFormat) - for _, forma := range medi.Formats { - cm.formats[forma.PayloadType()] = &clientFormat{ - cm: cm, - format: forma, - onPacketRTP: func(*rtp.Packet) {}, - } - } -} - func (cm *clientMedia) start() { if cm.udpRTPListener != nil { - cm.writePacketRTPInQueue = cm.writePacketRTPInQueueUDP cm.writePacketRTCPInQueue = cm.writePacketRTCPInQueueUDP if cm.c.state == clientStateRecord || cm.media.IsBackChannel { - cm.udpRTPListener.readFunc = cm.readRTPUDPRecord - cm.udpRTCPListener.readFunc = cm.readRTCPUDPRecord + cm.udpRTPListener.readFunc = cm.readPacketRTPUDPRecord + cm.udpRTCPListener.readFunc = cm.readPacketRTCPUDPRecord } else { - cm.udpRTPListener.readFunc = cm.readRTPUDPPlay - cm.udpRTCPListener.readFunc = cm.readRTCPUDPPlay + cm.udpRTPListener.readFunc = cm.readPacketRTPUDPPlay + cm.udpRTCPListener.readFunc = cm.readPacketRTCPUDPPlay } } else { - cm.writePacketRTPInQueue = cm.writePacketRTPInQueueTCP cm.writePacketRTCPInQueue = cm.writePacketRTCPInQueueTCP if cm.c.tcpCallbackByChannel == nil { @@ -105,11 +117,11 @@ func (cm *clientMedia) start() { } if cm.c.state == clientStateRecord || cm.media.IsBackChannel { - cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPRecord - cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPRecord + cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readPacketRTPTCPRecord + cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readPacketRTCPTCPRecord } else { - cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay - cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay + cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readPacketRTPTCPPlay + cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readPacketRTCPTCPPlay } } @@ -136,73 +148,82 @@ func (cm *clientMedia) stop() { func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat { for _, format := range cm.formats { - tssrc, ok := format.rtcpReceiver.SenderSSRC() - if ok && tssrc == ssrc { + stats := format.rtcpReceiver.Stats() + if stats != nil && stats.RemoteSSRC == ssrc { return format } } return nil } -func (cm *clientMedia) writePacketRTPInQueueUDP(payload []byte) error { - return cm.udpRTPListener.write(payload) -} - func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error { - return cm.udpRTCPListener.write(payload) -} + err := cm.udpRTCPListener.write(payload) + if err != nil { + return err + } -func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error { - cm.c.tcpFrame.Channel = cm.tcpChannel - cm.c.tcpFrame.Payload = payload - cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) - return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) + atomic.AddUint64(cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cm.rtcpPacketsSent, 1) + return nil } func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error { cm.c.tcpFrame.Channel = cm.tcpChannel + 1 cm.c.tcpFrame.Payload = payload cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) - return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) + err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) + if err != nil { + return err + } + + atomic.AddUint64(cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cm.rtcpPacketsSent, 1) + return nil } -func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool { +func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTPDecodeError(err) return false } forma, ok := cm.formats[pkt.PayloadType] if !ok { - cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) return false } - forma.readRTPTCP(pkt) + forma.readPacketRTPTCP(pkt) return true } -func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool { +func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + now := cm.c.timeNow() atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) if len(payload) > udpMaxPayloadSize { - cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) + cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTCPDecodeError(err) return false } + atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := cm.findFormatWithSSRC(sr.SSRC) @@ -217,22 +238,26 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool { return true } -func (cm *clientMedia) readRTPTCPRecord(_ []byte) bool { +func (cm *clientMedia) readPacketRTPTCPRecord(_ []byte) bool { return false } -func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool { +func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + if len(payload) > udpMaxPayloadSize { - cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) + cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTCPDecodeError(err) return false } + atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { cm.onPacketRTCP(pkt) } @@ -240,47 +265,50 @@ func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool { return true } -func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool { - plen := len(payload) +func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) - if plen == (udpMaxPayloadSize + 1) { - cm.c.OnDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{}) return false } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTPDecodeError(err) return false } forma, ok := cm.formats[pkt.PayloadType] if !ok { - cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) return false } - forma.readRTPUDP(pkt) + forma.readPacketRTPUDP(pkt) return true } -func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool { - now := cm.c.timeNow() - plen := len(payload) +func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) - if plen == (udpMaxPayloadSize + 1) { - cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTCPDecodeError(err) return false } + now := cm.c.timeNow() + + atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := cm.findFormatWithSSRC(sr.SSRC) @@ -295,27 +323,39 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool { return true } -func (cm *clientMedia) readRTPUDPRecord(_ []byte) bool { +func (cm *clientMedia) readPacketRTPUDPRecord(_ []byte) bool { return false } -func (cm *clientMedia) readRTCPUDPRecord(payload []byte) bool { - plen := len(payload) +func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool { + atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) - if plen == (udpMaxPayloadSize + 1) { - cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - cm.c.OnDecodeError(err) + cm.onPacketRTCPDecodeError(err) return false } + atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { cm.onPacketRTCP(pkt) } return true } + +func (cm *clientMedia) onPacketRTPDecodeError(err error) { + atomic.AddUint64(cm.rtpPacketsInError, 1) + cm.c.OnDecodeError(err) +} + +func (cm *clientMedia) onPacketRTCPDecodeError(err error) { + atomic.AddUint64(cm.rtcpPacketsInError, 1) + cm.c.OnDecodeError(err) +} diff --git a/client_play_test.go b/client_play_test.go index 5d896a73..87f86cbf 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -545,10 +545,11 @@ func TestClientPlay(t *testing.T) { <-packetRecv - require.Greater(t, atomic.LoadUint64(c.BytesSent), uint64(620)) - require.Less(t, atomic.LoadUint64(c.BytesSent), uint64(850)) - require.Greater(t, atomic.LoadUint64(c.BytesReceived), uint64(580)) - require.Less(t, atomic.LoadUint64(c.BytesReceived), uint64(650)) + s := c.Stats() + require.Greater(t, s.Session.BytesSent, uint64(19)) + require.Less(t, s.Session.BytesSent, uint64(41)) + require.Greater(t, s.Session.BytesReceived, uint64(31)) + require.Less(t, s.Session.BytesReceived, uint64(37)) }) } } diff --git a/client_record_test.go b/client_record_test.go index 90979712..edc10b0a 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -7,7 +7,6 @@ import ( "net" "strings" "sync" - "sync/atomic" "testing" "time" @@ -336,10 +335,11 @@ func TestClientRecord(t *testing.T) { <-recvDone - require.Greater(t, atomic.LoadUint64(c.BytesSent), uint64(730)) - require.Less(t, atomic.LoadUint64(c.BytesSent), uint64(760)) - require.Greater(t, atomic.LoadUint64(c.BytesReceived), uint64(180)) - require.Less(t, atomic.LoadUint64(c.BytesReceived), uint64(210)) + s := c.Stats() + require.Greater(t, s.Session.BytesSent, uint64(15)) + require.Less(t, s.Session.BytesSent, uint64(17)) + require.Greater(t, s.Session.BytesReceived, uint64(19)) + require.Less(t, s.Session.BytesReceived, uint64(21)) c.Close() <-done diff --git a/client_stats.go b/client_stats.go new file mode 100644 index 00000000..22451ea3 --- /dev/null +++ b/client_stats.go @@ -0,0 +1,7 @@ +package gortsplib + +// ClientStats are client statistics +type ClientStats struct { + Conn StatsConn + Session StatsSession +} diff --git a/client_udp_listener.go b/client_udp_listener.go index 9285faa0..6c0fc575 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -173,8 +173,6 @@ func (u *clientUDPListener) run() { now := u.c.timeNow() atomic.StoreInt64(u.lastPacketTime, now.Unix()) - atomic.AddUint64(u.c.BytesReceived, uint64(n)) - if u.readFunc(buf[:n]) { createNewBuffer() } @@ -182,8 +180,6 @@ func (u *clientUDPListener) run() { } func (u *clientUDPListener) write(payload []byte) error { - atomic.AddUint64(u.c.BytesSent, uint64(len(payload))) - // no mutex is needed here since Write() has an internal lock. // https://github.com/golang/go/issues/27203#issuecomment-534386117 u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout)) diff --git a/internal/rtcpreceiver/rtcpreceiver.go b/internal/rtcpreceiver/rtcpreceiver.go index 928a10e8..fb4d3a5a 100644 --- a/internal/rtcpreceiver/rtcpreceiver.go +++ b/internal/rtcpreceiver/rtcpreceiver.go @@ -30,7 +30,7 @@ func randUint32() (uint32, error) { // RTCPReceiver is a utility to generate RTCP receiver reports. type RTCPReceiver struct { ClockRate int - ReceiverSSRC *uint32 + LocalSSRC *uint32 Period time.Duration TimeNow func() time.Time WritePacketRTCP func(rtcp.Packet) @@ -42,7 +42,7 @@ type RTCPReceiver struct { timeInitialized bool sequenceNumberCycles uint16 lastSequenceNumber uint16 - senderSSRC uint32 + remoteSSRC uint32 lastTimeRTP uint32 lastTimeSystem time.Time totalLost uint32 @@ -62,12 +62,12 @@ type RTCPReceiver struct { // Initialize initializes RTCPReceiver. func (rr *RTCPReceiver) Initialize() error { - if rr.ReceiverSSRC == nil { + if rr.LocalSSRC == nil { v, err := randUint32() if err != nil { return err } - rr.ReceiverSSRC = &v + rr.LocalSSRC = &v } if rr.TimeNow == nil { @@ -119,10 +119,10 @@ func (rr *RTCPReceiver) report() rtcp.Packet { system := rr.TimeNow() report := &rtcp.ReceiverReport{ - SSRC: *rr.ReceiverSSRC, + SSRC: *rr.LocalSSRC, Reports: []rtcp.ReceptionReport{ { - SSRC: rr.senderSSRC, + SSRC: rr.remoteSSRC, LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), // equivalent to taking the integer part after multiplying the // loss fraction by 256 @@ -149,8 +149,8 @@ func (rr *RTCPReceiver) report() rtcp.Packet { return report } -// ProcessPacket extracts the needed data from RTP packets. -func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { +// ProcessPacketRTP extracts the needed data from RTP packets. +func (rr *RTCPReceiver) ProcessPacketRTP(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -159,7 +159,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua rr.firstRTPPacketReceived = true rr.totalSinceReport = 1 rr.lastSequenceNumber = pkt.SequenceNumber - rr.senderSSRC = pkt.SSRC + rr.remoteSSRC = pkt.SSRC if ptsEqualsDTS { rr.timeInitialized = true @@ -169,8 +169,8 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua // subsequent packets } else { - if pkt.SSRC != rr.senderSSRC { - return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC) + if pkt.SSRC != rr.remoteSSRC { + return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) } diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) @@ -229,11 +229,7 @@ func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.T rr.lastSenderReportTimeSystem = system } -// PacketNTP returns the NTP timestamp of the packet. -func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - +func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { if !rr.firstSenderReportReceived { return time.Time{}, false } @@ -244,9 +240,39 @@ func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true } -// SenderSSRC returns the SSRC of outgoing RTP packets. -func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { +// PacketNTP returns the NTP timestamp of the packet. +func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + return rr.packetNTPUnsafe(ts) +} + +// Stats are statistics. +type Stats struct { + RemoteSSRC uint32 + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time + Jitter float64 +} + +// Stats returns statistics. +func (rr *RTCPReceiver) Stats() *Stats { rr.mutex.RLock() defer rr.mutex.RUnlock() - return rr.senderSSRC, rr.firstRTPPacketReceived + + if !rr.firstRTPPacketReceived { + return nil + } + + ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP) + + return &Stats{ + RemoteSSRC: rr.remoteSSRC, + LastSequenceNumber: rr.lastSequenceNumber, + LastRTP: rr.lastTimeRTP, + LastNTP: ntp, + Jitter: rr.jitter, + } } diff --git a/internal/rtcpreceiver/rtcpreceiver_test.go b/internal/rtcpreceiver/rtcpreceiver_test.go index df821466..e228328d 100644 --- a/internal/rtcpreceiver/rtcpreceiver_test.go +++ b/internal/rtcpreceiver/rtcpreceiver_test.go @@ -17,9 +17,9 @@ func TestRTCPReceiverBase(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ - ClockRate: 90000, - ReceiverSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }, @@ -64,7 +64,7 @@ func TestRTCPReceiverBase(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -79,7 +79,7 @@ func TestRTCPReceiverBase(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) <-done @@ -89,9 +89,9 @@ func TestRTCPReceiverOverflow(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ - ClockRate: 90000, - ReceiverSSRC: uint32Ptr(0x65f83afb), - Period: 250 * time.Millisecond, + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 250 * time.Millisecond, TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, @@ -138,7 +138,7 @@ func TestRTCPReceiverOverflow(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -153,7 +153,7 @@ func TestRTCPReceiverOverflow(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) <-done @@ -163,9 +163,9 @@ func TestRTCPReceiverPacketLost(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ - ClockRate: 90000, - ReceiverSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, @@ -215,7 +215,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -230,7 +230,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) <-done @@ -240,9 +240,9 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ - ClockRate: 90000, - ReceiverSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, @@ -292,7 +292,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -307,7 +307,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) <-done @@ -317,9 +317,9 @@ func TestRTCPReceiverJitter(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ - ClockRate: 90000, - ReceiverSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }, @@ -365,7 +365,7 @@ func TestRTCPReceiverJitter(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -380,7 +380,7 @@ func TestRTCPReceiverJitter(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + err = rr.ProcessPacketRTP(&rtpPkt, ts, true) require.NoError(t, err) rtpPkt = rtp.Packet{ @@ -395,7 +395,7 @@ func TestRTCPReceiverJitter(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, false) + err = rr.ProcessPacketRTP(&rtpPkt, ts, false) require.NoError(t, err) <-done diff --git a/internal/rtcpsender/rtcpsender.go b/internal/rtcpsender/rtcpsender.go index 413a37c1..9e35d794 100644 --- a/internal/rtcpsender/rtcpsender.go +++ b/internal/rtcpsender/rtcpsender.go @@ -26,11 +26,11 @@ type RTCPSender struct { mutex sync.RWMutex // data from RTP packets - initialized bool + firstRTPPacketSent bool lastTimeRTP uint32 lastTimeNTP time.Time lastTimeSystem time.Time - senderSSRC uint32 + localSSRC uint32 lastSequenceNumber uint16 packetCount uint32 octetCount uint32 @@ -81,7 +81,7 @@ func (rs *RTCPSender) report() rtcp.Packet { rs.mutex.Lock() defer rs.mutex.Unlock() - if !rs.initialized { + if !rs.firstRTPPacketSent { return nil } @@ -90,7 +90,7 @@ func (rs *RTCPSender) report() rtcp.Packet { rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) return &rtcp.SenderReport{ - SSRC: rs.senderSSRC, + SSRC: rs.localSSRC, NTPTime: ntpTimeGoToRTCP(ntpTime), RTPTime: rtpTime, PacketCount: rs.packetCount, @@ -98,17 +98,17 @@ func (rs *RTCPSender) report() rtcp.Packet { } } -// ProcessPacket extracts data from RTP packets. -func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { +// ProcessPacketRTP extracts data from RTP packets. +func (rs *RTCPSender) ProcessPacketRTP(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { rs.mutex.Lock() defer rs.mutex.Unlock() if ptsEqualsDTS { - rs.initialized = true + rs.firstRTPPacketSent = true rs.lastTimeRTP = pkt.Timestamp rs.lastTimeNTP = ntp rs.lastTimeSystem = rs.TimeNow() - rs.senderSSRC = pkt.SSRC + rs.localSSRC = pkt.SSRC } rs.lastSequenceNumber = pkt.SequenceNumber @@ -117,16 +117,27 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS rs.octetCount += uint32(len(pkt.Payload)) } -// SenderSSRC returns the SSRC of outgoing RTP packets. -func (rs *RTCPSender) SenderSSRC() (uint32, bool) { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - return rs.senderSSRC, rs.initialized +// Stats are statistics. +type Stats struct { + LocalSSRC uint32 + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time } -// LastPacketData returns metadata of the last RTP packet. -func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { +// Stats returns statistics. +func (rs *RTCPSender) Stats() *Stats { rs.mutex.RLock() defer rs.mutex.RUnlock() - return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized + + if !rs.firstRTPPacketSent { + return nil + } + + return &Stats{ + LocalSSRC: rs.localSSRC, + LastSequenceNumber: rs.lastSequenceNumber, + LastRTP: rs.lastTimeRTP, + LastNTP: rs.lastTimeNTP, + } } diff --git a/internal/rtcpsender/rtcpsender_test.go b/internal/rtcpsender/rtcpsender_test.go index faab96e4..17e88da8 100644 --- a/internal/rtcpsender/rtcpsender_test.go +++ b/internal/rtcpsender/rtcpsender_test.go @@ -63,7 +63,7 @@ func TestRTCPSender(t *testing.T) { Payload: []byte("\x00\x00"), } ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rs.ProcessPacket(&rtpPkt, ts, true) + rs.ProcessPacketRTP(&rtpPkt, ts, true) setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC)) rtpPkt = rtp.Packet{ @@ -78,7 +78,7 @@ func TestRTCPSender(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - rs.ProcessPacket(&rtpPkt, ts, true) + rs.ProcessPacketRTP(&rtpPkt, ts, true) rtpPkt = rtp.Packet{ Header: rtp.Header{ @@ -92,7 +92,7 @@ func TestRTCPSender(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - rs.ProcessPacket(&rtpPkt, ts, false) + rs.ProcessPacketRTP(&rtpPkt, ts, false) setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC)) diff --git a/internal/rtplossdetector/lossdetector.go b/internal/rtplossdetector/lossdetector.go index c7abe6ec..6261e14d 100644 --- a/internal/rtplossdetector/lossdetector.go +++ b/internal/rtplossdetector/lossdetector.go @@ -13,7 +13,7 @@ type LossDetector struct { // Process processes a RTP packet. // It returns the number of lost packets. -func (r *LossDetector) Process(pkt *rtp.Packet) int { +func (r *LossDetector) Process(pkt *rtp.Packet) uint { if !r.initialized { r.initialized = true r.expectedSeqNum = pkt.SequenceNumber + 1 @@ -23,7 +23,7 @@ func (r *LossDetector) Process(pkt *rtp.Packet) int { if pkt.SequenceNumber != r.expectedSeqNum { diff := pkt.SequenceNumber - r.expectedSeqNum r.expectedSeqNum = pkt.SequenceNumber + 1 - return int(diff) + return uint(diff) } r.expectedSeqNum = pkt.SequenceNumber + 1 diff --git a/internal/rtplossdetector/lossdetector_test.go b/internal/rtplossdetector/lossdetector_test.go index dd40de70..527ddfe9 100644 --- a/internal/rtplossdetector/lossdetector_test.go +++ b/internal/rtplossdetector/lossdetector_test.go @@ -15,19 +15,19 @@ func TestLossDetector(t *testing.T) { SequenceNumber: 65530, }, }) - require.Equal(t, 0, c) + require.Equal(t, uint(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65531, }, }) - require.Equal(t, 0, c) + require.Equal(t, uint(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65535, }, }) - require.Equal(t, 3, c) + require.Equal(t, uint(3), c) } diff --git a/internal/rtpreorderer/reorderer.go b/internal/rtpreorderer/reorderer.go index d09312c1..be79d9bc 100644 --- a/internal/rtpreorderer/reorderer.go +++ b/internal/rtpreorderer/reorderer.go @@ -27,7 +27,7 @@ func (r *Reorderer) Initialize() { // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. -func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { +func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { if !r.initialized { r.initialized = true r.expectedSeqNum = pkt.SequenceNumber + 1 @@ -86,7 +86,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { ret[pos] = pkt r.expectedSeqNum = pkt.SequenceNumber + 1 - return ret, int(relPos) - n + 1 + return ret, uint(int(relPos) - n + 1) } // there's a missing packet diff --git a/internal/rtpreorderer/reorderer_test.go b/internal/rtpreorderer/reorderer_test.go index 9e391de5..c80af535 100644 --- a/internal/rtpreorderer/reorderer_test.go +++ b/internal/rtpreorderer/reorderer_test.go @@ -164,7 +164,7 @@ func TestReorder(t *testing.T) { for _, entry := range sequence { out, missing := r.Process(entry.in) require.Equal(t, entry.out, out) - require.Equal(t, 0, missing) + require.Equal(t, uint(0), missing) } } @@ -173,7 +173,7 @@ func TestBufferIsFull(t *testing.T) { r.Initialize() r.absPos = 25 sn := uint16(1564) - toMiss := 34 + toMiss := uint(34) out, missing := r.Process(&rtp.Packet{ Header: rtp.Header{ @@ -185,19 +185,19 @@ func TestBufferIsFull(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, 0, missing) + require.Equal(t, uint(0), missing) sn++ var expected []*rtp.Packet - for i := 0; i < 64-toMiss; i++ { + for i := uint(0); i < 64-toMiss; i++ { out, missing = r.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: sn + uint16(toMiss), }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, 0, missing) + require.Equal(t, uint(0), missing) expected = append(expected, &rtp.Packet{ Header: rtp.Header{ @@ -242,7 +242,7 @@ func TestReset(t *testing.T) { }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, 0, missing) + require.Equal(t, uint(0), missing) sn++ } @@ -256,5 +256,5 @@ func TestReset(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, 0, missing) + require.Equal(t, uint(0), missing) } diff --git a/pkg/liberrors/client.go b/pkg/liberrors/client.go index b11bd252..f8cc3def 100644 --- a/pkg/liberrors/client.go +++ b/pkg/liberrors/client.go @@ -251,7 +251,7 @@ func (e ErrClientWriteQueueFull) Error() string { // ErrClientRTPPacketsLost is an error that can be returned by a client. type ErrClientRTPPacketsLost struct { - Lost int + Lost uint } // Error implements the error interface. diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 8f3a6afd..73be0653 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -24,7 +24,7 @@ func New( ) (*RTCPReceiver, error) { rr := &rtcpreceiver.RTCPReceiver{ ClockRate: clockRate, - ReceiverSSRC: receiverSSRC, + LocalSSRC: receiverSSRC, Period: period, TimeNow: timeNow, WritePacketRTCP: writePacketRTCP, @@ -44,7 +44,7 @@ func (rr *RTCPReceiver) Close() { // ProcessPacket extracts the needed data from RTP packets. func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { - return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacket(pkt, system, ptsEqualsDTS) + return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacketRTP(pkt, system, ptsEqualsDTS) } // ProcessSenderReport extracts the needed data from RTCP sender reports. @@ -59,5 +59,9 @@ func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { // SenderSSRC returns the SSRC of outgoing RTP packets. func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { - return (*rtcpreceiver.RTCPReceiver)(rr).SenderSSRC() + stats := (*rtcpreceiver.RTCPReceiver)(rr).Stats() + if stats == nil { + return 0, false + } + return stats.RemoteSSRC, true } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index cd2307d2..135268c8 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -39,15 +39,25 @@ func (rs *RTCPSender) Close() { // ProcessPacket extracts data from RTP packets. func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { - (*rtcpsender.RTCPSender)(rs).ProcessPacket(pkt, ntp, ptsEqualsDTS) + (*rtcpsender.RTCPSender)(rs).ProcessPacketRTP(pkt, ntp, ptsEqualsDTS) } // SenderSSRC returns the SSRC of outgoing RTP packets. func (rs *RTCPSender) SenderSSRC() (uint32, bool) { - return (*rtcpsender.RTCPSender)(rs).SenderSSRC() + stats := (*rtcpsender.RTCPSender)(rs).Stats() + if stats == nil { + return 0, false + } + + return stats.LocalSSRC, true } // LastPacketData returns metadata of the last RTP packet. func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { - return (*rtcpsender.RTCPSender)(rs).LastPacketData() + stats := (*rtcpsender.RTCPSender)(rs).Stats() + if stats == nil { + return 0, 0, time.Time{}, false + } + + return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true } diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go index 9b37238b..7fdca19d 100644 --- a/pkg/rtplossdetector/lossdetector.go +++ b/pkg/rtplossdetector/lossdetector.go @@ -18,6 +18,6 @@ func New() *LossDetector { // Process processes a RTP packet. // It returns the number of lost packets. -func (r *LossDetector) Process(pkt *rtp.Packet) int { +func (r *LossDetector) Process(pkt *rtp.Packet) uint { return (*rtplossdetector.LossDetector)(r).Process(pkt) } diff --git a/pkg/rtplossdetector/lossdetector_test.go b/pkg/rtplossdetector/lossdetector_test.go index 1baaebf2..d8aad6e0 100644 --- a/pkg/rtplossdetector/lossdetector_test.go +++ b/pkg/rtplossdetector/lossdetector_test.go @@ -15,19 +15,19 @@ func TestLossDetector(t *testing.T) { SequenceNumber: 65530, }, }) - require.Equal(t, 0, c) + require.Equal(t, int(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65531, }, }) - require.Equal(t, 0, c) + require.Equal(t, int(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65535, }, }) - require.Equal(t, 3, c) + require.Equal(t, int(3), c) } diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 7c5b4fde..692de444 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -22,6 +22,6 @@ func New() *Reorderer { // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. -func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { +func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { return (*rtpreorderer.Reorderer)(r).Process(pkt) } diff --git a/server_conn.go b/server_conn.go index 1a1cbd66..b8b80444 100644 --- a/server_conn.go +++ b/server_conn.go @@ -101,11 +101,15 @@ func (sc *ServerConn) NetConn() net.Conn { } // BytesReceived returns the number of read bytes. +// +// Deprecated: replaced by Stats() func (sc *ServerConn) BytesReceived() uint64 { return sc.bc.BytesReceived() } // BytesSent returns the number of written bytes. +// +// Deprecated: replaced by Stats() func (sc *ServerConn) BytesSent() uint64 { return sc.bc.BytesSent() } @@ -120,6 +124,14 @@ func (sc *ServerConn) UserData() interface{} { return sc.userData } +// Stats returns connection statistics. +func (sc *ServerConn) Stats() *StatsConn { + return &StatsConn{ + BytesReceived: sc.bc.BytesReceived(), + BytesSent: sc.bc.BytesSent(), + } +} + func (sc *ServerConn) ip() net.IP { return sc.remoteAddr.IP } diff --git a/server_conn_reader.go b/server_conn_reader.go index d8deb691..461f610f 100644 --- a/server_conn_reader.go +++ b/server_conn_reader.go @@ -3,7 +3,6 @@ package gortsplib import ( "errors" "fmt" - "sync/atomic" "time" "github.com/bluenviron/gortsplib/v4/pkg/base" @@ -131,8 +130,6 @@ func (cr *serverConnReader) readFuncTCP() error { return liberrors.ErrServerUnexpectedResponse{} case *base.InterleavedFrame: - atomic.AddUint64(cr.sc.session.bytesReceived, uint64(len(what.Payload))) - if cb, ok := cr.sc.session.tcpCallbackByChannel[what.Channel]; ok { cb(what.Payload) } diff --git a/server_play_test.go b/server_play_test.go index 8dd1a984..0c1edbaa 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -609,10 +609,11 @@ func TestServerPlay(t *testing.T) { close(nconnOpened) }, onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { - require.Greater(t, ctx.Conn.BytesSent(), uint64(810)) - require.Less(t, ctx.Conn.BytesSent(), uint64(1150)) - require.Greater(t, ctx.Conn.BytesReceived(), uint64(440)) - require.Less(t, ctx.Conn.BytesReceived(), uint64(660)) + s := ctx.Conn.Stats() + require.Greater(t, s.BytesSent, uint64(810)) + require.Less(t, s.BytesSent, uint64(1150)) + require.Greater(t, s.BytesReceived, uint64(440)) + require.Less(t, s.BytesReceived, uint64(660)) close(nconnClosed) }, @@ -621,10 +622,11 @@ func TestServerPlay(t *testing.T) { }, onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { if transport != "multicast" { - require.Greater(t, ctx.Session.BytesSent(), uint64(50)) - require.Less(t, ctx.Session.BytesSent(), uint64(60)) - require.Greater(t, ctx.Session.BytesReceived(), uint64(15)) - require.Less(t, ctx.Session.BytesReceived(), uint64(25)) + s := ctx.Session.Stats() + require.Greater(t, s.BytesSent, uint64(50)) + require.Less(t, s.BytesSent, uint64(60)) + require.Greater(t, s.BytesReceived, uint64(15)) + require.Less(t, s.BytesReceived, uint64(25)) } close(sessionClosed) @@ -2325,7 +2327,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { } } -func TestServerPlayBytesSent(t *testing.T) { +func TestServerPlayStreamStats(t *testing.T) { var stream *ServerStream s := &Server{ @@ -2355,7 +2357,6 @@ func TestServerPlayBytesSent(t *testing.T) { err := s.Start() require.NoError(t, err) defer s.Close() - stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}}) defer stream.Close() @@ -2393,5 +2394,6 @@ func TestServerPlayBytesSent(t *testing.T) { err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket) require.NoError(t, err) - require.Equal(t, uint64(16*2), stream.BytesSent()) + st := stream.Stats() + require.Equal(t, uint64(16*2), st.BytesSent) } diff --git a/server_record_test.go b/server_record_test.go index c70cba2d..a8042785 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -545,10 +545,11 @@ func TestServerRecord(t *testing.T) { close(nconnOpened) }, onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { - require.Greater(t, ctx.Conn.BytesSent(), uint64(510)) - require.Less(t, ctx.Conn.BytesSent(), uint64(560)) - require.Greater(t, ctx.Conn.BytesReceived(), uint64(1000)) - require.Less(t, ctx.Conn.BytesReceived(), uint64(1200)) + s := ctx.Conn.Stats() + require.Greater(t, s.BytesSent, uint64(510)) + require.Less(t, s.BytesSent, uint64(560)) + require.Greater(t, s.BytesReceived, uint64(1000)) + require.Less(t, s.BytesReceived, uint64(1200)) close(nconnClosed) }, @@ -556,10 +557,11 @@ func TestServerRecord(t *testing.T) { close(sessionOpened) }, onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { - require.Greater(t, ctx.Session.BytesSent(), uint64(75)) - require.Less(t, ctx.Session.BytesSent(), uint64(130)) - require.Greater(t, ctx.Session.BytesReceived(), uint64(70)) - require.Less(t, ctx.Session.BytesReceived(), uint64(80)) + s := ctx.Session.Stats() + require.Greater(t, s.BytesSent, uint64(75)) + require.Less(t, s.BytesSent, uint64(130)) + require.Greater(t, s.BytesReceived, uint64(70)) + require.Less(t, s.BytesReceived, uint64(80)) close(sessionClosed) }, diff --git a/server_session.go b/server_session.go index e640d2f2..3baddb43 100644 --- a/server_session.go +++ b/server_session.go @@ -14,6 +14,8 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" @@ -186,7 +188,7 @@ func generateRTPInfo( Scheme: u.Scheme, Host: u.Host, Path: setuppedPath + "/trackID=" + - strconv.FormatInt(int64(setuppedStream.streamMedias[sm.media].trackID), 10), + strconv.FormatInt(int64(setuppedStream.medias[sm.media].trackID), 10), }).String() ri = append(ri, entry) } @@ -235,8 +237,6 @@ type ServerSession struct { secretID string // must not be shared, allows to take ownership of the session ctx context.Context ctxCancel func() - bytesReceived *uint64 - bytesSent *uint64 userData interface{} conns map[*ServerConn]struct{} state ServerSessionState @@ -272,11 +272,10 @@ func (ss *ServerSession) initialize() { ss.secretID = secretID ss.ctx = ctx ss.ctxCancel = ctxCancel - ss.bytesReceived = new(uint64) - ss.bytesSent = new(uint64) ss.conns = make(map[*ServerConn]struct{}) ss.lastRequestTime = ss.s.timeNow() ss.udpCheckStreamTimer = emptyTimer() + ss.chHandleRequest = make(chan sessionRequestReq) ss.chRemoveConn = make(chan *ServerConn) ss.chStartWriter = make(chan struct{}) @@ -291,13 +290,25 @@ func (ss *ServerSession) Close() { } // BytesReceived returns the number of read bytes. +// +// Deprecated: replaced by Stats() func (ss *ServerSession) BytesReceived() uint64 { - return atomic.LoadUint64(ss.bytesReceived) + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.bytesReceived) + } + return v } // BytesSent returns the number of written bytes. +// +// Deprecated: replaced by Stats() func (ss *ServerSession) BytesSent() uint64 { - return atomic.LoadUint64(ss.bytesSent) + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.bytesSent) + } + return v } // State returns the state of the session. @@ -349,25 +360,190 @@ func (ss *ServerSession) UserData() interface{} { return ss.userData } -func (ss *ServerSession) onPacketLost(err error) { - if h, ok := ss.s.Handler.(ServerHandlerOnPacketLost); ok { - h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ - Session: ss, - Error: err, - }) - } else { - log.Println(err.Error()) - } -} +// Stats returns server session statistics. +func (ss *ServerSession) Stats() *StatsSession { + return &StatsSession{ + BytesReceived: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.bytesReceived) + } + return v + }(), + BytesSent: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.bytesSent) + } + return v + }(), + RTPPacketsReceived: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsReceived) + } + } + return v + }(), + RTPPacketsSent: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsSent) + } + } + return v + }(), + RTPPacketsLost: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + for _, f := range sm.formats { + v += atomic.LoadUint64(f.rtpPacketsLost) + } + } + return v + }(), + RTPPacketsInError: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.rtpPacketsInError) + } + return v + }(), + RTPJitter: func() float64 { + v := float64(0) + n := float64(0) + for _, sm := range ss.setuppedMedias { + for _, fo := range sm.formats { + if fo.rtcpReceiver != nil { + stats := fo.rtcpReceiver.Stats() + if stats != nil { + v += stats.Jitter + n++ + } + } + } + } + return v / n + }(), + RTCPPacketsReceived: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsReceived) + } + return v + }(), + RTCPPacketsSent: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsSent) + } + return v + }(), + RTCPPacketsInError: func() uint64 { + v := uint64(0) + for _, sm := range ss.setuppedMedias { + v += atomic.LoadUint64(sm.rtcpPacketsInError) + } + return v + }(), + Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl + ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias)) + + for med, sm := range ss.setuppedMedias { + ret[med] = StatsSessionMedia{ + BytesReceived: atomic.LoadUint64(sm.bytesReceived), + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), + Formats: func() map[format.Format]StatsSessionFormat { + ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + + for _, fo := range sm.formats { + recvStats := func() *rtcpreceiver.Stats { + if fo.rtcpReceiver != nil { + return fo.rtcpReceiver.Stats() + } + return nil + }() + rtcpSender := func() *rtcpsender.RTCPSender { + if ss.setuppedStream != nil { + return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender + } + return nil + }() + sentStats := func() *rtcpsender.Stats { + if rtcpSender != nil { + return rtcpSender.Stats() + } + return nil + }() + + ret[fo.format] = StatsSessionFormat{ //nolint:dupl + RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), + LocalSSRC: func() uint32 { + if fo.rtcpReceiver != nil { + return *fo.rtcpReceiver.LocalSSRC + } + if sentStats != nil { + return sentStats.LocalSSRC + } + return 0 + }(), + RemoteSSRC: func() uint32 { + if recvStats != nil { + return recvStats.RemoteSSRC + } + return 0 + }(), + RTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + if sentStats != nil { + return sentStats.LastSequenceNumber + } + return 0 + }(), + RTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + RTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), + RTPJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + } + } -func (ss *ServerSession) onDecodeError(err error) { - if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok { - h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ - Session: ss, - Error: err, - }) - } else { - log.Println(err.Error()) + return ret + }(), + } + } + + return ret + }(), } } @@ -848,7 +1024,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( th := headers.Transport{} if ss.state == ServerSessionStatePrePlay { - ssrc, ok := stream.senderSSRC(medi) + ssrc, ok := stream.localSSRC(medi) if ok { th.SSRC = &ssrc } @@ -894,7 +1070,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( th.Delivery = &de v := uint(127) th.TTL = &v - d := stream.streamMedias[medi].multicastWriter.ip() + d := stream.medias[medi].multicastWriter.ip() th.Destination = &d th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort} @@ -1229,7 +1405,7 @@ func (ss *ServerSession) findFreeChannelPair() int { } } -// OnPacketRTPAny sets the callback that is called when a RTP packet is read from any setupped media. +// OnPacketRTPAny sets a callback that is called when a RTP packet is read from any setupped media. func (ss *ServerSession) OnPacketRTPAny(cb OnPacketRTPAnyFunc) { for _, sm := range ss.setuppedMedias { cmedia := sm.media @@ -1241,7 +1417,7 @@ func (ss *ServerSession) OnPacketRTPAny(cb OnPacketRTPAnyFunc) { } } -// OnPacketRTCPAny sets the callback that is called when a RTCP packet is read from any setupped media. +// OnPacketRTCPAny sets a callback that is called when a RTCP packet is read from any setupped media. func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { for _, sm := range ss.setuppedMedias { cmedia := sm.media @@ -1251,24 +1427,25 @@ func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) { } } -// OnPacketRTP sets the callback that is called when a RTP packet is read. +// OnPacketRTP sets a callback that is called when a RTP packet is read. func (ss *ServerSession) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) { sm := ss.setuppedMedias[medi] st := sm.formats[forma.PayloadType()] st.onPacketRTP = cb } -// OnPacketRTCP sets the callback that is called when a RTCP packet is read. +// OnPacketRTCP sets a callback that is called when a RTCP packet is read. func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) { sm := ss.setuppedMedias[medi] sm.onPacketRTCP = cb } -func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error { +func (ss *ServerSession) writePacketRTP(medi *description.Media, payloadType uint8, byts []byte) error { sm := ss.setuppedMedias[medi] + sf := sm.formats[payloadType] - ok := sm.ss.writer.push(func() error { - return sm.writePacketRTPInQueue(byts) + ok := ss.writer.push(func() error { + return sf.writePacketRTPInQueue(byts) }) if !ok { return liberrors.ErrServerWriteQueueFull{} @@ -1286,7 +1463,7 @@ func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet } byts = byts[:n] - return ss.writePacketRTP(medi, byts) + return ss.writePacketRTP(medi, pkt.PayloadType, byts) } func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error { diff --git a/server_session_format.go b/server_session_format.go index 30c721e3..f693c289 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -1,6 +1,8 @@ package gortsplib import ( + "log" + "sync/atomic" "time" "github.com/pion/rtcp" @@ -18,12 +20,30 @@ type serverSessionFormat struct { format format.Format onPacketRTP OnPacketRTPFunc - udpReorderer *rtpreorderer.Reorderer - tcpLossDetector *rtplossdetector.LossDetector - rtcpReceiver *rtcpreceiver.RTCPReceiver + udpReorderer *rtpreorderer.Reorderer + tcpLossDetector *rtplossdetector.LossDetector + rtcpReceiver *rtcpreceiver.RTCPReceiver + writePacketRTPInQueue func([]byte) error + rtpPacketsReceived *uint64 + rtpPacketsSent *uint64 + rtpPacketsLost *uint64 +} + +func (sf *serverSessionFormat) initialize() { + sf.rtpPacketsReceived = new(uint64) + sf.rtpPacketsSent = new(uint64) + sf.rtpPacketsLost = new(uint64) } func (sf *serverSessionFormat) start() { + switch *sf.sm.ss.setuppedTransport { + case TransportUDP, TransportUDPMulticast: + sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP + + default: + sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP + } + if sf.sm.ss.state != ServerSessionStatePlay { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { sf.udpReorderer = &rtpreorderer.Reorderer{} @@ -56,38 +76,76 @@ func (sf *serverSessionFormat) stop() { } } -func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { +func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) { packets, lost := sf.udpReorderer.Process(pkt) if lost != 0 { - sf.sm.ss.onPacketLost(liberrors.ErrServerRTPPacketsLost{Lost: lost}) + sf.onPacketRTPLost(lost) // do not return } for _, pkt := range packets { - err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) - if err != nil { - sf.sm.ss.onDecodeError(err) - continue - } - - sf.onPacketRTP(pkt) + sf.handlePacketRTP(pkt, now) } } -func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { +func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) { lost := sf.tcpLossDetector.Process(pkt) if lost != 0 { - sf.sm.ss.onPacketLost(liberrors.ErrServerRTPPacketsLost{Lost: lost}) + sf.onPacketRTPLost(lost) // do not return } now := sf.sm.ss.s.timeNow() - err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) + sf.handlePacketRTP(pkt, now) +} + +func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { + err := sf.rtcpReceiver.ProcessPacketRTP(pkt, now, sf.format.PTSEqualsDTS(pkt)) if err != nil { - sf.sm.ss.onDecodeError(err) + sf.sm.onPacketRTPDecodeError(err) return } + atomic.AddUint64(sf.rtpPacketsReceived, 1) + sf.onPacketRTP(pkt) } + +func (sf *serverSessionFormat) onPacketRTPLost(lost uint) { + atomic.AddUint64(sf.rtpPacketsLost, uint64(lost)) + + if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok { + h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ + Session: sf.sm.ss, + Error: liberrors.ErrServerRTPPacketsLost{Lost: lost}, + }) + } else { + log.Println(liberrors.ErrServerRTPPacketsLost{Lost: lost}.Error()) + } +} + +func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error { + err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr) + if err != nil { + return err + } + + atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sf.rtpPacketsSent, 1) + return nil +} + +func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error { + sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel + sf.sm.ss.tcpFrame.Payload = payload + sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout)) + err := sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer) + if err != nil { + return err + } + + atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sf.rtpPacketsSent, 1) + return nil +} diff --git a/server_session_media.go b/server_session_media.go index 1b58d39b..dfe38273 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -1,6 +1,7 @@ package gortsplib import ( + "log" "net" "sync/atomic" "time" @@ -23,20 +24,33 @@ type serverSessionMedia struct { udpRTCPReadPort int udpRTCPWriteAddr *net.UDPAddr formats map[uint8]*serverSessionFormat // record only - writePacketRTPInQueue func([]byte) error writePacketRTCPInQueue func([]byte) error + bytesReceived *uint64 + bytesSent *uint64 + rtpPacketsInError *uint64 + rtcpPacketsReceived *uint64 + rtcpPacketsSent *uint64 + rtcpPacketsInError *uint64 } func (sm *serverSessionMedia) initialize() { - if sm.ss.state == ServerSessionStatePreRecord { - sm.formats = make(map[uint8]*serverSessionFormat) - for _, forma := range sm.media.Formats { - sm.formats[forma.PayloadType()] = &serverSessionFormat{ - sm: sm, - format: forma, - onPacketRTP: func(*rtp.Packet) {}, - } + sm.bytesReceived = new(uint64) + sm.bytesSent = new(uint64) + sm.rtpPacketsInError = new(uint64) + sm.rtcpPacketsReceived = new(uint64) + sm.rtcpPacketsSent = new(uint64) + sm.rtcpPacketsInError = new(uint64) + + sm.formats = make(map[uint8]*serverSessionFormat) + + for _, forma := range sm.media.Formats { + f := &serverSessionFormat{ + sm: sm, + format: forma, + onPacketRTP: func(*rtp.Packet) {}, } + f.initialize() + sm.formats[forma.PayloadType()] = f } } @@ -49,7 +63,6 @@ func (sm *serverSessionMedia) start() { switch *sm.ss.setuppedTransport { case TransportUDP, TransportUDPMulticast: - sm.writePacketRTPInQueue = sm.writePacketRTPInQueueUDP sm.writePacketRTCPInQueue = sm.writePacketRTCPInQueueUDP if *sm.ss.setuppedTransport == TransportUDP { @@ -57,7 +70,7 @@ func (sm *serverSessionMedia) start() { // firewall opening is performed with RTCP sender reports generated by ServerStream // readers can send RTCP packets only - sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPPlay) + sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPPlay) } else { // open the firewall by sending empty packets to the counterpart. byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() @@ -66,13 +79,12 @@ func (sm *serverSessionMedia) start() { byts, _ = (&rtcp.ReceiverReport{}).Marshal() sm.ss.s.udpRTCPListener.write(byts, sm.udpRTCPWriteAddr) //nolint:errcheck - sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readRTPUDPRecord) - sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPRecord) + sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readPacketRTPUDPRecord) + sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPRecord) } } case TransportTCP: - sm.writePacketRTPInQueue = sm.writePacketRTPInQueueTCP sm.writePacketRTCPInQueue = sm.writePacketRTCPInQueueTCP if sm.ss.tcpCallbackByChannel == nil { @@ -80,11 +92,11 @@ func (sm *serverSessionMedia) start() { } if sm.ss.state == ServerSessionStatePlay { - sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPPlay - sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPPlay + sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readPacketRTPTCPPlay + sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readPacketRTCPTCPPlay } else { - sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord - sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord + sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readPacketRTPTCPRecord + sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readPacketRTCPTCPRecord } } } @@ -102,59 +114,58 @@ func (sm *serverSessionMedia) stop() { func (sm *serverSessionMedia) findFormatWithSSRC(ssrc uint32) *serverSessionFormat { for _, format := range sm.formats { - tssrc, ok := format.rtcpReceiver.SenderSSRC() - if ok && tssrc == ssrc { + stats := format.rtcpReceiver.Stats() + if stats != nil && stats.RemoteSSRC == ssrc { return format } } return nil } -func (sm *serverSessionMedia) writePacketRTPInQueueUDP(payload []byte) error { - atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) - return sm.ss.s.udpRTPListener.write(payload, sm.udpRTPWriteAddr) -} - func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error { - atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) - return sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr) -} + err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr) + if err != nil { + return err + } -func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error { - atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) - sm.ss.tcpFrame.Channel = sm.tcpChannel - sm.ss.tcpFrame.Payload = payload - sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) + atomic.AddUint64(sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sm.rtcpPacketsSent, 1) + return nil } func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { - atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload))) sm.ss.tcpFrame.Channel = sm.tcpChannel + 1 sm.ss.tcpFrame.Payload = payload sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) -} + err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) + if err != nil { + return err + } -func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool { - plen := len(payload) + atomic.AddUint64(sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sm.rtcpPacketsSent, 1) + return nil +} - atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) +func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) - if plen == (udpMaxPayloadSize + 1) { - sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTCPDecodeError(err) return false } now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) + atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { sm.onPacketRTCP(pkt) } @@ -162,56 +173,54 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool { return true } -func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) bool { - plen := len(payload) - - atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) +func (sm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) - if plen == (udpMaxPayloadSize + 1) { - sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) return false } pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTPDecodeError(err) return false } forma, ok := sm.formats[pkt.PayloadType] if !ok { - sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) return false } now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) - forma.readRTPUDP(pkt, now) + forma.readPacketRTPUDP(pkt, now) return true } -func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool { - plen := len(payload) +func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) - atomic.AddUint64(sm.ss.bytesReceived, uint64(plen)) - - if plen == (udpMaxPayloadSize + 1) { - sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) + if len(payload) == (udpMaxPayloadSize + 1) { + sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTCPDecodeError(err) return false } now := sm.ss.s.timeNow() atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix()) + atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := sm.findFormatWithSSRC(sr.SSRC) @@ -226,22 +235,26 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool { return true } -func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) bool { +func (sm *serverSessionMedia) readPacketRTPTCPPlay(_ []byte) bool { return false } -func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool { +func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) + if len(payload) > udpMaxPayloadSize { - sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) + sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTCPDecodeError(err) return false } + atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { sm.onPacketRTCP(pkt) } @@ -249,39 +262,45 @@ func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool { return true } -func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) bool { +func (sm *serverSessionMedia) readPacketRTPTCPRecord(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) + pkt := &rtp.Packet{} err := pkt.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTPDecodeError(err) return false } forma, ok := sm.formats[pkt.PayloadType] if !ok { - sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) return false } - forma.readRTPTCP(pkt) + forma.readPacketRTPTCP(pkt) return true } -func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool { +func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) + if len(payload) > udpMaxPayloadSize { - sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) + sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) return false } packets, err := rtcp.Unmarshal(payload) if err != nil { - sm.ss.onDecodeError(err) + sm.onPacketRTCPDecodeError(err) return false } now := sm.ss.s.timeNow() + atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets))) + for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := sm.findFormatWithSSRC(sr.SSRC) @@ -295,3 +314,29 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool { return true } + +func (sm *serverSessionMedia) onPacketRTPDecodeError(err error) { + atomic.AddUint64(sm.rtpPacketsInError, 1) + + if h, ok := sm.ss.s.Handler.(ServerHandlerOnDecodeError); ok { + h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ + Session: sm.ss, + Error: err, + }) + } else { + log.Println(err.Error()) + } +} + +func (sm *serverSessionMedia) onPacketRTCPDecodeError(err error) { + atomic.AddUint64(sm.rtcpPacketsInError, 1) + + if h, ok := sm.ss.s.Handler.(ServerHandlerOnDecodeError); ok { + h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ + Session: sm.ss, + Error: err, + }) + } else { + log.Println(err.Error()) + } +} diff --git a/server_stream.go b/server_stream.go index b479497e..472a6bc3 100644 --- a/server_stream.go +++ b/server_stream.go @@ -9,6 +9,7 @@ import ( "github.com/pion/rtp" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) @@ -36,9 +37,8 @@ type ServerStream struct { readers map[*ServerSession]struct{} multicastReaderCount int activeUnicastReaders map[*ServerSession]struct{} - streamMedias map[*description.Media]*serverStreamMedia + medias map[*description.Media]*serverStreamMedia closed bool - bytesSent *uint64 } // NewServerStream allocates a ServerStream. @@ -48,10 +48,9 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream { desc: desc, readers: make(map[*ServerSession]struct{}), activeUnicastReaders: make(map[*ServerSession]struct{}), - bytesSent: new(uint64), } - st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) + st.medias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias)) for i, medi := range desc.Medias { sm := &serverStreamMedia{ st: st, @@ -59,7 +58,7 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream { trackID: i, } sm.initialize() - st.streamMedias[medi] = sm + st.medias[medi] = sm } return st @@ -75,14 +74,20 @@ func (st *ServerStream) Close() { ss.Close() } - for _, sm := range st.streamMedias { + for _, sm := range st.medias { sm.close() } } // BytesSent returns the number of written bytes. +// +// Deprecated: replaced by Stats() func (st *ServerStream) BytesSent() uint64 { - return atomic.LoadUint64(st.bytesSent) + v := uint64(0) + for _, me := range st.medias { + v += atomic.LoadUint64(me.bytesSent) + } + return v } // Description returns the description of the stream. @@ -90,27 +95,84 @@ func (st *ServerStream) Description() *description.Session { return st.desc } -func (st *ServerStream) senderSSRC(medi *description.Media) (uint32, bool) { +// Stats returns stream statistics. +func (st *ServerStream) Stats() *ServerStreamStats { + return &ServerStreamStats{ + BytesSent: func() uint64 { + v := uint64(0) + for _, me := range st.medias { + v += atomic.LoadUint64(me.bytesSent) + } + return v + }(), + RTPPacketsSent: func() uint64 { + v := uint64(0) + for _, me := range st.medias { + for _, f := range me.formats { + v += atomic.LoadUint64(f.rtpPacketsSent) + } + } + return v + }(), + RTCPPacketsSent: func() uint64 { + v := uint64(0) + for _, me := range st.medias { + v += atomic.LoadUint64(me.rtcpPacketsSent) + } + return v + }(), + Medias: func() map[*description.Media]ServerStreamStatsMedia { + ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias)) + + for med, sm := range st.medias { + ret[med] = ServerStreamStatsMedia{ + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + Formats: func() map[format.Format]ServerStreamStatsFormat { + ret := make(map[format.Format]ServerStreamStatsFormat) + + for _, fo := range sm.formats { + ret[fo.format] = ServerStreamStatsFormat{ + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + } + } + + return ret + }(), + } + } + + return ret + }(), + } +} + +func (st *ServerStream) localSSRC(medi *description.Media) (uint32, bool) { st.mutex.Lock() defer st.mutex.Unlock() - sm := st.streamMedias[medi] + sm := st.medias[medi] - // senderSSRC() is used to fill SSRC inside the Transport header. + // localSSRC() is used to fill SSRC inside the Transport header. // if there are multiple formats inside a single media stream, // do not return anything, since Transport headers don't support multiple SSRCs. if len(sm.formats) > 1 { return 0, false } - return firstFormat(sm.formats).rtcpSender.SenderSSRC() + stats := firstFormat(sm.formats).rtcpSender.Stats() + if stats == nil { + return 0, false + } + + return stats.LocalSSRC, true } func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *headers.RTPInfoEntry { st.mutex.Lock() defer st.mutex.Unlock() - sm := st.streamMedias[medi] + sm := st.medias[medi] // if there are multiple formats inside a single media stream, // do not generate a RTP-Info entry, since RTP-Info doesn't support @@ -121,8 +183,8 @@ func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *he format := firstFormat(sm.formats) - lastSeqNum, lastTimeRTP, lastTimeNTP, ok := format.rtcpSender.LastPacketData() - if !ok { + stats := format.rtcpSender.Stats() + if stats == nil { return nil } @@ -132,13 +194,13 @@ func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *he } // sequence number of the first packet of the stream - seqNum := lastSeqNum + 1 + seqNum := stats.LastSequenceNumber + 1 // RTP timestamp corresponding to the time value in // the Range response header. // remove a small quantity in order to avoid DTS > PTS - ts := uint32(uint64(lastTimeRTP) + - uint64(now.Sub(lastTimeNTP).Seconds()*float64(clockRate)) - + ts := uint32(uint64(stats.LastRTP) + + uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) - uint64(clockRate)/10) return &headers.RTPInfoEntry{ @@ -175,7 +237,7 @@ func (st *ServerStream) readerAdd( case TransportUDPMulticast: if st.multicastReaderCount == 0 { - for _, media := range st.streamMedias { + for _, media := range st.medias { mw := &serverMulticastWriter{ s: st.s, } @@ -207,7 +269,7 @@ func (st *ServerStream) readerRemove(ss *ServerSession) { if *ss.setuppedTransport == TransportUDPMulticast { st.multicastReaderCount-- if st.multicastReaderCount == 0 { - for _, media := range st.streamMedias { + for _, media := range st.medias { media.multicastWriter.close() media.multicastWriter = nil } @@ -225,9 +287,9 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) { if *ss.setuppedTransport == TransportUDPMulticast { for medi, sm := range ss.setuppedMedias { - streamMedia := st.streamMedias[medi] + streamMedia := st.medias[medi] streamMedia.multicastWriter.rtcpl.addClient( - ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm.readRTCPUDPPlay) + ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm.readPacketRTCPUDPPlay) } } else { st.activeUnicastReaders[ss] = struct{}{} @@ -244,7 +306,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { if *ss.setuppedTransport == TransportUDPMulticast { for medi := range ss.setuppedMedias { - streamMedia := st.streamMedias[medi] + streamMedia := st.medias[medi] streamMedia.multicastWriter.rtcpl.removeClient(ss.author.ip(), streamMedia.multicastWriter.rtcpl.port()) } } else { @@ -274,7 +336,7 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp. return liberrors.ErrServerStreamClosed{} } - sm := st.streamMedias[medi] + sm := st.medias[medi] sf := sm.formats[pkt.PayloadType] return sf.writePacketRTP(byts, pkt, ntp) } @@ -293,6 +355,6 @@ func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet return liberrors.ErrServerStreamClosed{} } - sm := st.streamMedias[medi] + sm := st.medias[medi] return sm.writePacketRTCP(byts) } diff --git a/server_stream_format.go b/server_stream_format.go index c4fbefc7..d0dde024 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -15,10 +15,13 @@ type serverStreamFormat struct { sm *serverStreamMedia format format.Format - rtcpSender *rtcpsender.RTCPSender + rtcpSender *rtcpsender.RTCPSender + rtpPacketsSent *uint64 } func (sf *serverStreamFormat) initialize() { + sf.rtpPacketsSent = new(uint64) + sf.rtcpSender = &rtcpsender.RTCPSender{ ClockRate: sf.format.ClockRate(), Period: sf.sm.st.s.senderReportPeriod, @@ -33,19 +36,21 @@ func (sf *serverStreamFormat) initialize() { } func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { - sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) + sf.rtcpSender.ProcessPacketRTP(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) le := uint64(len(byts)) // send unicast for r := range sf.sm.st.activeUnicastReaders { if _, ok := r.setuppedMedias[sf.sm.media]; ok { - err := r.writePacketRTP(sf.sm.media, byts) + err := r.writePacketRTP(sf.sm.media, pkt.PayloadType, byts) if err != nil { r.onStreamWriteError(err) - } else { - atomic.AddUint64(sf.sm.st.bytesSent, le) + continue } + + atomic.AddUint64(sf.sm.bytesSent, le) + atomic.AddUint64(sf.rtpPacketsSent, 1) } } @@ -55,7 +60,9 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t if err != nil { return err } - atomic.AddUint64(sf.sm.st.bytesSent, le) + + atomic.AddUint64(sf.sm.bytesSent, le) + atomic.AddUint64(sf.rtpPacketsSent, 1) } return nil diff --git a/server_stream_media.go b/server_stream_media.go index f3253a1a..26bfc8d7 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -1,6 +1,8 @@ package gortsplib import ( + "sync/atomic" + "github.com/bluenviron/gortsplib/v4/pkg/description" ) @@ -11,9 +13,14 @@ type serverStreamMedia struct { formats map[uint8]*serverStreamFormat multicastWriter *serverMulticastWriter + bytesSent *uint64 + rtcpPacketsSent *uint64 } func (sm *serverStreamMedia) initialize() { + sm.bytesSent = new(uint64) + sm.rtcpPacketsSent = new(uint64) + sm.formats = make(map[uint8]*serverStreamFormat) for _, forma := range sm.media.Formats { sf := &serverStreamFormat{ @@ -38,13 +45,19 @@ func (sm *serverStreamMedia) close() { } func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error { + le := len(byts) + // send unicast for r := range sm.st.activeUnicastReaders { if _, ok := r.setuppedMedias[sm.media]; ok { err := r.writePacketRTCP(sm.media, byts) if err != nil { r.onStreamWriteError(err) + continue } + + atomic.AddUint64(sm.bytesSent, uint64(le)) + atomic.AddUint64(sm.rtcpPacketsSent, 1) } } @@ -54,6 +67,9 @@ func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error { if err != nil { return err } + + atomic.AddUint64(sm.bytesSent, uint64(le)) + atomic.AddUint64(sm.rtcpPacketsSent, 1) } return nil diff --git a/server_stream_stats.go b/server_stream_stats.go new file mode 100644 index 00000000..6e845a6e --- /dev/null +++ b/server_stream_stats.go @@ -0,0 +1,36 @@ +package gortsplib + +import ( + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" +) + +// ServerStreamStatsFormat are stream format statistics. +type ServerStreamStatsFormat struct { + // number of sent RTP packets + RTPPacketsSent uint64 +} + +// ServerStreamStatsMedia are stream media statistics. +type ServerStreamStatsMedia struct { + // sent bytes + BytesSent uint64 + // number of sent RTCP packets + RTCPPacketsSent uint64 + + // format statistics + Formats map[format.Format]ServerStreamStatsFormat +} + +// ServerStreamStats are stream statistics. +type ServerStreamStats struct { + // sent bytes + BytesSent uint64 + // number of sent RTP packets + RTPPacketsSent uint64 + // number of sent RTCP packets + RTCPPacketsSent uint64 + + // media statistics + Medias map[*description.Media]ServerStreamStatsMedia +} diff --git a/stats_conn.go b/stats_conn.go new file mode 100644 index 00000000..635519b9 --- /dev/null +++ b/stats_conn.go @@ -0,0 +1,9 @@ +package gortsplib + +// StatsConn are connection statistics. +type StatsConn struct { + // received bytes + BytesReceived uint64 + // sent bytes + BytesSent uint64 +} diff --git a/stats_session.go b/stats_session.go new file mode 100644 index 00000000..4c7fe2ec --- /dev/null +++ b/stats_session.go @@ -0,0 +1,76 @@ +package gortsplib + +import ( + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" +) + +// StatsSessionFormat are session format statistics. +type StatsSessionFormat struct { + // number of RTP packets correctly received and processed + RTPPacketsReceived uint64 + // number of sent RTP packets + RTPPacketsSent uint64 + // number of lost RTP packets + RTPPacketsLost uint64 + // mean jitter of received RTP packets + RTPJitter float64 + // local SSRC + LocalSSRC uint32 + // remote SSRC + RemoteSSRC uint32 + // last sequence number of incoming/outgoing RTP packets + RTPPacketsLastSequenceNumber uint16 + // last RTP time of incoming/outgoing RTP packets + RTPPacketsLastRTP uint32 + // last NTP time of incoming/outgoing NTP packets + RTPPacketsLastNTP time.Time +} + +// StatsSessionMedia are session media statistics. +type StatsSessionMedia struct { + // received bytes + BytesReceived uint64 + // sent bytes + BytesSent uint64 + // number of RTP packets that could not be processed + RTPPacketsInError uint64 + // number of RTCP packets correctly received and processed + RTCPPacketsReceived uint64 + // number of sent RTCP packets + RTCPPacketsSent uint64 + // number of RTCP packets that could not be processed + RTCPPacketsInError uint64 + + // format statistics + Formats map[format.Format]StatsSessionFormat +} + +// StatsSession are session statistics. +type StatsSession struct { + // received bytes + BytesReceived uint64 + // sent bytes + BytesSent uint64 + // number of RTP packets correctly received and processed + RTPPacketsReceived uint64 + // number of sent RTP packets + RTPPacketsSent uint64 + // number of lost RTP packets + RTPPacketsLost uint64 + // number of RTP packets that could not be processed + RTPPacketsInError uint64 + // mean jitter of received RTP packets + RTPJitter float64 + // number of RTCP packets correctly received and processed + RTCPPacketsReceived uint64 + // number of sent RTCP packets + RTCPPacketsSent uint64 + // number of RTCP packets that could not be processed + RTCPPacketsInError uint64 + + // media statistics + Medias map[*description.Media]StatsSessionMedia +}