Skip to content

Commit

Permalink
monitor+core+server: Record transcode score
Browse files Browse the repository at this point in the history
  • Loading branch information
yondonfu committed Nov 27, 2020
1 parent 27f2247 commit 7f9534c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme
took := time.Since(start)
glog.V(common.DEBUG).Infof("Transcoding of segment manifestID=%s sessionID=%s seqNo=%d took=%v", string(md.ManifestID), md.AuthToken.SessionId, seg.SeqNo, took)
if monitor.Enabled {
monitor.SegmentTranscoded(0, seg.SeqNo, took, common.ProfilesNames(md.Profiles))
monitor.SegmentTranscoded(0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles))
}

// Prepare the result object
Expand Down
4 changes: 2 additions & 2 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (lt *LocalTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeData
// When orchestrator works as transcoder, `fname` will be relative path to file in local
// filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will
// be called in orchestrator.go
monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles))
monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles))
}

return resToTranscodeData(res, opts)
Expand Down Expand Up @@ -89,7 +89,7 @@ func (nv *NvidiaTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeDat
// When orchestrator works as transcoder, `fname` will be relative path to file in local
// filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will
// be called in orchestrator.go
monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles))
monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles))
}

return resToTranscodeData(res, out)
Expand Down
17 changes: 13 additions & 4 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type (
mRealtime1x *stats.Int64Measure
mRealtimeHalf *stats.Int64Measure
mRealtimeSlow *stats.Int64Measure
mTranscodeScore *stats.Float64Measure

// Metrics for sending payments
mTicketValueSent *stats.Float64Measure
Expand Down Expand Up @@ -238,6 +239,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mDownloadTime = stats.Float64("download_time_seconds", "Download (from orchestrator) time", "sec")
census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms")
census.mSourceSegmentDuration = stats.Float64("source_segment_duration_seconds", "Source segment's duration", "sec")
census.mTranscodeScore = stats.Float64("transcode_score", "Ratio of source segment duration vs. transcode time", "rat")

// Metrics for sending payments
census.mTicketValueSent = stats.Float64("ticket_value_sent", "TicketValueSent", "gwei")
Expand Down Expand Up @@ -469,6 +471,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kProfiles}, baseTags...),
Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
},
{
Name: "transcode_score",
Measure: census.mTranscodeScore,
Description: "Ratio of source segment duration vs. transcode time",
TagKeys: append([]tag.Key{census.kProfiles}, baseTags...),
Aggregation: view.Distribution(0, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5, 2.75, 3, 3.25, 3.5, 3.75, 4),
},
{
Name: "upload_time_seconds",
Measure: census.mUploadTime,
Expand Down Expand Up @@ -1012,12 +1021,12 @@ func (cen *censusMetricsCounter) segmentUploadFailed(nonce, seqNo uint64, code S
}
}

func SegmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration, profiles string) {
func SegmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string) {
glog.V(logLevel).Infof("Logging SegmentTranscode nonce=%d seqNo=%d dur=%s", nonce, seqNo, transcodeDur)
census.segmentTranscoded(nonce, seqNo, transcodeDur, profiles)
census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles)
}

func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration,
func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration,
profiles string) {
cen.lock.Lock()
defer cen.lock.Unlock()
Expand All @@ -1026,7 +1035,7 @@ func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcod
glog.Error("Error creating context", err)
return
}
stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds()))
stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds()), cen.mTranscodeScore.M(sourceDur.Seconds()/transcodeDur.Seconds()))
}

func SegmentTranscodeFailed(subType SegmentTranscodeError, nonce, seqNo uint64, err error, permanent bool) {
Expand Down
2 changes: 1 addition & 1 deletion server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64)

// transcode succeeded; continue processing response
if monitor.Enabled {
monitor.SegmentTranscoded(nonce, seg.SeqNo, transcodeDur, common.ProfilesNames(params.Profiles))
monitor.SegmentTranscoded(nonce, seg.SeqNo, time.Duration(seg.Duration*float64(time.Second)), transcodeDur, common.ProfilesNames(params.Profiles))
}

glog.Infof("Successfully transcoded segment nonce=%d manifestID=%s sessionID=%s segName=%s seqNo=%d orch=%s dur=%s", nonce,
Expand Down

0 comments on commit 7f9534c

Please sign in to comment.