From 82fca255ca7618db436d48ff489642d242ecfea4 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Thu, 26 Nov 2020 09:52:41 -0500 Subject: [PATCH 1/5] cmd+monitor: NodeType type instead of string --- cmd/livepeer/livepeer.go | 10 +++++----- monitor/census.go | 18 ++++++++++++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index 837a117080..38712dbded 100644 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -313,16 +313,16 @@ func main() { if *monitor { lpmon.Enabled = true - nodeType := "dflt" + nodeType := lpmon.Default switch n.NodeType { case core.BroadcasterNode: - nodeType = "bctr" + nodeType = lpmon.Broadcaster case core.OrchestratorNode: - nodeType = "orch" + nodeType = lpmon.Orchestrator case core.TranscoderNode: - nodeType = "trcr" + nodeType = lpmon.Transcoder case core.RedeemerNode: - nodeType = "rdmr" + nodeType = lpmon.Redeemer } lpmon.InitCensus(nodeType, core.LivepeerVersion) } diff --git a/monitor/census.go b/monitor/census.go index cf25add207..0231e31da3 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -50,6 +50,16 @@ const ( // importing `common` package here introduces import cycles ) +type NodeType string + +const ( + Default NodeType = "dflt" + Orchestrator NodeType = "orch" + Broadcaster NodeType = "bctr" + Transcoder NodeType = "trcr" + Redeemer NodeType = "rdmr" +) + // Enabled true if metrics was enabled in command line var Enabled bool @@ -60,7 +70,7 @@ var timeoutWatcherPause = 15 * time.Second type ( censusMetricsCounter struct { - nodeType string + nodeType NodeType nodeID string ctx context.Context kGPU tag.Key @@ -165,7 +175,7 @@ var census censusMetricsCounter // used in unit tests var unitTestMode bool -func InitCensus(nodeType, version string) { +func InitCensus(nodeType NodeType, version string) { census = censusMetricsCounter{ emergeTimes: make(map[uint64]map[uint64]time.Time), nodeID: NodeID, @@ -184,7 +194,7 @@ func InitCensus(nodeType, version string) { census.kSender = tag.MustNewKey("sender") census.kRecipient = tag.MustNewKey("recipient") census.kManifestID = tag.MustNewKey("manifestID") - census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, nodeType), tag.Insert(census.kNodeID, NodeID)) + census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID)) if err != nil { glog.Fatal("Error creating context", err) } @@ -255,7 +265,7 @@ func InitCensus(nodeType, version string) { goos := tag.MustNewKey("goos") goversion := tag.MustNewKey("goversion") livepeerversion := tag.MustNewKey("livepeerversion") - ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, nodeType), tag.Insert(census.kNodeID, NodeID), + ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID), tag.Insert(compiler, runtime.Compiler), tag.Insert(goarch, runtime.GOARCH), tag.Insert(goos, runtime.GOOS), tag.Insert(goversion, runtime.Version()), tag.Insert(livepeerversion, version)) if err != nil { From 35f7696157d2329e14e2c72091c659c053cadb59 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Wed, 25 Nov 2020 17:49:56 -0500 Subject: [PATCH 2/5] server: Record & log source & dl dur --- monitor/census.go | 2 +- server/segment_rpc.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/monitor/census.go b/monitor/census.go index 0231e31da3..70b76ac976 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -479,7 +479,7 @@ func InitCensus(nodeType NodeType, version string) { { Name: "download_time_seconds", Measure: census.mDownloadTime, - Description: "Download (from orchestrator) time", + Description: "Download time", TagKeys: baseTags, Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000), }, diff --git a/server/segment_rpc.go b/server/segment_rpc.go index ef1058edb5..857154cb71 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -80,6 +80,12 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) { return } + glog.V(common.VERBOSE).Infof("Received segment manifestID=%s sessionID=%s seqNo=%d dur=%v", segData.ManifestID, segData.AuthToken.SessionId, segData.Seq, segData.Duration) + + if monitor.Enabled { + monitor.SegmentEmerged(0, uint64(segData.Seq), len(segData.Profiles), segData.Duration.Seconds()) + } + if err := orch.ProcessPayment(payment, core.ManifestID(segData.AuthToken.SessionId)); err != nil { glog.Errorf("error processing payment: %v", err) http.Error(w, err.Error(), http.StatusBadRequest) @@ -105,6 +111,7 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) { oInfo.AuthToken = segData.AuthToken // download the segment and check the hash + dlStart := time.Now() data, err := ioutil.ReadAll(r.Body) if err != nil { glog.Errorf("Could not read request body - err=%v", err) @@ -112,6 +119,13 @@ func (h *lphttp) ServeSegment(w http.ResponseWriter, r *http.Request) { return } + dlDur := time.Since(dlStart) + glog.V(common.VERBOSE).Infof("Downloaded segment manifestID=%s sessionID=%s seqNo=%d dur=%v", segData.ManifestID, segData.AuthToken.SessionId, segData.Seq, dlDur) + + if monitor.Enabled { + monitor.SegmentDownloaded(0, uint64(segData.Seq), dlDur) + } + uri := "" if r.Header.Get("Content-Type") == "application/vnd+livepeer.uri" { uri = string(data) From 0b492acc850a9c3328c9066a6d69d3349b8eba92 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Thu, 26 Nov 2020 09:56:49 -0500 Subject: [PATCH 3/5] monitor: Tweak dist for source seg dur --- monitor/census.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor/census.go b/monitor/census.go index 70b76ac976..cbabcfbd61 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -495,7 +495,7 @@ func InitCensus(nodeType NodeType, version string) { Measure: census.mSourceSegmentDuration, Description: "Source segment's duration", TagKeys: baseTags, - Aggregation: view.Distribution(0, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, 5000, 10000, 20000, 30000), + Aggregation: view.Distribution(0, .5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10, 15, 20), }, { Name: "max_sessions_total", From 39b267c89d92d08f9bbd0f7eded95a5a1eef0788 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Thu, 26 Nov 2020 09:57:01 -0500 Subject: [PATCH 4/5] monitor: Broadcaster only logic in SegmentEmerged --- monitor/census.go | 4 +++- monitor/census_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index cbabcfbd61..0afad025d4 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -891,7 +891,9 @@ func SetTranscodersNumberAndLoad(load, capacity, number int) { func SegmentEmerged(nonce, seqNo uint64, profilesNum int, dur float64) { glog.V(logLevel).Infof("Logging SegmentEmerged... nonce=%d seqNo=%d duration=%v", nonce, seqNo, dur) - census.segmentEmerged(nonce, seqNo, profilesNum) + if census.nodeType == Broadcaster { + census.segmentEmerged(nonce, seqNo, profilesNum) + } stats.Record(census.ctx, census.mSourceSegmentDuration.M(dur)) } diff --git a/monitor/census_test.go b/monitor/census_test.go index f5e12eb18b..5140f10150 100644 --- a/monitor/census_test.go +++ b/monitor/census_test.go @@ -49,7 +49,7 @@ func TestLastSegmentTimeout(t *testing.T) { defer func() { unitTestMode = false }() NodeID = "testid" - InitCensus("tst", "testversion") + InitCensus("bctr", "testversion") // defer func() { // shutDown <- nil // }() From d3c21069bdfa0b2252edef50eee8fd2c9712bea1 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Thu, 26 Nov 2020 09:57:47 -0500 Subject: [PATCH 5/5] monitor+core+server: Record transcode score --- core/orchestrator.go | 2 +- core/transcoder.go | 4 ++-- monitor/census.go | 17 +++++++++++++---- server/segment_rpc.go | 2 +- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/core/orchestrator.go b/core/orchestrator.go index d03d98196f..50b98bfcd2 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -554,7 +554,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme took := time.Since(start) glog.V(common.DEBUG).Infof("Transcoding of segment manifestID=%s sessionID=%s seqNo=%d took=%v", string(md.ManifestID), md.AuthToken.SessionId, seg.SeqNo, took) if monitor.Enabled { - monitor.SegmentTranscoded(0, seg.SeqNo, took, common.ProfilesNames(md.Profiles)) + monitor.SegmentTranscoded(0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles)) } // Prepare the result object diff --git a/core/transcoder.go b/core/transcoder.go index f5b03a6e87..4ba30e7be1 100644 --- a/core/transcoder.go +++ b/core/transcoder.go @@ -51,7 +51,7 @@ func (lt *LocalTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeData // When orchestrator works as transcoder, `fname` will be relative path to file in local // filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will // be called in orchestrator.go - monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles)) + monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles)) } return resToTranscodeData(res, opts) @@ -89,7 +89,7 @@ func (nv *NvidiaTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeDat // When orchestrator works as transcoder, `fname` will be relative path to file in local // filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will // be called in orchestrator.go - monitor.SegmentTranscoded(0, seqNo, time.Since(start), common.ProfilesNames(profiles)) + monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles)) } return resToTranscodeData(res, out) diff --git a/monitor/census.go b/monitor/census.go index 0afad025d4..2167715438 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -121,6 +121,7 @@ type ( mRealtime1x *stats.Int64Measure mRealtimeHalf *stats.Int64Measure mRealtimeSlow *stats.Int64Measure + mTranscodeScore *stats.Float64Measure // Metrics for sending payments mTicketValueSent *stats.Float64Measure @@ -238,6 +239,7 @@ func InitCensus(nodeType NodeType, version string) { census.mDownloadTime = stats.Float64("download_time_seconds", "Download (from orchestrator) time", "sec") census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms") census.mSourceSegmentDuration = stats.Float64("source_segment_duration_seconds", "Source segment's duration", "sec") + census.mTranscodeScore = stats.Float64("transcode_score", "Ratio of source segment duration vs. transcode time", "rat") // Metrics for sending payments census.mTicketValueSent = stats.Float64("ticket_value_sent", "TicketValueSent", "gwei") @@ -469,6 +471,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kProfiles}, baseTags...), Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, + { + Name: "transcode_score", + Measure: census.mTranscodeScore, + Description: "Ratio of source segment duration vs. transcode time", + TagKeys: append([]tag.Key{census.kProfiles}, baseTags...), + Aggregation: view.Distribution(0, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5, 2.75, 3, 3.25, 3.5, 3.75, 4), + }, { Name: "upload_time_seconds", Measure: census.mUploadTime, @@ -1012,12 +1021,12 @@ func (cen *censusMetricsCounter) segmentUploadFailed(nonce, seqNo uint64, code S } } -func SegmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration, profiles string) { +func SegmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string) { glog.V(logLevel).Infof("Logging SegmentTranscode nonce=%d seqNo=%d dur=%s", nonce, seqNo, transcodeDur) - census.segmentTranscoded(nonce, seqNo, transcodeDur, profiles) + census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles) } -func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcodeDur time.Duration, +func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string) { cen.lock.Lock() defer cen.lock.Unlock() @@ -1026,7 +1035,7 @@ func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, transcod glog.Error("Error creating context", err) return } - stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds())) + stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds()), cen.mTranscodeScore.M(sourceDur.Seconds()/transcodeDur.Seconds())) } func SegmentTranscodeFailed(subType SegmentTranscodeError, nonce, seqNo uint64, err error, permanent bool) { diff --git a/server/segment_rpc.go b/server/segment_rpc.go index 857154cb71..2758981996 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -532,7 +532,7 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64) // transcode succeeded; continue processing response if monitor.Enabled { - monitor.SegmentTranscoded(nonce, seg.SeqNo, transcodeDur, common.ProfilesNames(params.Profiles)) + monitor.SegmentTranscoded(nonce, seg.SeqNo, time.Duration(seg.Duration*float64(time.Second)), transcodeDur, common.ProfilesNames(params.Profiles)) } glog.Infof("Successfully transcoded segment nonce=%d manifestID=%s sessionID=%s segName=%s seqNo=%d orch=%s dur=%s", nonce,