Skip to content

Commit

Permalink
Implement static preamble (#43)
Browse files Browse the repository at this point in the history
* Add script to generate silent, black videos

* Implement static preamble from file

* Fix json tag

* Fix logs

* Shorten preamble generator to 1s

* Document preamble option in config example
  • Loading branch information
srgoni authored Jun 29, 2023
1 parent 8064c35 commit 6a701ca
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 6 deletions.
1 change: 1 addition & 0 deletions cmd/restreamer/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
errorMainMissingNotificationUser = "missing_notification_user"
errorMainMissingStreamUser = "missing_stream_user"
errorMainInvalidAuthentication = "invalid_authentication"
errorMainPreambleRead = "preamble_read"
)

var logger = util.NewGlobalModuleLogger(moduleMain, nil)
21 changes: 21 additions & 0 deletions cmd/restreamer/restreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onitake/restreamer/metrics"
"github.com/onitake/restreamer/streaming"
"github.com/onitake/restreamer/util"
"io"
"log"
"math/rand"
"net/http"
Expand Down Expand Up @@ -162,6 +163,26 @@ func main() {
streamer.SetCollector(reg)
streamer.SetNotifier(queue)

if streamdef.Preamble != "" {
prein, err := os.Open(streamdef.Preamble)
if err != nil {
logger.Logkv(
"event", eventMainError,
"error", errorMainPreambleRead,
"message", fmt.Sprintf("Cannot open preamble file: %v", err),
)
}
preamble, err := io.ReadAll(prein)
if err != nil {
logger.Logkv(
"event", eventMainError,
"error", errorMainPreambleRead,
"message", fmt.Sprintf("Cannot read preamble file: %v", err),
)
}
streamer.SetPreamble(preamble)
}

// shuffle the list here, not later
// should give a bit more randomness
remotes := util.ShuffleStrings(rnd, streamdef.Remotes)
Expand Down
6 changes: 6 additions & 0 deletions configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type Resource struct {
// Mru (maximum receive unit) is the size of the datagram receive buffer.
// Only used for UDP and RTP protocols.
Mru uint `json:"mru"`
// Preamble specifies the name of a file containing a static preamble, that is sent to each client before
// actual data is streamed. It can be used to synchronize the decoder quickly, instead of needing to wait for
// the next PAT, PMT, SPS and PPS packets.
// Make sure that the format of the preamble content matches the stream, or you will end up with badly
// configured decoder!
Preamble string `json:"preamble"`
}

// UserCredentials is a set of credentials for a single user
Expand Down
5 changes: 3 additions & 2 deletions event/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ func (reporter *Queue) handleHeartbeat(when time.Time) {
func (reporter *Queue) handleConnect(connected int) {
logger.Logkv(
"event", queueEventConnect,
"message", fmt.Sprintf("Number of connections changed by %d, new number of connections: %d", connected, reporter.connections),
"message", fmt.Sprintf("Number of connections changed by %d, current number %d, new number %d", connected, reporter.connections, reporter.connections+connected),
"connected", connected,
"connections", reporter.connections,
"current_connections", reporter.connections,
"new_connections", reporter.connections+connected,
)
// calculate the new connection count
var newconn int
Expand Down
4 changes: 4 additions & 0 deletions examples/documented/restreamer.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
"": "Maximum receive unit, the packet size for datagram sockets (UDP).",
"": "This value is important, because individual datagrams can only be received as a whole. Excess data is discarded.",
"mru": 1500,
"": "Specify a file name to a static preamble that will be sent to each newly connected client.",
"": "This can help when a decoder isn't capable of initializing in the middle of a transmission,",
"": "but it can also make things much worse. You have been warned.",
"preamble": "preamble.ts",
"": "Access control for this resource. If not present, no authentication is necessary.",
"": "Otherwise, an authentication token that matches one of the users is required.",
"authentication": {
Expand Down
9 changes: 9 additions & 0 deletions genpreamble.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/sh

ffmpeg \
-use_wallclock_as_timestamps 1 \
-t 1 -r 50 -f lavfi -i color=c=black:s=1280x720 \
-t 1 -f lavfi -i anullsrc=r=48000:cl=stereo \
-pix_fmt:v yuv420p -color_range:v tv -color_primaries:v bt709 -colorspace bt709 -color_trc bt709 -profile:v high -b:v 3500000 -c:v libx264 \
-b:a 128000 -c:a aac \
-f mpegts -y out.ts
22 changes: 19 additions & 3 deletions streaming/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package streaming

import (
"context"
"fmt"
"github.com/onitake/restreamer/protocol"
"net/http"
"time"
Expand Down Expand Up @@ -57,7 +58,9 @@ func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string
}

// Serve starts serving data to a client, continuously feeding packets from the queue.
func (conn *Connection) Serve() {
// An optional preamble buffer can be passed that will be sent before streaming the live payload
// (but after the HTTP response headers).
func (conn *Connection) Serve(preamble []byte) {
// set the content type (important)
conn.writer.Header().Set("Content-Type", "video/mpeg")
// a stream is always current
Expand Down Expand Up @@ -85,8 +88,21 @@ func (conn *Connection) Serve() {
"message", "Sent header",
)

// start reading packets
running := true

// send the preamble
if len(preamble) > 0 {
_, err := conn.writer.Write(preamble)
if err != nil {
logger.Logkv(
"event", eventConnectionClosed,
"message", "Downstream connection closed during preamble",
)
running = false
}
}

// start reading packets
for running {
select {
case packet, ok := <-conn.Queue:
Expand Down Expand Up @@ -120,7 +136,7 @@ func (conn *Connection) Serve() {
logger.Logkv(
"event", eventConnectionClosedWait,
"message", "Downstream connection closed (while waiting)",
"error", conn.context.Err(),
"error", fmt.Sprintf("%v", conn.context.Err()),
)
running = false
}
Expand Down
8 changes: 7 additions & 1 deletion streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Streamer struct {
auth auth.Authenticator
// promCounter allows enabling/disabling Prometheus packet metrics.
promCounter bool
// preamble contains a static preamble that is sent before the actual streamed data
preamble []byte
}

// ConnectionBroker represents a policy handler for new connections.
Expand Down Expand Up @@ -205,6 +207,10 @@ func (streamer *Streamer) SetNotifier(events event.Notifiable) {
streamer.events = events
}

func (streamer *Streamer) SetPreamble(preamble []byte) {
streamer.preamble = preamble
}

func (streamer *Streamer) SetInhibit(inhibit bool) {
if inhibit {
streamer.request <- &ConnectionRequest{
Expand Down Expand Up @@ -446,7 +452,7 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
)

start := time.Now()
conn.Serve()
conn.Serve(streamer.preamble)
duration := time.Since(start)

// done, remove the stale connection
Expand Down

0 comments on commit 6a701ca

Please sign in to comment.