From 9782da1ac8894ab0e82863343ac952cb0c635158 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 26 Mar 2020 22:47:27 -0700 Subject: [PATCH] fixup! WIP: server: Take MP4s as input for HTTP push --- server/mediaserver.go | 7 +- server/push_test.go | 234 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 237 insertions(+), 4 deletions(-) diff --git a/server/mediaserver.go b/server/mediaserver.go index 5193c09144..edcda2dfeb 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -251,9 +251,10 @@ func createRTMPStreamIDHandler(s *LivepeerServer) func(url *url.URL) (strmID str key = common.RandomIDGenerator(StreamKeyBytes) } return &streamParameters{ - mid: mid, - rtmpKey: key, - profiles: profiles, + mid: mid, + rtmpKey: key, + // HTTP push mutates `profiles` so make a copy of it + profiles: append([]ffmpeg.VideoProfile(nil), profiles...), } } } diff --git a/server/push_test.go b/server/push_test.go index 9b208ef55e..ae24a9d06d 100644 --- a/server/push_test.go +++ b/server/push_test.go @@ -24,6 +24,7 @@ import ( "github.com/livepeer/go-livepeer/drivers" "github.com/livepeer/go-livepeer/net" "github.com/livepeer/lpms/ffmpeg" + "github.com/livepeer/lpms/vidplayer" ) func requestSetup(s *LivepeerServer) (http.Handler, *strings.Reader, *httptest.ResponseRecorder) { @@ -246,6 +247,237 @@ func TestShouldUpdateLastUsed(t *testing.T) { assert.True(lu.Before(s.rtmpConnections["mani1"].lastUsed)) } +func TestPush_MP4(t *testing.T) { + + // Do a bunch of setup. Would be nice to simplify this one day... + assert := assert.New(t) + s := setupServer() + defer serverCleanup(s) + s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} + defer func() { s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} }() + segHandler := getHLSSegmentHandler(s) + ts, mux := stubTLSServer() + defer ts.Close() + + // sometimes LivepeerServer needs time to start + // esp if this is the only test in the suite being run (eg, via `-run) + time.Sleep(10 * time.Millisecond) + + oldProfs := BroadcastJobVideoProfiles + defer func() { BroadcastJobVideoProfiles = oldProfs }() + BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P720p25fps16x9} + + sd := &stubDiscovery{} + sd.infos = []*net.OrchestratorInfo{&net.OrchestratorInfo{Transcoder: ts.URL}} + s.LivepeerNode.OrchestratorPool = sd + + dummyRes := func(tSegData []*net.TranscodedSegmentData) *net.TranscodeResult { + return &net.TranscodeResult{ + Result: &net.TranscodeResult_Data{ + Data: &net.TranscodeData{ + Segments: tSegData, + }, + }, + } + } + segPath := "/random" + tSegData := []*net.TranscodedSegmentData{ + &net.TranscodedSegmentData{Url: ts.URL + segPath, Pixels: 100}, + } + tr := dummyRes(tSegData) + buf, err := proto.Marshal(tr) + require.Nil(t, err) + + mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write(buf) + }) + mux.HandleFunc(segPath, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("transcoded binary data")) + }) + + // Check default response: should be empty, with OS populated + handler, reader, writer := requestSetup(s) + reader = strings.NewReader("a video file goes here") + req := httptest.NewRequest("POST", "/live/name/1.mp4", reader) + handler.ServeHTTP(writer, req) + resp := writer.Result() + defer resp.Body.Close() + assert.Equal(200, resp.StatusCode) + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(err) + assert.Empty(body) + // Check OS for source + vpath, err := url.Parse("/stream/name/source/1.mp4") + assert.Nil(err) + body, err = segHandler(vpath) + assert.Nil(err) + assert.Equal("a video file goes here", string(body)) + // Check OS for transcoded rendition + vpath, err = url.Parse("/stream/name/P720p25fps16x9/1.mp4") + assert.Nil(err) + body, err = segHandler(vpath) + assert.Nil(err) + assert.Equal("transcoded binary data", string(body)) + // Sanity check version with mpegts extension doesn't exist + vpath, err = url.Parse("/stream/name/source/1.ts") + assert.Nil(err) + body, err = segHandler(vpath) + assert.Equal(vidplayer.ErrNotFound, err) + assert.Empty(body) + vpath, err = url.Parse("/stream/name/P720p25fps16x9/1.ts") + assert.Nil(err) + body, err = segHandler(vpath) + assert.Equal(vidplayer.ErrNotFound, err) + assert.Empty(body) + // We can't actually test the returned content type here. + // (That is handled within LPMS, so assume it's fine from here.) + + // Check multipart response for MP4s + reader = strings.NewReader("a new video goes here") + writer = httptest.NewRecorder() + req = httptest.NewRequest("POST", "/live/name/2.mp4", reader) + req.Header.Set("Accept", "multipart/mixed") + handler.ServeHTTP(writer, req) + resp = writer.Result() + assert.Equal(200, resp.StatusCode) + mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + assert.Equal("multipart/mixed", mediaType) + assert.Nil(err) + mr := multipart.NewReader(resp.Body, params["boundary"]) + i := 0 + for { + p, err := mr.NextPart() + if err == io.EOF { + break + } + assert.NoError(err) + mediaType, _, err := mime.ParseMediaType(p.Header.Get("Content-Type")) + assert.Nil(err) + assert.Equal(`attachment; filename="P720p25fps16x9_2.mp4"`, p.Header.Get("Content-Disposition")) + assert.Equal("P720p25fps16x9", p.Header.Get("Rendition-Name")) + bodyPart, err := ioutil.ReadAll(p) + assert.Nil(err) + assert.Equal("video/mp4", mediaType) + assert.Equal("transcoded binary data", string(bodyPart)) + + i++ + } + assert.Equal(1, i) + + // Check formats + for _, cxn := range s.rtmpConnections { + assert.Equal(ffmpeg.MP4, cxn.profile.Format) + for _, p := range cxn.params.profiles { + assert.Equal(ffmpeg.MP4, p.Format) + } + } +} + +func TestPush_SetVideoProfileFormats(t *testing.T) { + assert := assert.New(t) + s := setupServer() + defer serverCleanup(s) + // sometimes LivepeerServer needs time to start + // esp if this is the only test in the suite being run (eg, via `-run) + time.Sleep(10 * time.Millisecond) + s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} + defer func() { s.rtmpConnections = map[core.ManifestID]*rtmpConnection{} }() + + oldProfs := BroadcastJobVideoProfiles + defer func() { BroadcastJobVideoProfiles = oldProfs }() + BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ffmpeg.P720p25fps16x9, ffmpeg.P720p60fps16x9} + + // Base case, mpegts + h, r, w := requestSetup(s) + req := httptest.NewRequest("POST", "/live/seg/0.ts", r) + h.ServeHTTP(w, req) + resp := w.Result() + defer resp.Body.Close() + + assert.Len(s.rtmpConnections, 1) + for _, cxn := range s.rtmpConnections { + assert.Equal(ffmpeg.MPEGTS, cxn.profile.Format) + assert.Len(cxn.params.profiles, 2) + assert.Len(BroadcastJobVideoProfiles, 2) + for i, p := range cxn.params.profiles { + assert.Equal(ffmpeg.MPEGTS, p.Format) + // HTTP push mutates the profiles, causing undesirable changes to + // the default set of broadcast profiles that persist to subsequent + // streams. Make sure this doesn't happen! + assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format) + } + } + + // Sending a MP4 under the same stream name doesn't change assigned profiles + h, r, w = requestSetup(s) + req = httptest.NewRequest("POST", "/live/seg/1.ts", r) + h.ServeHTTP(w, req) + resp = w.Result() + defer resp.Body.Close() + + assert.Len(s.rtmpConnections, 1) + for _, cxn := range s.rtmpConnections { + assert.Equal(ffmpeg.MPEGTS, cxn.profile.Format) + assert.Len(cxn.params.profiles, 2) + assert.Len(BroadcastJobVideoProfiles, 2) + for i, p := range cxn.params.profiles { + assert.Equal(ffmpeg.MPEGTS, p.Format) + assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format) + } + } + + // Sending a MP4 under a new stream name sets the profile correctly + h, r, w = requestSetup(s) + req = httptest.NewRequest("POST", "/live/new/0.mp4", r) + h.ServeHTTP(w, req) + resp = w.Result() + defer resp.Body.Close() + + assert.Len(s.rtmpConnections, 2) + cxn, ok := s.rtmpConnections["new"] + assert.True(ok, "stream did not exist") + assert.Equal(ffmpeg.MP4, cxn.profile.Format) + assert.Len(cxn.params.profiles, 2) + assert.Len(BroadcastJobVideoProfiles, 2) + for i, p := range cxn.params.profiles { + assert.Equal(ffmpeg.MP4, p.Format) + assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format) + } + + // Sanity check that default profile with webhook is copied + // Checking since there is special handling for the default set of profiles + // within the webhook hander. + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + auth := authWebhookResponse{ManifestID: "web"} + val, err := json.Marshal(auth) + assert.Nil(err, "invalid auth webhook response") + w.Write(val) + })) + defer ts.Close() + oldURL := AuthWebhookURL + defer func() { AuthWebhookURL = oldURL }() + AuthWebhookURL = ts.URL + + h, r, w = requestSetup(s) + req = httptest.NewRequest("POST", "/live/web/0.mp4", r) + h.ServeHTTP(w, req) + resp = w.Result() + defer resp.Body.Close() + + assert.Len(s.rtmpConnections, 3) + cxn, ok = s.rtmpConnections["web"] + assert.True(ok, "stream did not exist") + assert.Equal(ffmpeg.MP4, cxn.profile.Format) + assert.Len(cxn.params.profiles, 2) + assert.Len(BroadcastJobVideoProfiles, 2) + for i, p := range cxn.params.profiles { + assert.Equal(ffmpeg.MP4, p.Format) + assert.Equal(ffmpeg.FormatNone, BroadcastJobVideoProfiles[i].Format) + } +} + func ignoreRoutines() []goleak.Option { // goleak works by making list of all running goroutines and reporting error if it finds any // this list tells goleak to ignore these goroutines - we're not interested in these particular goroutines @@ -255,7 +487,7 @@ func ignoreRoutines() []goleak.Option { "internal/poll.runtime_pollWait", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage", "github.com/livepeer/lpms/core.(*LPMS).Start", "github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1", "github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch", - "github.com/rjeczalik/notify.(*nonrecursiveTree).internal"} + "github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1"} res := make([]goleak.Option, 0, len(funcs2ignore)) for _, f := range funcs2ignore {