Skip to content

Commit

Permalink
fixup! WIP: server: Take MP4s as input for HTTP push
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh committed Mar 27, 2020
1 parent 0300639 commit 9782da1
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 4 deletions.
7 changes: 4 additions & 3 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ func createRTMPStreamIDHandler(s *LivepeerServer) func(url *url.URL) (strmID str
key = common.RandomIDGenerator(StreamKeyBytes)
}
return &streamParameters{
mid: mid,
rtmpKey: key,
profiles: profiles,
mid: mid,
rtmpKey: key,
// HTTP push mutates `profiles` so make a copy of it
profiles: append([]ffmpeg.VideoProfile(nil), profiles...),
}
}
}
Expand Down
234 changes: 233 additions & 1 deletion server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/lpms/ffmpeg"
"github.com/livepeer/lpms/vidplayer"
)

func requestSetup(s *LivepeerServer) (http.Handler, *strings.Reader, *httptest.ResponseRecorder) {
Expand Down Expand Up @@ -246,6 +247,237 @@ func TestShouldUpdateLastUsed(t *testing.T) {
assert.True(lu.Before(s.rtmpConnections["mani1"].lastUsed))
}

func TestPush_MP4(t *testing.T) {

// Do a bunch of setup. Would be nice to simplify this one day...
assert := assert.New(t)
s := setupServer()
defer serverCleanup(s)
s.rtmpConnections = map[core.ManifestID]*rtmpConnection{}
defer func() { s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} }()
segHandler := getHLSSegmentHandler(s)
ts, mux := stubTLSServer()
defer ts.Close()

// sometimes LivepeerServer needs time to start
// esp if this is the only test in the suite being run (eg, via `-run)
time.Sleep(10 * time.Millisecond)

oldProfs := BroadcastJobVideoProfiles
defer func() { BroadcastJobVideoProfiles = oldProfs }()
BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P720p25fps16x9}

sd := &stubDiscovery{}
sd.infos = []*net.OrchestratorInfo{&net.OrchestratorInfo{Transcoder: ts.URL}}
s.LivepeerNode.OrchestratorPool = sd

dummyRes := func(tSegData []*net.TranscodedSegmentData) *net.TranscodeResult {
return &net.TranscodeResult{
Result: &net.TranscodeResult_Data{
Data: &net.TranscodeData{
Segments: tSegData,
},
},
}
}
segPath := "/random"
tSegData := []*net.TranscodedSegmentData{
&net.TranscodedSegmentData{Url: ts.URL + segPath, Pixels: 100},
}
tr := dummyRes(tSegData)
buf, err := proto.Marshal(tr)
require.Nil(t, err)

mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(buf)
})
mux.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("transcoded binary data"))
})

// Check default response: should be empty, with OS populated
handler, reader, writer := requestSetup(s)
reader = strings.NewReader("a video file goes here")
req := httptest.NewRequest("POST", "/live/name/1.mp4", reader)
handler.ServeHTTP(writer, req)
resp := writer.Result()
defer resp.Body.Close()
assert.Equal(200, resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
assert.Nil(err)
assert.Empty(body)
// Check OS for source
vpath, err := url.Parse("/stream/name/source/1.mp4")
assert.Nil(err)
body, err = segHandler(vpath)
assert.Nil(err)
assert.Equal("a video file goes here", string(body))
// Check OS for transcoded rendition
vpath, err = url.Parse("/stream/name/P720p25fps16x9/1.mp4")
assert.Nil(err)
body, err = segHandler(vpath)
assert.Nil(err)
assert.Equal("transcoded binary data", string(body))
// Sanity check version with mpegts extension doesn't exist
vpath, err = url.Parse("/stream/name/source/1.ts")
assert.Nil(err)
body, err = segHandler(vpath)
assert.Equal(vidplayer.ErrNotFound, err)
assert.Empty(body)
vpath, err = url.Parse("/stream/name/P720p25fps16x9/1.ts")
assert.Nil(err)
body, err = segHandler(vpath)
assert.Equal(vidplayer.ErrNotFound, err)
assert.Empty(body)
// We can't actually test the returned content type here.
// (That is handled within LPMS, so assume it's fine from here.)

// Check multipart response for MP4s
reader = strings.NewReader("a new video goes here")
writer = httptest.NewRecorder()
req = httptest.NewRequest("POST", "/live/name/2.mp4", reader)
req.Header.Set("Accept", "multipart/mixed")
handler.ServeHTTP(writer, req)
resp = writer.Result()
assert.Equal(200, resp.StatusCode)
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
assert.Equal("multipart/mixed", mediaType)
assert.Nil(err)
mr := multipart.NewReader(resp.Body, params["boundary"])
i := 0
for {
p, err := mr.NextPart()
if err == io.EOF {
break
}
assert.NoError(err)
mediaType, _, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
assert.Nil(err)
assert.Equal(`attachment; filename="P720p25fps16x9_2.mp4"`, p.Header.Get("Content-Disposition"))
assert.Equal("P720p25fps16x9", p.Header.Get("Rendition-Name"))
bodyPart, err := ioutil.ReadAll(p)
assert.Nil(err)
assert.Equal("video/mp4", mediaType)
assert.Equal("transcoded binary data", string(bodyPart))

i++
}
assert.Equal(1, i)

// Check formats
for _, cxn := range s.rtmpConnections {
assert.Equal(ffmpeg.MP4, cxn.profile.Format)
for _, p := range cxn.params.profiles {
assert.Equal(ffmpeg.MP4, p.Format)
}
}
}

