From 7f9534c365123f455c7fd3bfb9ebcbf5a99cdd43 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Thu, 26 Nov 2020 09:57:47 -0500 Subject: [PATCH] monitor+core+server: Record transcode score --- core/orchestrator.go | 2 +- core/transcoder.go | 4 ++-- monitor/census.go | 17 +++++++++++++---- server/segment_rpc.go | 2 +- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/core/orchestrator.go b/core/orchestrator.go index d03d98196f..50b98bfcd2 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -554,7 +554,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme took := time.Since(start) glog.V(common.DEBUG).Infof("Transcoding of segment manifestID=%s sessionID=%s seqNo=%d took=%v", string(md.ManifestID), md.AuthToken.SessionId, seg.SeqNo, took) if monitor.Enabled { - monitor.SegmentTranscoded(0, seg.SeqNo, took, common.ProfilesNames(md.Profiles)) + monitor.SegmentTranscoded(0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles)) } // Prepare the result object diff --git a/core/transcoder.go b/core/transcoder.go index f5b03a6e87..4ba30e7be1 100644 --- a/core/transcoder.go +++ b/core/transcoder.go @@ -51,7 +51,7 @@ func (lt *LocalTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeData // When orchestrator works as transcoder, `fname` will be relative path to file in local // filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will // be called in orchestrator.go - monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles)) + monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles)) } return resToTranscodeData(res, opts) @@ -89,7 +89,7 @@ func (nv *NvidiaTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeDat // When orchestrator works as transcoder, `fname` will be relative path to file in local // filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will // be called in orchestrator.go - monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles)) + monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles)) } return resToTranscodeData(res, out) diff --git a/monitor/census.go b/monitor/census.go index 0afad025d4..2167715438 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -121,6 +121,7 @@ type ( mRealtime1x *stats.Int64Measure mRealtimeHalf *stats.Int64Measure mRealtimeSlow *stats.Int64Measure + mTranscodeScore *stats.Float64Measure // Metrics for sending payments mTicketValueSent *stats.Float64Measure @@ -238,6 +239,7 @@ func InitCensus(nodeType NodeType, version string) { census.mDownloadTime = stats.Float64("download_time_seconds", "Download (from orchestrator) time", "sec") census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms") census.mSourceSegmentDuration = stats.Float64("source_segment_duration_seconds", "Source segment's duration", "sec") + census.mTranscodeScore = stats.Float64("transcode_score", "Ratio of source segment duration vs. transcode time", "rat") // Metrics for sending payments census.mTicketValueSent = stats.Float64("ticket_value_sent", "TicketValueSent", "gwei") @@ -469,6 +471,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kProfiles}, baseTags...), Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, + { + Name: "transcode_score", + Measure: census.mTranscodeScore, + Description: "Ratio of source segment duration vs. transcode time", + TagKeys: append([]tag.Key{census.kProfiles}, baseTags...), + Aggregation: view.Distribution(0, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5, 2.75, 3, 3.25, 3.5, 3.75, 4), + }, { Name: "upload_time_seconds", Measure: census.mUploadTime, @@ -1012,12 +1021,12 @@ func (cen *censusMetricsCounter) segmentUploadFailed(nonce, seqNo uint64, code S } } -func SegmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration, profiles string) { +func SegmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string) { glog.V(logLevel).Infof("Logging SegmentTranscode nonce=%d seqNo=%d dur=%s", nonce, seqNo, transcodeDur) - census.segmentTranscoded(nonce, seqNo, transcodeDur, profiles) + census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles) } -func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration, +func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string) { cen.lock.Lock() defer cen.lock.Unlock() @@ -1026,7 +1035,7 @@ func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcod glog.Error("Error creating context", err) return } - stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds())) + stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds()), cen.mTranscodeScore.M(sourceDur.Seconds()/transcodeDur.Seconds())) } func SegmentTranscodeFailed(subType SegmentTranscodeError, nonce, seqNo uint64, err error, permanent bool) { diff --git a/server/segment_rpc.go b/server/segment_rpc.go index 857154cb71..2758981996 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -532,7 +532,7 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64) // transcode succeeded; continue processing response if monitor.Enabled { - monitor.SegmentTranscoded(nonce, seg.SeqNo, transcodeDur, common.ProfilesNames(params.Profiles)) + monitor.SegmentTranscoded(nonce, seg.SeqNo, time.Duration(seg.Duration*float64(time.Second)), transcodeDur, common.ProfilesNames(params.Profiles)) } glog.Infof("Successfully transcoded segment nonce=%d manifestID=%s sessionID=%s segName=%s seqNo=%d orch=%s dur=%s", nonce,