Skip to content

Commit

Permalink
core, server: Use net.SegData in the O-T protocol.
Browse files Browse the repository at this point in the history
Enables better reuse. Transcoder now has similar compatibility
assurances as B / O without having to re-implement anything.
  • Loading branch information
j0sh committed May 15, 2020
1 parent 7afc713 commit 0e1902b
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 101 deletions.
15 changes: 10 additions & 5 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,16 +701,21 @@ func (rt *RemoteTranscoder) Transcode(job string, fname string, profiles []ffmpe
return nil, RemoteTranscoderFatalError{err}
}

fullProfiles, err := common.FFmpegProfiletoNetProfile(profiles)
md := &SegTranscodingMetadata{
ManifestID: ManifestID(job),
Profiles: profiles,
}
segData, err := NetSegData(md)
if err != nil {
return nil, err
}

msg := &net.NotifySegment{
Job: job,
Url: fname,
TaskId: taskID,
FullProfiles: fullProfiles,
Url: fname,
TaskId: taskID,
SegData: segData,
// Triggers failure on Os that don't know how to use SegData
Profiles: []byte("invalid"),
}
err = rt.stream.Send(msg)

Expand Down
42 changes: 42 additions & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,48 @@ func (md *SegTranscodingMetadata) Flatten() []byte {
return buf
}

func NetSegData(md *SegTranscodingMetadata) (*net.SegData, error) {

fullProfiles, err := common.FFmpegProfiletoNetProfile(md.Profiles)
if err != nil {
return nil, err
}
storage := []*net.OSInfo{}
if md.OS != nil {
storage = append(storage, md.OS)
}

// Generate serialized segment info
segData := &net.SegData{
ManifestId: []byte(md.ManifestID),
Seq: md.Seq,
Hash: md.Hash.Bytes(),
Storage: storage,
Duration: int32(md.Duration),
// Triggers failure on Os that don't know how to use FullProfiles/2
Profiles: []byte("invalid"),
}

// If all outputs are mpegts, use the older SegData.FullProfiles field
// for compatibility with older orchestrators
allTS := true
for i := 0; i < len(md.Profiles) && allTS; i++ {
switch md.Profiles[i].Format {
case ffmpeg.FormatNone: // default output is mpegts for FormatNone
case ffmpeg.FormatMPEGTS:
default:
allTS = false
}
}
if allTS {
segData.FullProfiles = fullProfiles
} else {
segData.FullProfiles2 = fullProfiles
}

return segData, nil
}

type ManifestID string

// The StreamID represents a particular variant of a stream.
Expand Down
13 changes: 9 additions & 4 deletions net/lp_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,21 @@ message NotifySegment {
// URL of the segment to transcode.
string url = 1;

// Job the segment belongs to.
string job = 2;
// Configuration for the transcoding job
SegData segData = 3;

// ID for this particular transcoding task.
int64 taskId = 16;

// Set of profiles to transcode this segment into.
// Deprecated fields. May still be populated sometimes for back compat

// Deprecated by segData. Job the segment belongs to.
string job = 2;

// Deprecated by fullProfiles. Set of presets to transcode into.
bytes profiles = 17;

// Transcoding profiles to use. Supersedes `profiles` field

// Deprecated by segData. Transcoding configuration to use.
repeated VideoProfile fullProfiles = 33;
}

