From de64085ada8e6762046383030651c34b5c9f4222 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 31 Aug 2023 22:51:26 +0200 Subject: [PATCH] srt, udp: support publishing and reading MPEG-1/2/4 video with SRT and UDP/MPEG-TS --- README.md | 8 +- go.mod | 2 +- go.sum | 4 +- internal/core/srt_conn.go | 70 +++++++++++ internal/core/srt_source.go | 46 ++++++- internal/core/udp_source.go | 38 +++++- internal/formatprocessor/h264.go | 30 ++--- internal/formatprocessor/h265.go | 39 +++--- internal/formatprocessor/mpeg1_video.go | 113 +++++++++++++++++ internal/formatprocessor/mpeg4_video.go | 154 ++++++++++++++++++++++++ internal/formatprocessor/processor.go | 30 +++-- internal/unit/mpeg1_video.go | 7 ++ internal/unit/mpeg4_video.go | 7 ++ 13 files changed, 479 insertions(+), 69 deletions(-) create mode 100644 internal/formatprocessor/mpeg1_video.go create mode 100644 internal/formatprocessor/mpeg4_video.go create mode 100644 internal/unit/mpeg1_video.go create mode 100644 internal/unit/mpeg4_video.go diff --git a/README.md b/README.md index c102efd1b86..9b93088e0c1 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,8 @@ Live streams can be published to the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| -|[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT servers](#srt-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, 
H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| @@ -29,14 +29,14 @@ Live streams can be published to the server with: |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| -|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264|| And can be read from the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT](#srt)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| diff --git a/go.mod b/go.mod index ad724942aa5..0120e45cd7f 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/abema/go-mp4 v0.13.0 github.com/alecthomas/kong v0.8.0 github.com/bluenviron/gohlslib v1.0.2 - github.com/bluenviron/gortsplib/v4 
v4.0.2-0.20230911215322-4ede58cda208 + github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 github.com/bluenviron/mediacommon v1.2.0 github.com/datarhei/gosrt v0.5.4 github.com/fsnotify/fsnotify v1.6.0 diff --git a/go.sum b/go.sum index c64c06a867e..96b0adbf8a1 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib v1.0.2 h1:LDA/CubL525e9rLWw+G/9GbFS6iXwozmOg8KJBT4qF4= github.com/bluenviron/gohlslib v1.0.2/go.mod h1:oam0wsI2XqcHLTG6NM8HRvxAQsa3hIA0MLRiTOE7CB8= -github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230911215322-4ede58cda208 h1:w1aishvJ4U2TuM1SsFmWoKuFNogqhla3eh2Qn6AyRtE= -github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230911215322-4ede58cda208/go.mod h1:l6LO4TlHC3YdLIbEn89GeeSgzHaJlpAFF7NLZ7h4A+A= +github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 h1:NIJRhT/AYhPNe1GdWqAhDsYOTo6/hvAz5pEe1Ss+NuE= +github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9/go.mod h1:aAHfFXD3LE19h86jUGJK7PpM1uwp9VFVReuH89ZlpuE= github.com/bluenviron/mediacommon v1.2.0 h1:5tz92r2S4gPSiTlycepjXFZCgwGfVL2htCeVsoBac+U= github.com/bluenviron/mediacommon v1.2.0/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index d4dc3e7d3ad..e4315bf99eb 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -295,6 +295,42 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { return nil }) + case *mpegts.CodecMPEG4Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG4Video{ + PayloadTyp: 96, + }}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { 
+ stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + + case *mpegts.CodecMPEG1Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG1Video{}}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + case *mpegts.CodecOpus: medi = &description.Media{ Type: description.MediaTypeAudio, @@ -509,6 +545,40 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass return bw.Flush() }) + case *format.MPEG4Video: + track := addTrack(medi, &mpegts.CodecMPEG4Video{}) + + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) + + case *format.MPEG1Video: + track := addTrack(medi, &mpegts.CodecMPEG1Video{}) + + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) + case *format.MPEG4AudioGeneric: track := addTrack(medi, &mpegts.CodecMPEG4Audio{ Config: *forma.Config, diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index f6292a53f9c..54c0169635d 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -112,6 +112,25 @@ func (s *srtSource) runReader(sconn srt.Conn) error { 
var medi *description.Media switch tcodec := track.Codec.(type) { + case *mpegts.CodecH265: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AU: au, + }) + return nil + }) + case *mpegts.CodecH264: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -132,21 +151,38 @@ func (s *srtSource) runReader(sconn srt.Conn) error { return nil }) - case *mpegts.CodecH265: + case *mpegts.CodecMPEG4Video: medi = &description.Media{ Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ + Formats: []format.Format{&format.MPEG4Video{ PayloadTyp: 96, }}, } - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ Base: unit.Base{ NTP: time.Now(), PTS: decodeTime(pts), }, - AU: au, + Frame: frame, + }) + return nil + }) + + case *mpegts.CodecMPEG1Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG1Video{}}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, }) return nil }) diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index dfb0a2eb320..26dffd88d3f 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -81,7 +81,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.Pa var pc packetConn if ip4 := addr.IP.To4(); ip4 != nil && addr.IP.IsMulticast() { - pc, err = multicast.NewMultiConn(hostPort, 
net.ListenPacket) + pc, err = multicast.NewMultiConn(hostPort, true, net.ListenPacket) if err != nil { return err } @@ -183,6 +183,42 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return nil }) + case *mpegts.CodecMPEG4Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG4Video{ + PayloadTyp: 96, + }}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + + case *mpegts.CodecMPEG1Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG1Video{}}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + case *mpegts.CodecOpus: medi = &description.Media{ Type: description.MediaTypeAudio, diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index ffe0d134b45..90a2ca51ddd 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -13,22 +13,22 @@ import ( ) // extract SPS and PPS without decoding RTP packets -func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) { - if len(pkt.Payload) < 1 { +func rtpH264ExtractSPSPPS(payload []byte) ([]byte, []byte) { + if len(payload) < 1 { return nil, nil } - typ := h264.NALUType(pkt.Payload[0] & 0x1F) + typ := h264.NALUType(payload[0] & 0x1F) switch typ { case h264.NALUTypeSPS: - return pkt.Payload, nil + return payload, nil case h264.NALUTypePPS: - return nil, pkt.Payload + return nil, payload case h264.NALUTypeSTAPA: - payload := pkt.Payload[1:] + payload := payload[1:] var sps []byte var pps []byte @@ -111,19 +111,11 @@ func (t *formatProcessorH264) createEncoder( return 
t.encoder.Init() } -func (t *formatProcessorH264) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) { - sps, pps := rtpH264ExtractSPSPPS(pkt) - update := false - - if sps != nil && !bytes.Equal(sps, t.format.SPS) { - update = true - } +func (t *formatProcessorH264) updateTrackParametersFromRTPPacket(payload []byte) { + sps, pps := rtpH264ExtractSPSPPS(payload) - if pps != nil && !bytes.Equal(pps, t.format.PPS) { - update = true - } - - if update { + if (sps != nil && !bytes.Equal(sps, t.format.SPS)) || + (pps != nil && !bytes.Equal(pps, t.format.PPS)) { if sps == nil { sps = t.format.SPS } @@ -257,7 +249,7 @@ func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl }, } - t.updateTrackParametersFromRTPPacket(pkt) + t.updateTrackParametersFromRTPPacket(pkt.Payload) if t.encoder == nil { // remove padding diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index f2f9079a120..63bd9a36ebb 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -13,25 +13,25 @@ import ( ) // extract VPS, SPS and PPS without decoding RTP packets -func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) { - if len(pkt.Payload) < 2 { +func rtpH265ExtractVPSSPSPPS(payload []byte) ([]byte, []byte, []byte) { + if len(payload) < 2 { return nil, nil, nil } - typ := h265.NALUType((pkt.Payload[0] >> 1) & 0b111111) + typ := h265.NALUType((payload[0] >> 1) & 0b111111) switch typ { case h265.NALUType_VPS_NUT: - return pkt.Payload, nil, nil + return payload, nil, nil case h265.NALUType_SPS_NUT: - return nil, pkt.Payload, nil + return nil, payload, nil case h265.NALUType_PPS_NUT: - return nil, nil, pkt.Payload + return nil, nil, payload case h265.NALUType_AggregationUnit: - payload := pkt.Payload[2:] + payload := payload[2:] var vps []byte var sps []byte var pps []byte @@ -55,7 +55,7 @@ func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) { nalu := payload[:size] payload = payload[size:] - typ = 
h265.NALUType((pkt.Payload[0] >> 1) & 0b111111) + typ = h265.NALUType((payload[0] >> 1) & 0b111111) switch typ { case h265.NALUType_VPS_NUT: @@ -118,23 +118,12 @@ func (t *formatProcessorH265) createEncoder( return t.encoder.Init() } -func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) { - vps, sps, pps := rtpH265ExtractVPSSPSPPS(pkt) - update := false - - if vps != nil && !bytes.Equal(vps, t.format.VPS) { - update = true - } - - if sps != nil && !bytes.Equal(sps, t.format.SPS) { - update = true - } - - if pps != nil && !bytes.Equal(pps, t.format.PPS) { - update = true - } +func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(payload []byte) { + vps, sps, pps := rtpH265ExtractVPSSPSPPS(payload) - if update { + if (vps != nil && !bytes.Equal(vps, t.format.VPS)) || + (sps != nil && !bytes.Equal(sps, t.format.SPS)) || + (pps != nil && !bytes.Equal(pps, t.format.PPS)) { if vps == nil { vps = t.format.VPS } @@ -279,7 +268,7 @@ func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl }, } - t.updateTrackParametersFromRTPPacket(pkt) + t.updateTrackParametersFromRTPPacket(pkt.Payload) if t.encoder == nil { // remove padding diff --git a/internal/formatprocessor/mpeg1_video.go b/internal/formatprocessor/mpeg1_video.go new file mode 100644 index 00000000000..851816e7b87 --- /dev/null +++ b/internal/formatprocessor/mpeg1_video.go @@ -0,0 +1,113 @@ +package formatprocessor //nolint:dupl + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1video" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/unit" +) + +type formatProcessorMPEG1Video struct { + udpMaxPayloadSize int + format *format.MPEG1Video + encoder *rtpmpeg1video.Encoder + decoder *rtpmpeg1video.Decoder +} + +func newMPEG1Video( + udpMaxPayloadSize int, + forma *format.MPEG1Video, + generateRTPPackets bool, +) (*formatProcessorMPEG1Video, error) { + t := 
&formatProcessorMPEG1Video{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if generateRTPPackets { + err := t.createEncoder() + if err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *formatProcessorMPEG1Video) createEncoder() error { + t.encoder = &rtpmpeg1video.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + } + return t.encoder.Init() +} + +func (t *formatProcessorMPEG1Video) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG1Video) + + // encode into RTP + pkts, err := t.encoder.Encode(u.Frame) + if err != nil { + return err + } + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + + return nil +} + +func (t *formatProcessorMPEG1Video) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MPEG1Video{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frame, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmpeg1video.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1video.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frame = frame + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/mpeg4_video.go b/internal/formatprocessor/mpeg4_video.go new file mode 100644 index 00000000000..df8f0b74065 --- /dev/null +++ 
b/internal/formatprocessor/mpeg4_video.go @@ -0,0 +1,154 @@ +package formatprocessor //nolint:dupl + +import ( + "bytes" + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4video" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/unit" +) + +type formatProcessorMPEG4Video struct { + udpMaxPayloadSize int + format *format.MPEG4Video + encoder *rtpmpeg4video.Encoder + decoder *rtpmpeg4video.Decoder +} + +func newMPEG4Video( + udpMaxPayloadSize int, + forma *format.MPEG4Video, + generateRTPPackets bool, +) (*formatProcessorMPEG4Video, error) { + t := &formatProcessorMPEG4Video{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if generateRTPPackets { + err := t.createEncoder() + if err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *formatProcessorMPEG4Video) createEncoder() error { + t.encoder = &rtpmpeg4video.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + PayloadType: t.format.PayloadTyp, + } + return t.encoder.Init() +} + +func (t *formatProcessorMPEG4Video) updateTrackParameters(frame []byte) { + if bytes.HasPrefix(frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { + end := bytes.Index(frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + if end < 0 { + return + } + conf := frame[:end+4] + + if !bytes.Equal(conf, t.format.Config) { + t.format.SafeSetParams(conf) + } + } +} + +func (t *formatProcessorMPEG4Video) remuxFrame(frame []byte) []byte { + if bytes.HasPrefix(frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { + end := bytes.Index(frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + if end >= 0 { + frame = frame[end+4:] + } + } + + if bytes.Contains(frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) { + f := make([]byte, len(t.format.Config)+len(frame)) + n := copy(f, 
t.format.Config) + copy(f[n:], frame) + frame = f + } + + return frame +} + +func (t *formatProcessorMPEG4Video) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG4Video) + + t.updateTrackParameters(u.Frame) + u.Frame = t.remuxFrame(u.Frame) + + if len(u.Frame) != 0 { + pkts, err := t.encoder.Encode(u.Frame) + if err != nil { + return err + } + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + } + + return nil +} + +func (t *formatProcessorMPEG4Video) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MPEG4Video{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + t.updateTrackParameters(pkt.Payload) + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frame, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmpeg4video.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frame = t.remuxFrame(frame) + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 6bbfafc65aa..89fe634b6b6 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -39,23 +39,29 @@ func New( generateRTPPackets bool, ) (Processor, error) { switch forma := forma.(type) { - case *format.H264: - return newH264(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.AV1: + return 
newAV1(udpMaxPayloadSize, forma, generateRTPPackets) - case *format.H265: - return newH265(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.VP9: + return newVP9(udpMaxPayloadSize, forma, generateRTPPackets) case *format.VP8: return newVP8(udpMaxPayloadSize, forma, generateRTPPackets) - case *format.VP9: - return newVP9(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.H265: + return newH265(udpMaxPayloadSize, forma, generateRTPPackets) - case *format.AV1: - return newAV1(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.H264: + return newH264(udpMaxPayloadSize, forma, generateRTPPackets) - case *format.MPEG1Audio: - return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.MPEG4Video: + return newMPEG4Video(udpMaxPayloadSize, forma, generateRTPPackets) + + case *format.MPEG1Video: + return newMPEG1Video(udpMaxPayloadSize, forma, generateRTPPackets) + + case *format.Opus: + return newOpus(udpMaxPayloadSize, forma, generateRTPPackets) case *format.MPEG4AudioGeneric: return newMPEG4AudioGeneric(udpMaxPayloadSize, forma, generateRTPPackets) @@ -63,8 +69,8 @@ func New( case *format.MPEG4AudioLATM: return newMPEG4AudioLATM(udpMaxPayloadSize, forma, generateRTPPackets) - case *format.Opus: - return newOpus(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.MPEG1Audio: + return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets) default: return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets) diff --git a/internal/unit/mpeg1_video.go b/internal/unit/mpeg1_video.go new file mode 100644 index 00000000000..a1e11c98b80 --- /dev/null +++ b/internal/unit/mpeg1_video.go @@ -0,0 +1,7 @@ +package unit + +// MPEG1Video is an MPEG-1/2 Video data unit. 
+type MPEG1Video struct { + Base + Frame []byte +} diff --git a/internal/unit/mpeg4_video.go b/internal/unit/mpeg4_video.go new file mode 100644 index 00000000000..9768c25f3f1 --- /dev/null +++ b/internal/unit/mpeg4_video.go @@ -0,0 +1,7 @@ +package unit + +// MPEG4Video is an MPEG-4 Video data unit. +type MPEG4Video struct { + Base + Frame []byte +}