func TestPush_SetVideoProfileFormats(t *testing.T) {
assert := assert.New(t)
s := setupServer()
defer serverCleanup(s)
// sometimes LivepeerServer needs time to start
// esp if this is the only test in the suite being run (eg, via `-run)
time.Sleep(10 * time.Millisecond)
s.rtmpConnections = map[core.ManifestID]*rtmpConnection{}
defer func() { s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} }()

oldProfs := BroadcastJobVideoProfiles
defer func() { BroadcastJobVideoProfiles = oldProfs }()
BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P720p25fps16x9, ffmpeg.P720p60fps16x9}

// Base case, mpegts
h, r, w := requestSetup(s)
req := httptest.NewRequest("POST", "/live/seg/0.ts", r)
h.ServeHTTP(w, req)
resp := w.Result()
defer resp.Body.Close()

assert.Len(s.rtmpConnections, 1)
for _, cxn := range s.rtmpConnections {
assert.Equal(ffmpeg.MPEGTS, cxn.profile.Format)
assert.Len(cxn.params.profiles, 2)
assert.Len(BroadcastJobVideoProfiles, 2)
for i, p := range cxn.params.profiles {
assert.Equal(ffmpeg.MPEGTS, p.Format)
// HTTP push mutates the profiles, causing undesirable changes to
// the default set of broadcast profiles that persist to subsequent
// streams. Make sure this doesn't happen!
assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format)
}
}

// Sending a MP4 under the same stream name doesn't change assigned profiles
h, r, w = requestSetup(s)
req = httptest.NewRequest("POST", "/live/seg/1.ts", r)
h.ServeHTTP(w, req)
resp = w.Result()
defer resp.Body.Close()

assert.Len(s.rtmpConnections, 1)
for _, cxn := range s.rtmpConnections {
assert.Equal(ffmpeg.MPEGTS, cxn.profile.Format)
assert.Len(cxn.params.profiles, 2)
assert.Len(BroadcastJobVideoProfiles, 2)
for i, p := range cxn.params.profiles {
assert.Equal(ffmpeg.MPEGTS, p.Format)
assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format)
}
}

// Sending a MP4 under a new stream name sets the profile correctly
h, r, w = requestSetup(s)
req = httptest.NewRequest("POST", "/live/new/0.mp4", r)
h.ServeHTTP(w, req)
resp = w.Result()
defer resp.Body.Close()

assert.Len(s.rtmpConnections, 2)
cxn, ok := s.rtmpConnections["new"]
assert.True(ok, "stream did not exist")
assert.Equal(ffmpeg.MP4, cxn.profile.Format)
assert.Len(cxn.params.profiles, 2)
assert.Len(BroadcastJobVideoProfiles, 2)
for i, p := range cxn.params.profiles {
assert.Equal(ffmpeg.MP4, p.Format)
assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format)
}

// Sanity check that default profile with webhook is copied
// Checking since there is special handling for the default set of profiles
// within the webhook hander.
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
auth := authWebhookResponse{ManifestID: "web"}
val, err := json.Marshal(auth)
assert.Nil(err, "invalid auth webhook response")
w.Write(val)
}))
defer ts.Close()
oldURL := AuthWebhookURL
defer func() { AuthWebhookURL = oldURL }()
AuthWebhookURL = ts.URL

h, r, w = requestSetup(s)
req = httptest.NewRequest("POST", "/live/web/0.mp4", r)
h.ServeHTTP(w, req)
resp = w.Result()
defer resp.Body.Close()

assert.Len(s.rtmpConnections, 3)
cxn, ok = s.rtmpConnections["web"]
assert.True(ok, "stream did not exist")
assert.Equal(ffmpeg.MP4, cxn.profile.Format)
assert.Len(cxn.params.profiles, 2)
assert.Len(BroadcastJobVideoProfiles, 2)
for i, p := range cxn.params.profiles {
assert.Equal(ffmpeg.MP4, p.Format)
assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format)
}
}

func ignoreRoutines() []goleak.Option {
// goleak works by making list of all running goroutines and reporting error if it finds any
// this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines
Expand All @@ -255,7 +487,7 @@ func ignoreRoutines() []goleak.Option {
"internal/poll.runtime_pollWait", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage", "github.com/livepeer/lpms/core.(*LPMS).Start",
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1",
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch",
"github.com/rjeczalik/notify.(*nonrecursiveTree).internal"}
"github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1"}

res := make([]goleak.Option, 0, len(funcs2ignore))
for _, f := range funcs2ignore {
Expand Down

0 comments on commit 9782da1

Please sign in to comment.