Skip to content

Commit

Permalink
implement native recording (#1399)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 27, 2023
1 parent 8d2e9e9 commit 3ef750f
Show file tree
Hide file tree
Showing 34 changed files with 1,464 additions and 462 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ And can be read from the server with:
* Read live streams from the server
* Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS
* Serve multiple streams at once in separate paths
* Record streams to disk
* Authenticate users; use internal or external authentication
* Redirect readers to other RTSP servers (load balancing)
* Query and control the server through the API
Expand Down Expand Up @@ -106,7 +107,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Authentication](#authentication)
* [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Save streams to disk](#save-streams-to-disk)
* [Record streams to disk](#record-streams-to-disk)
* [Forward streams to another server](#forward-streams-to-another-server)
* [On-demand publishing](#on-demand-publishing)
* [Start on boot](#start-on-boot)
Expand Down Expand Up @@ -1136,21 +1137,20 @@ paths:
runOnReadyRestart: yes
```
### Save streams to disk
### Record streams to disk
To save available streams to disk, use _FFmpeg_ inside the `runOnReady` parameter:
To save available streams to disk, set the `record` and the `recordPath` parameter in the configuration file:
```yml
paths:
all:
runOnReady: >
ffmpeg -i rtsp://localhost:$RTSP_PORT/$MTX_PATH
-c copy
-f segment -strftime 1 -segment_time 60 -segment_format mpegts saved_%Y-%m-%d_%H-%M-%S.ts
runOnReadyRestart: yes
# Record streams to disk.
record: yes
# Path of recording segments.
# Extension is added automatically.
# Available variables are %path (path name), %Y %m %d %H %M %S (time in strftime format)
recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S
```
In the configuration above, streams are saved in MPEG-TS format, that is resilient to system crashes.
All available recording parameters are listed in the [sample configuration file](/mediamtx.yml).
### Forward streams to another server
Expand Down
35 changes: 24 additions & 11 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ components:
Conf:
type: object
properties:
# general
# General
logLevel:
type: string
logDestinations:
Expand Down Expand Up @@ -169,13 +169,27 @@ components:
webrtcICETCPMuxAddress:
type: string

# srt
# SRT
srt:
type: boolean
srtAddress:
type: string

# paths
# Record
record:
type: boolean
recordPath:
type: string
recordFormat:
type: string
recordPartDuration:
type: string
recordSegmentDuration:
type: string
recordDeleteAfter:
type: string

# Paths
paths:
type: object
additionalProperties:
Expand All @@ -184,10 +198,9 @@ components:
PathConf:
type: object
properties:
# General
source:
type: string

# general
sourceFingerprint:
type: string
sourceOnDemand:
Expand All @@ -199,7 +212,7 @@ components:
maxReaders:
type: number

# authentication
# Authentication
publishUser:
type: string
publishPass:
Expand All @@ -217,13 +230,13 @@ components:
items:
type: string

# publisher
# Publisher
overridePublisher:
type: boolean
fallback:
type: string

# rtsp
# RTSP
sourceProtocol:
type: string
sourceAnyPortEnable:
Expand All @@ -233,11 +246,11 @@ components:
rtspRangeStart:
type: string

# redirect
# Redirect
sourceRedirect:
type: string

# raspberry pi camera
# Raspberry Pi Camera
rpiCameraCamID:
type: integer
rpiCameraWidth:
Expand Down Expand Up @@ -303,7 +316,7 @@ components:
rpiCameraTextOverlay:
type: string

# external commands
# External commands
runOnInit:
type: string
runOnInitRestart:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package core
// Package asyncwriter contains an asynchronous writer.
package asyncwriter

import (
"fmt"
Expand All @@ -8,45 +9,50 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)

type asyncWriter struct {
// Writer is an asynchronous writer.
type Writer struct {
writeErrLogger logger.Writer
buffer *ringbuffer.RingBuffer

// out
err chan error
}

func newAsyncWriter(
// New allocates a Writer.
func New(
queueSize int,
parent logger.Writer,
) *asyncWriter {
) *Writer {
buffer, _ := ringbuffer.New(uint64(queueSize))

return &asyncWriter{
writeErrLogger: newLimitedLogger(parent),
return &Writer{
writeErrLogger: logger.NewLimitedLogger(parent),
buffer: buffer,
err: make(chan error),
}
}

func (w *asyncWriter) start() {
// Start starts the writer routine.
func (w *Writer) Start() {
go w.run()
}

func (w *asyncWriter) stop() {
// Stop stops the writer routine.
func (w *Writer) Stop() {
w.buffer.Close()
<-w.err
}

func (w *asyncWriter) error() chan error {
// Error returns whenever there's an error.
func (w *Writer) Error() chan error {
return w.err
}

func (w *asyncWriter) run() {
func (w *Writer) run() {
w.err <- w.runInner()
}

func (w *asyncWriter) runInner() error {
func (w *Writer) runInner() error {
for {
cb, ok := w.buffer.Pull()
if !ok {
Expand All @@ -60,7 +66,8 @@ func (w *asyncWriter) runInner() error {
}
}

func (w *asyncWriter) push(cb func() error) {
// Push appends an element to the queue.
func (w *Writer) Push(cb func() error) {
ok := w.buffer.Push(cb)
if !ok {
w.writeErrLogger.Log(logger.Warn, "write queue is full")
Expand Down
28 changes: 27 additions & 1 deletion internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ type Conf struct {
SRT bool `json:"srt"`
SRTAddress string `json:"srtAddress"`

// Record
Record bool `json:"record"`
RecordPath string `json:"recordPath"`
RecordFormat string `json:"recordFormat"`
RecordPartDuration StringDuration `json:"recordPartDuration"`
RecordSegmentDuration StringDuration `json:"recordSegmentDuration"`
RecordDeleteAfter StringDuration `json:"recordDeleteAfter"`

// Paths
Paths map[string]*PathConf `json:"paths"`
}
Expand Down Expand Up @@ -218,7 +226,8 @@ func (conf Conf) Clone() *Conf {

// Check checks the configuration for errors.
func (conf *Conf) Check() error {
// general
// General

if conf.ReadBufferCount != 0 {
conf.WriteQueueSize = conf.ReadBufferCount
}
Expand All @@ -240,6 +249,7 @@ func (conf *Conf) Check() error {
}

// RTSP

if conf.RTSPDisable {
conf.RTSP = false
}
Expand All @@ -253,16 +263,19 @@ func (conf *Conf) Check() error {
}

// RTMP

if conf.RTMPDisable {
conf.RTMP = false
}

// HLS

if conf.HLSDisable {
conf.HLS = false
}

// WebRTC

if conf.WebRTCDisable {
conf.WebRTC = false
}
Expand All @@ -289,6 +302,12 @@ func (conf *Conf) Check() error {
}
}

// Record

if conf.RecordFormat != "fmp4" {
return fmt.Errorf("unsupported record format '%s'", conf.RecordFormat)
}

Check warning on line 309 in internal/conf/conf.go

View check run for this annotation

Codecov / codecov/patch

internal/conf/conf.go#L308-L309

Added lines #L308 - L309 were not covered by tests

// do not add automatically "all", since user may want to
// initialize all paths through API or hot reloading.
if conf.Paths == nil {
Expand Down Expand Up @@ -374,6 +393,13 @@ func (conf *Conf) UnmarshalJSON(b []byte) error {
conf.SRT = true
conf.SRTAddress = ":8890"

// Record
conf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S"
conf.RecordFormat = "fmp4"
conf.RecordPartDuration = 100 * StringDuration(time.Millisecond)
conf.RecordSegmentDuration = 3600 * StringDuration(time.Second)
conf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second)

type alias Conf
d := json.NewDecoder(bytes.NewReader(b))
d.DisallowUnknownFields()
Expand Down
Loading

0 comments on commit 3ef750f

Please sign in to comment.