Expand Down
23 changes: 10 additions & 13 deletions server/ot_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"io"
Expand All @@ -22,7 +21,6 @@ import (

"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/livepeer/lpms/ffmpeg"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -131,22 +129,21 @@ func runTranscoder(n *core.LivepeerNode, orchAddr string, capacity int) error {
}

func runTranscode(n *core.LivepeerNode, orchAddr string, httpc *http.Client, notify *net.NotifySegment) {
var err error
var profiles []ffmpeg.VideoProfile
if len(notify.FullProfiles) > 0 {
profiles, err = makeFfmpegVideoProfiles(notify.FullProfiles)
} else if len(notify.Profiles) > 0 {
profiles, err = common.TxDataToVideoProfile(hex.EncodeToString(notify.Profiles))
}
if err != nil {
glog.Error("Unable to deserialize profiles ", err)
}

glog.Infof("Transcoding taskId=%d url=%s", notify.TaskId, notify.Url)
var contentType string
var body bytes.Buffer

tData, err := n.Transcoder.Transcode(notify.Job, notify.Url, profiles)
md, err := coreSegMetadata(notify.SegData)
if err != nil {
glog.Error("Unable to parse segData err=", err)
md = &core.SegTranscodingMetadata{} // avoid crash.
// TODO short-circuit error handling
}
profiles := md.Profiles
job := string(md.ManifestID)

tData, err := n.Transcoder.Transcode(job, notify.Url, profiles)
glog.V(common.VERBOSE).Infof("Transcoding done for taskId=%d url=%s err=%v", notify.TaskId, notify.Url, err)
if err == nil && len(tData.Segments) != len(profiles) {
err = errors.New("segment / profile mismatch")
Expand Down
26 changes: 17 additions & 9 deletions server/ot_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func TestRemoteTranscoder_Profiles(t *testing.T) {
profiles := []ffmpeg.VideoProfile{ffmpeg.P720p60fps16x9, ffmpeg.P144p30fps16x9}

assert := assert.New(t)
assert.Nil(nil)
segData, err := core.NetSegData(&core.SegTranscodingMetadata{Profiles: profiles})
assert.Nil(err)
notify := &net.NotifySegment{
TaskId: 742,
Profiles: common.ProfilesToTranscodeOpts(profiles),
Url: "linktomanifest",
TaskId: 742,
SegData: segData,
Url: "linktomanifest",
}
tr := &stubTranscoder{}
node, _ := core.NewLivepeerNode(nil, "/tmp/thisdirisnotactuallyusedinthistest", nil)
Expand All @@ -68,6 +69,13 @@ func TestRemoteTranscoder_Profiles(t *testing.T) {

runTranscode(node, "badaddress", httpc, notify)
assert.Equal(1, tr.called)
// reset some things that are different when using profiles
profiles[0].Bitrate = "6000000" // presets use "k" abbrev; conversions don't
profiles[1].Bitrate = "400000"
profiles[0].AspectRatio = "" // unused across the wire
profiles[1].AspectRatio = ""
profiles[0].Format = ffmpeg.FormatMPEGTS // default is ffmpeg.FormatNone
profiles[1].Format = ffmpeg.FormatMPEGTS
assert.Equal(profiles, tr.profiles)
assert.Equal("linktomanifest", tr.fname)

Expand Down Expand Up @@ -136,13 +144,13 @@ func TestRemoteTranscoder_FullProfiles(t *testing.T) {
},
}

fullProfiles, err := common.FFmpegProfiletoNetProfile(profiles)
segData, err := core.NetSegData(&core.SegTranscodingMetadata{Profiles: profiles})
assert.Nil(err)

notify := &net.NotifySegment{
TaskId: 742,
FullProfiles: fullProfiles,
Url: "linktomanifest",
TaskId: 742,
Url: "linktomanifest",
SegData: segData,
}
tr := &stubTranscoder{}
node, _ := core.NewLivepeerNode(nil, "/tmp/thisdirisnotactuallyusedinthistest", nil)
Expand All @@ -161,7 +169,7 @@ func TestRemoteTranscoder_FullProfiles(t *testing.T) {
assert.Equal("linktomanifest", tr.fname)

// Test deserialization failure from invalid full profile format
notify.FullProfiles[1].Format = -1
notify.SegData.FullProfiles2[1].Format = -1
runTranscode(node, "", httpc, notify)
assert.Equal(2, tr.called)
assert.Nil(nil, tr.profiles)
Expand Down
43 changes: 43 additions & 0 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,46 @@ func pmTicketParams(params *net.TicketParams) *pm.TicketParams {
},
}
}

func coreSegMetadata(segData *net.SegData) (*core.SegTranscodingMetadata, error) {
if segData == nil {
glog.Error("Empty seg data")
return nil, errors.New("empty seg data")
}
var err error
profiles := []ffmpeg.VideoProfile{}
if len(segData.FullProfiles2) > 0 {
profiles, err = makeFfmpegVideoProfiles(segData.FullProfiles2)
} else if len(segData.FullProfiles) > 0 {
profiles, err = makeFfmpegVideoProfiles(segData.FullProfiles)
} else if len(segData.Profiles) > 0 {
profiles, err = common.BytesToVideoProfile(segData.Profiles)
}
if err != nil {
glog.Error("Unable to deserialize profiles ", err)
return nil, err
}

dur := int(segData.Duration)
if dur < 0 || int64(dur) > maxDurationMs {
glog.Error("Invalid duration")
return nil, errDuration
}
if dur == 0 {
dur = 2000 // assume 2sec default duration
}

var os *net.OSInfo
if len(segData.Storage) > 0 {
os = segData.Storage[0]
}

return &core.SegTranscodingMetadata{
ManifestID: core.ManifestID(segData.ManifestId),
Seq: segData.Seq,
Hash: ethcommon.BytesToHash(segData.Hash),
Profiles: profiles,
OS: os,
Duration: dur,
}, nil
}
82 changes: 12 additions & 70 deletions server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,50 +250,17 @@ func verifySegCreds(orch Orchestrator, segCreds string, broadcaster ethcommon.Ad
return nil, err
}

profiles := []ffmpeg.VideoProfile{}
if len(segData.FullProfiles2) > 0 {
profiles, err = makeFfmpegVideoProfiles(segData.FullProfiles2)
} else if len(segData.FullProfiles) > 0 {
profiles, err = makeFfmpegVideoProfiles(segData.FullProfiles)
} else if len(segData.Profiles) > 0 {
profiles, err = common.BytesToVideoProfile(segData.Profiles)
}
md, err := coreSegMetadata(&segData)
if err != nil {
glog.Error("Unable to deserialize profiles ", err)
return nil, err
}

dur := int(segData.Duration)
if dur < 0 || int64(dur) > maxDurationMs {
glog.Error("Invalid duration")
return nil, errDuration
}
if dur == 0 {
dur = 2000 // assume 2sec default duration
}

mid := core.ManifestID(segData.ManifestId)

var os *net.OSInfo
if len(segData.Storage) > 0 {
os = segData.Storage[0]
}

md := &core.SegTranscodingMetadata{
ManifestID: mid,
Seq: segData.Seq,
Hash: ethcommon.BytesToHash(segData.Hash),
Profiles: profiles,
OS: os,
Duration: dur,
}

if !orch.VerifySig(broadcaster, string(md.Flatten()), segData.Sig) {
glog.Error("Sig check failed")
return nil, errSegSig
}

if err := orch.CheckCapacity(mid); err != nil {
if err := orch.CheckCapacity(md.ManifestID); err != nil {
glog.Error("Cannot process manifest: ", err)
return nil, err
}
Expand Down Expand Up @@ -505,6 +472,12 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64)

func genSegCreds(sess *BroadcastSession, seg *stream.HLSSegment) (string, error) {

// Send credentials for our own storage
var storage *net.OSInfo
if bos := sess.BroadcasterOS; bos != nil && bos.IsExternal() {
storage = bos.GetInfo()
}

// Generate signature for relevant parts of segment
hash := crypto.Keccak256(seg.Data)
md := &core.SegTranscodingMetadata{
Expand All @@ -513,50 +486,19 @@ func genSegCreds(sess *BroadcastSession, seg *stream.HLSSegment) (string, error)
Hash: ethcommon.BytesToHash(hash),
Profiles: sess.Profiles,
Duration: int(1000.0 * seg.Duration),
OS: storage,
}
sig, err := sess.Broadcaster.Sign(md.Flatten())
if err != nil {
return "", err
}

// Send credentials for our own storage
var storage []*net.OSInfo
if bos := sess.BroadcasterOS; bos != nil && bos.IsExternal() {
storage = []*net.OSInfo{bos.GetInfo()}
}

fullProfiles, err := common.FFmpegProfiletoNetProfile(sess.Profiles)
// convert to segData
segData, err := core.NetSegData(md)
if err != nil {
return "", err
}

// Generate serialized segment info
segData := &net.SegData{
ManifestId: []byte(md.ManifestID),
Seq: md.Seq,
Hash: hash,
Sig: sig,
Storage: storage,
Duration: int32(md.Duration),
// Triggers failure on Os that don't know how to use FullProfiles/2
Profiles: []byte("invalid"),
}
// If all outputs are mpegts, use the older SegData.FullProfiles field
// for compatibility with older orchestrators
allTS := true
for i := 0; i < len(sess.Profiles) && allTS; i++ {
switch sess.Profiles[i].Format {
case ffmpeg.FormatNone: // default output is mpegts for FormatNone
case ffmpeg.FormatMPEGTS:
default:
allTS = false
}
}
if allTS {
segData.FullProfiles = fullProfiles
} else {
segData.FullProfiles2 = fullProfiles
}
segData.Sig = sig

data, err := proto.Marshal(segData)
if err != nil {
Expand Down

0 comments on commit 0e1902b

Please sign in to comment.