diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 95c971ccd6..a13ec0eb5d 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -10,6 +10,7 @@ - \#1877 Refresh TicketParams for the active session before expiry (@kyriediculous) - \#1879 Add mp4 download of recorded stream (@darkdarkdragon) +- \#1899 Record million pixels processed metric (@yondonfu) #### Orchestrator diff --git a/monitor/census.go b/monitor/census.go index ff4e63af68..efe3a59db9 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -151,6 +151,9 @@ type ( mSuggestedGasPrice *stats.Float64Measure mTranscodingPrice *stats.Float64Measure + // Metrics for pixel accounting + mMilPixelsProcessed *stats.Float64Measure + lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo success map[uint64]*segmentsAverager @@ -275,6 +278,9 @@ func InitCensus(nodeType NodeType, version string) { census.mSuggestedGasPrice = stats.Float64("suggested_gas_price", "SuggestedGasPrice", "gwei") census.mTranscodingPrice = stats.Float64("transcoding_price", "TranscodingPrice", "wei") + // Metrics for pixel accounting + census.mMilPixelsProcessed = stats.Float64("mil_pixels_processed", "MilPixelsProcessed", "mil pixels") + glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) glog.Infof("Node type %s node ID %s", nodeType, NodeID) @@ -681,6 +687,14 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: baseTags, Aggregation: view.Sum(), }, + // Metrics for pixel accounting + { + Name: "mil_pixels_processed", + Measure: census.mMilPixelsProcessed, + Description: "Million pixels processed", + TagKeys: baseTags, + Aggregation: view.Sum(), + }, { Name: "suggested_gas_price", Measure: census.mSuggestedGasPrice, @@ -1407,6 +1421,13 @@ func TicketRedemptionError() { stats.Record(census.ctx, census.mTicketRedemptionError.M(1)) } +func MilPixelsProcessed(milPixels float64) { + census.lock.Lock() + defer census.lock.Unlock() + + stats.Record(census.ctx, census.mMilPixelsProcessed.M(milPixels)) +} + // SuggestedGasPrice records the last suggested gas price func SuggestedGasPrice(gasPrice *big.Int) { census.lock.Lock() diff --git a/server/segment_rpc.go b/server/segment_rpc.go index 705c96d782..880d369193 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -528,6 +528,10 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64) } balUpdate.Debit.Mul(new(big.Rat).SetInt64(pixelCount), priceInfo) + + if monitor.Enabled { + monitor.MilPixelsProcessed(float64(pixelCount) / 1000000.0) + } } // transcode succeeded; continue processing response