Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MP4 support for HTTP push. #1429

Merged
merged 8 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ If the broadcast is successful, you should be able to access the stream at:

where the "movie" stream name is taken from the path in the RTMP URL.

See the documentation on [RTMP ingest](doc/ingest.md) for more details.
See the documentation on [RTMP ingest](doc/ingest.md) or [HTTP ingest](doc/ingest.md#http-push) for more details.

#### Authentication of incoming RTMP streams
#### Authentication of incoming streams

Incoming RTMP streams can be authenicating using RTMP Authentication Webhook functionality, details is [here](doc/rtmpwebhookauth.md).
Incoming streams can be authenticated using a webhook. More details in the [webhook docs](doc/rtmpwebhookauth.md).


### Streaming
Expand Down
43 changes: 43 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"math/big"
"math/rand"
"mime"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -34,6 +35,10 @@ const priceScalingFactor = int64(1000)
var (
ErrParseBigInt = fmt.Errorf("failed to parse big integer")
ErrProfile = fmt.Errorf("failed to parse profile")

ErrFormatProto = fmt.Errorf("unknown VideoProfile format for protobufs")
ErrFormatMime = fmt.Errorf("unknown VideoProfile format for mime type")
ErrFormatExt = fmt.Errorf("unknown VideoProfile format for extension")
)

func init() {
Expand Down Expand Up @@ -158,12 +163,22 @@ func FFmpegProfiletoNetProfile(ffmpegProfiles []ffmpeg.VideoProfile) ([]*net.Vid
if name == "" {
name = "ffmpeg_" + DefaultProfileName(width, height, bitrate)
}
format := net.VideoProfile_MPEGTS
switch profile.Format {
case ffmpeg.FormatNone:
case ffmpeg.FormatMPEGTS:
case ffmpeg.FormatMP4:
format = net.VideoProfile_MP4
default:
return nil, ErrFormatProto
}
fullProfile := net.VideoProfile{
Name: name,
Width: int32(width),
Height: int32(height),
Bitrate: int32(bitrate),
Fps: uint32(profile.Framerate),
Format: format,
}
profiles = append(profiles, &fullProfile)
}
Expand Down Expand Up @@ -191,6 +206,34 @@ func ProfilesNames(profiles []ffmpeg.VideoProfile) string {
return strings.Join(names, ",")
}

func ProfileExtensionFormat(ext string) ffmpeg.Format {
p, ok := ffmpeg.ExtensionFormats[ext]
if !ok {
return ffmpeg.FormatNone
}
return p
}

func ProfileFormatExtension(f ffmpeg.Format) (string, error) {
ext, ok := ffmpeg.FormatExtensions[f]
if !ok {
return "", ErrFormatExt
}
return ext, nil
}

func ProfileFormatMimeType(f ffmpeg.Format) (string, error) {
ext, err := ProfileFormatExtension(f)
if err != nil {
return "", err
}
m := mime.TypeByExtension(ext)
if m == "" {
return "", ErrFormatMime
}
return m, nil
}

func GetConnectionAddr(ctx context.Context) string {
from := "unknown"
if p, ok := peer.FromContext(ctx); ok {
Expand Down
64 changes: 64 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ func TestFFmpegProfiletoNetProfile(t *testing.T) {
profiles[0].Resolution = ""
fullProfiles, err = FFmpegProfiletoNetProfile(profiles)
assert.Equal(ffmpeg.ErrTranscoderRes, err)
profiles[0].Resolution = "123x456"

// Unset format should be mpegts by default
assert.Equal(profiles[0].Format, ffmpeg.FormatNone)
fullProfiles, err = FFmpegProfiletoNetProfile(profiles)
assert.Nil(err)
assert.Equal(fullProfiles[0].Format, net.VideoProfile_MPEGTS)

profiles[0].Format = ffmpeg.FormatMP4
profiles[1].Format = ffmpeg.FormatMPEGTS
fullProfiles, err = FFmpegProfiletoNetProfile(profiles)
assert.Nil(err)
assert.Equal(fullProfiles[0].Format, net.VideoProfile_MP4)
assert.Equal(fullProfiles[1].Format, net.VideoProfile_MPEGTS)

// Invalid format should return error
profiles[1].Format = -1
fullProfiles, err = FFmpegProfiletoNetProfile(profiles)
assert.Equal(ErrFormatProto, err)
assert.Nil(fullProfiles)
}

func TestProfilesToHex(t *testing.T) {
Expand All @@ -121,6 +141,50 @@ func TestProfilesToHex(t *testing.T) {
compare([]ffmpeg.VideoProfile{ffmpeg.P360p30fps16x9, ffmpeg.P240p30fps16x9})
}

func TestVideoProfile_FormatMimeType(t *testing.T) {
inp := []ffmpeg.Format{ffmpeg.FormatNone, ffmpeg.FormatMPEGTS, ffmpeg.FormatMP4}
exp := []string{"video/mp2t", "video/mp2t", "video/mp4"}
for i, v := range inp {
m, err := ProfileFormatMimeType(v)
m = strings.ToLower(m)
if m != exp[i] || err != nil {
t.Error("Mismatched format; expected ", exp[i], " got ", m)
}
}
if _, err := ProfileFormatMimeType(-1); err != ErrFormatExt {
t.Error("Did not get expected error")
}

// test error with unknown mime type (eg, could be missing from system)
ffmpeg.FormatExtensions[-1] = "invalid"
if _, ok := ffmpeg.FormatExtensions[-1]; !ok {
t.Error("Sanity check failed; did not add extension")
}
if _, err := ProfileFormatMimeType(-1); err != ErrFormatMime {
t.Error("Did not get expected error")
}
delete(ffmpeg.FormatExtensions, -1)
if _, ok := ffmpeg.FormatExtensions[-1]; ok {
t.Error("Sanity check failed; did not clean up extension")
}
}

func TestVideoProfile_FormatExtension(t *testing.T) {
inp := []ffmpeg.Format{ffmpeg.FormatNone, ffmpeg.FormatMPEGTS, ffmpeg.FormatMP4}
exp := []string{".ts", ".ts", ".mp4"}
if len(inp) != len(ffmpeg.FormatExtensions) {
t.Error("Format lengths did not match; missing a new format?")
}
for i, v := range inp {
m, err := ProfileFormatExtension(v)
if m != exp[i] || err != nil {
t.Error("Mismatched format; expected ", exp[i], " got ", m)
}
}
if _, err := ProfileFormatExtension(-1); err != ErrFormatExt {
t.Error("Did not get expected error")
}
}
func TestPriceToFixed(t *testing.T) {
assert := assert.New(t)

Expand Down
7 changes: 6 additions & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func StubJobId() int64 {
return int64(1234)
}

var videoProfiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
var videoProfiles = func() []ffmpeg.VideoProfile {
p := []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
p[0].Format = ffmpeg.FormatMPEGTS
p[1].Format = ffmpeg.FormatMPEGTS
return p
}()

func TestTranscode(t *testing.T) {
//Set up the node
Expand Down
4 changes: 2 additions & 2 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme
// we may still end up doing work multiple times. But this is OK for now.

//Assume d is in the right format, write it to disk
inName := common.RandName() + ".ts"
inName := common.RandName() + ".tempfile"
if _, err := os.Stat(n.WorkDir); os.IsNotExist(err) {
err := os.Mkdir(n.WorkDir, 0700)
if err != nil {
Expand Down Expand Up @@ -498,7 +498,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme
} else {
// Need to store segment in our local OS
var err error
name := fmt.Sprintf("%d.ts", seg.SeqNo)
name := fmt.Sprintf("%d.tempfile", seg.SeqNo)
url, err = config.LocalOS.SaveData(name, seg.Data)
if err != nil {
return terr(err)
Expand Down
2 changes: 1 addition & 1 deletion core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
opts := make([]ffmpeg.TranscodeOptions, len(profiles), len(profiles))
for i := range profiles {
o := ffmpeg.TranscodeOptions{
Oname: fmt.Sprintf("%s/out_%s.ts", workDir, common.RandName()),
Oname: fmt.Sprintf("%s/out_%s.tempfile", workDir, common.RandName()),
Profile: profiles[i],
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Expand Down
47 changes: 39 additions & 8 deletions core/transcoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ func TestLocalTranscoder(t *testing.T) {
tc := NewLocalTranscoder(tmp)
ffmpeg.InitFFmpeg()

profiles := []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
res, err := tc.Transcode("", "test.ts", profiles)
res, err := tc.Transcode("", "test.ts", videoProfiles)
if err != nil {
t.Error("Error transcoding ", err)
}
if len(res.Segments) != len(profiles) {
if len(res.Segments) != len(videoProfiles) {
t.Error("Mismatched results")
}
if Over1Pct(len(res.Segments[0].Data), 164876) {
Expand Down Expand Up @@ -232,7 +231,7 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles)
assert.Equal(1, len(opts))
assert.Equal("foo/out_bar.ts", opts[0].Oname)
assert.Equal("foo/out_bar.tempfile", opts[0].Oname)
assert.Equal(ffmpeg.Software, opts[0].Accel)
assert.Equal(ffmpeg.P144p30fps16x9, opts[0].Profile)
assert.Equal("copy", opts[0].AudioEncoder.Name)
Expand All @@ -243,7 +242,7 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
assert.Equal(2, len(opts))

for i, p := range profiles {
assert.Equal("foo/out_bar.ts", opts[i].Oname)
assert.Equal("foo/out_bar.tempfile", opts[i].Oname)
assert.Equal(ffmpeg.Software, opts[i].Accel)
assert.Equal(p, opts[i].Profile)
assert.Equal("copy", opts[i].AudioEncoder.Name)
Expand All @@ -254,7 +253,7 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
assert.Equal(2, len(opts))

for i, p := range profiles {
assert.Equal("foo/out_bar.ts", opts[i].Oname)
assert.Equal("foo/out_bar.tempfile", opts[i].Oname)
assert.Equal(ffmpeg.Nvidia, opts[i].Accel)
assert.Equal(p, opts[i].Profile)
assert.Equal("copy", opts[i].AudioEncoder.Name)
Expand All @@ -281,11 +280,43 @@ func TestAudioCopy(t *testing.T) {
_, err := ffmpeg.Transcode3(in, out)
assert.Nil(err)

profs := []ffmpeg.VideoProfile{ffmpeg.P720p30fps16x9} // dummy
res, err := tc.Transcode("", audioSample, profs)
res, err := tc.Transcode("", audioSample, videoProfiles)
assert.Nil(err)

o, err := ioutil.ReadFile(audioSample)
assert.Nil(err)
assert.Equal(o, res.Segments[0].Data)
}

func TestTranscoder_Formats(t *testing.T) {
// Helps ensure the necessary ffmpeg configure options are enabled
assert := assert.New(t)
dir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(dir)

in := &ffmpeg.TranscodeOptionsIn{Fname: "test.ts"}
for k, v := range ffmpeg.ExtensionFormats {
assert.NotEqual(ffmpeg.FormatNone, v) // sanity check

p := ffmpeg.P144p30fps16x9 // make a copy bc we mutate the profile
p.Format = v

// use LPMS api directly so we can transcode ; faster
out := []ffmpeg.TranscodeOptions{{
Oname: dir + "/tmp" + k,
Profile: p,
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
}}
_, err := ffmpeg.Transcode3(in, out)
assert.Nil(err)

// check output is reasonable
ofile, err := ioutil.ReadFile(out[0].Oname)
assert.Nil(err)
assert.Greater(len(ofile), 500000) // large enough for "valid output"
// Assume that since the file exists, the actual format is correct
}
// sanity check the base format wasn't overwritten (has happened before!)
assert.Equal(ffmpeg.FormatNone, ffmpeg.P144p30fps16x9.Format)
}
50 changes: 43 additions & 7 deletions doc/ingest.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,33 @@ into the Livepeer network. Upon ingest, HTTP stream is pushed to the segmenter
prior to transcoding. The stream can be pushed via a PUT or POST HTTP request to the
`/live/` endpoint. HTTP request timeout is 8 seconds.

The body of the request should be the binary data of the MPEG TS segment.
The body of the request should be the binary data of the video segment.

Two HTTP headers should be provided:
* `Content-Resolution` - in the format `widthxheight`, for example: `1920x1080`.
* `Content-Duration` - duration of the segment, in milliseconds. Should be an integer.
If Content-Duration is missing, 2000ms is assumed by default.

The upload URL should have this structure:

```
http://broadcasters:8935/live/movie/12.ts
```

Where `movie` is name of the stream and `12` is the sequence number of the segment.

The HLS manifest will be available at
The HLS manifest will be available at:

```
http://broadcasters:8935/stream/movie.m3u8
```

MPEG TS and MP4 are supported as formats. To receive results as MP4, upload the
segment to a path ending with ".mp4" rather than ".ts", such as:

```
http://broadcasters:8935/live/movie/14.mp4
```

Possble statuses returned by HTTP request:
- 500 Internal Server Error - in case there was error during segment's transcode
Expand All @@ -125,20 +139,24 @@ Each part will also contain these headers:
Sample URLs and requests:

```
# Push URL
# Push URL, MPEG TS
http://localhost:8935/live/movie/12.ts

# HLS Playback URL
http://localhost:8935/stream/movie.m3u8

# Curl request
# Curl request, MPEG TS
curl -X PUT -H "Accept: multipart/mixed" -H "Content-Duration: 2000" -H "Content-Resolution: 1920x1080" --data-binary "@bbb0.ts" http://localhost:8935/live/movie/0.ts

# FFMPEG request
# Curl request, MP4
curl -X PUT -H "Accept: multipart/mixed" -H "Content-Duration: 2000" -H "Content-Resolution: 1920x1080" --data-binary "@bbb1.ts" http://localhost:8935/live/movie/1.mp4

# HTTP push via FFmpeg
# (ffmpeg produces 2s segments by default; Content-Duration header will be missing but go-livepeer will presume 2s)
ffmpeg -re -i movie.mp4 -c:a copy -c:v copy -f hls http://localhost:8935/live/movie/
```

Exapmle responses:
Example responses:

```
HTTP/1.1 200 OK
Expand Down Expand Up @@ -166,7 +184,25 @@ Transfer-Encoding: chunked
Content-Disposition: attachment; filename="P240p30fps16x9_10.ts"
Content-Length: 105656
Content-Type: video/MP2T
Rendition-Name: P240p30fps16x9^M
Rendition-Name: P240p30fps16x9

Binary data here

--94eaf473f7957940e066--

```

```
HTTP/1.1 200 OK
Content-Type: multipart/mixed; boundary=94eaf473f7957940e066
Date: Fri, 31 Jan 2020 00:04:40 GMT
Transfer-Encoding: chunked

--94eaf473f7957940e066
Content-Disposition: attachment; filename="P240p30fps16x9_10.mp4"
Content-Length: 105656
Content-Type: video/mp4
Rendition-Name: P240p30fps16x9

Binary data here

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/influxdata/influxdb v1.7.8 // indirect
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356 // indirect
github.com/livepeer/lpms v0.0.0-20200110164555-e34a4737b857
github.com/livepeer/lpms v0.0.0-20200406200332-99fb05c90e86
github.com/livepeer/m3u8 v0.11.0
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-sqlite3 v1.11.0
Expand Down
Loading