feat(ai): add orchestrator AI census metrics
This commit introduces a suite of AI orchestrator metrics to the census
module, mirroring those already collected by the Gateway. The newly added
metrics include `ai_models_requested`, `ai_request_latency_score`,
`ai_request_price`, and `ai_request_errors`, enabling comprehensive
tracking and analysis of AI request handling performance on the orchestrator side.
rickstaa committed Jul 14, 2024
1 parent b7181d4 commit a1d53c6
Showing 3 changed files with 171 additions and 53 deletions.
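
Before the diff, a note on how these measures surface: the census module registers OpenCensus views, so the new ai_* series appear on whatever exporter the node wires up. A minimal sketch, assuming the stock OpenCensus Prometheus exporter; the namespace and port below are illustrative and not part of this commit:

import (
	"net/http"

	"contrib.go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"
)

// serveMetrics exposes every registered view, including the new ai_* views,
// on a Prometheus /metrics endpoint.
func serveMetrics() error {
	pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "livepeer"}) // namespace is an assumption
	if err != nil {
		return err
	}
	view.RegisterExporter(pe)
	http.Handle("/metrics", pe)
	return http.ListenAndServe(":7935", nil) // port is illustrative
}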
92 changes: 71 additions & 21 deletions monitor/census.go
@@ -357,8 +357,8 @@ func InitCensus(nodeType NodeType, version string) {
// Metrics for AI jobs
census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot")
census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "")
census.mAIRequestPrice = stats.Float64("ai_request_price", "Price paid per AI request unit", "")
census.mAIRequestError = stats.Int64("ai_request_errors", "Errors when processing AI requests", "tot")
census.mAIRequestPrice = stats.Float64("ai_request_price", "AI request price per unit, based on smallest pipeline unit", "")
census.mAIRequestError = stats.Int64("ai_request_errors", "Errors during AI request processing", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
@@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) {
baseTagsWithEthAddr := baseTags
baseTagsWithManifestIDAndEthAddr := baseTags
baseTagsWithOrchInfo := baseTags
baseTagsWithGatewayInfo := baseTags
if PerStreamMetrics {
baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID}
baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender}
@@ -391,8 +392,17 @@ func InitCensus(nodeType NodeType, version string) {
}
baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID
baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...)
baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...)
baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...)

// Add node type specific tags.
baseTagsWithNodeInfo := baseTags
if nodeType == Orchestrator {
baseTagsWithNodeInfo = baseTagsWithGatewayInfo
} else {
baseTagsWithNodeInfo = baseTagsWithOrchInfo
}
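
The assignments above are swapped relative to the variable names one might expect because each side tags its AI metrics with the *other* party's identity: the orchestrator records per-sender (gateway) tags, the gateway records per-orchestrator tags. OpenCensus silently drops any recorded tag not in a view's tag key set, so the record-time mutators must match. A record-time sketch for the orchestrator case, with hypothetical pipeline, model, and sender values:

mutators := []tag.Mutator{
	tag.Insert(census.kPipeline, "text-to-image"),
	tag.Insert(census.kModelName, "ByteDance/SDXL-Lightning"), // hypothetical model ID
	tag.Insert(census.kSender, "0x1234..."),                   // gateway (sender) address
}
if err := stats.RecordWithTags(census.ctx, mutators, census.mAIModelsRequested.M(1)); err != nil {
	glog.Errorf("Error recording metrics err=%q", err)
}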

views := []*view.View{
{
Name: "versions",
@@ -889,21 +899,21 @@ func InitCensus(nodeType NodeType, version string) {
Name: "ai_request_latency_score",
Measure: census.mAIRequestLatencyScore,
Description: "AI request latency score",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.LastValue(),
},
{
Name: "ai_request_price",
Measure: census.mAIRequestPrice,
Description: "AI request price per unit",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.LastValue(),
},
{
Name: "ai_request_errors",
Measure: census.mAIRequestError,
Description: "Errors when processing AI requests",
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.Sum(),
},
}
@@ -1760,15 +1770,8 @@ func RewardCallError(sender string) {
}
}

// AIJobProccessed records metrics from AI jobs
func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
census.modelRequested(pipeline, model)
census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
}

// modelRequested records the number of requests per pipeline and model
func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string) {
// recordModelRequested increments the request count for a specific AI model and pipeline.
func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
cen.lock.Lock()
defer cen.lock.Unlock()

@@ -1778,31 +1781,38 @@ func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string) {
}
}

// recordAILatencyScore records the latency score for an AI job
func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
// AIRequestFinished records Gateway AI job request metrics.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
census.recordModelRequested(pipeline, model)
census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
}

// recordAIRequestLatencyScore records the latency score for an AI job request.
func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// recordAIPricePerUnit records the price per unit for an AI job
func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
// recordAIRequestPricePerUnit records the price per unit for an AI job request.
func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// AIRequestError records an error during the AI job request
// AIRequestError logs an error in a Gateway AI job request.
func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) {
orchAddr := ""
if addr := orchInfo.GetAddress(); addr != nil {
@@ -1816,6 +1826,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
}
}

// AIJobProcessed records orchestrator AI job processing metrics.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, sender string) {
census.recordModelRequested(pipeline, model)
census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore, sender)
census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit, sender)
}

// recordAIJobLatencyScore records the latency score for a processed AI job.
func (cen *censusMetricsCounter) recordAIJobLatencyScore(Pipeline string, Model string, latencyScore float64, sender string) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kSender, sender)},
cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// recordAIJobPricePerUnit logs the cost per unit of a processed AI job.
func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model string, pricePerUnit float64, sender string) {
cen.lock.Lock()
defer cen.lock.Unlock()

if err := stats.RecordWithTags(cen.ctx,
[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kSender, sender)},
cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// AIProcessingError logs errors in orchestrator AI job processing.
func AIProcessingError(code string, Pipeline string, Model string, sender string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)},
census.mAIRequestError.M(1)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
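
A quick sanity check of the conversion above, assuming gweiConversionFactor is 1e9 as the name implies and the truncated tail simply returns gwei:

// wei2gwei(big.NewInt(2_500_000_000)) == 2.5   // 2.5e9 wei is 2.5 gwei
// wei2gwei(big.NewInt(1))             == 1e-09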
25 changes: 25 additions & 0 deletions server/ai_http.go
@@ -14,6 +14,7 @@ import (
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/monitor"
middleware "github.com/oapi-codegen/nethttp-middleware"
"github.com/oapi-codegen/runtime"
)
@@ -271,6 +272,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
start := time.Now()
resp, err := submitFn(ctx)
if err != nil {
if monitor.Enabled {
monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex())
}
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -283,6 +287,27 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
// If additional parameters that influence compute cost become configurable, then the formula should be reconsidered
orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels)

if monitor.Enabled {
var latencyScore float64
switch v := req.(type) {
case worker.TextToImageJSONRequestBody:
latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels)
case worker.ImageToImageMultipartRequestBody:
latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels)
case worker.ImageToVideoMultipartRequestBody:
latencyScore = CalculateImageToVideoLatencyScore(took, outPixels)
case worker.UpscaleMultipartRequestBody:
latencyScore = CalculateUpscaleLatencyScore(took, outPixels)
}

var pricePerAIUnit float64
if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 {
pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit())
}

monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit}, sender.Hex())
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp)
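
For intuition, a rough worked example of the two values recorded above; all numbers are illustrative and not taken from the commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical text-to-image job: one 1024x1024 image, 50 inference steps,
	// finished in 5s, priced at 3000 wei per 3 pixels.
	took := 5 * time.Second
	outPixels := int64(1024 * 1024)

	latencyScore := took.Seconds() / float64(outPixels) / (1 * 50)
	pricePerAIUnit := float64(3000) / float64(3)

	fmt.Println(latencyScore)   // ≈ 9.54e-08 seconds per pixel-step
	fmt.Println(pricePerAIUnit) // 1000 wei per pixel
}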
107 changes: 75 additions & 32 deletions server/ai_process.go
@@ -9,6 +9,7 @@ import (
"fmt"
"image"
"io"
"math"
"math/big"
"net/http"
"path/filepath"
@@ -44,6 +45,24 @@ type aiRequestParams struct {
sessManager *AISessionManager
}

// CalculateTextToImageLatencyScore computes the time taken per pixel for a text-to-image request.
func CalculateTextToImageLatencyScore(took time.Duration, req worker.TextToImageJSONRequestBody, outPixels int64) float64 {
if outPixels <= 0 {
return 0
}

numImages := float64(1)
if req.NumImagesPerPrompt != nil {
numImages = math.Max(1, float64(*req.NumImagesPerPrompt))
}
numInferenceSteps := float64(50)
if req.NumInferenceSteps != nil {
numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps))
}

return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
}
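
A usage sketch of the helper above (it assumes the server package context, where worker and the helper are in scope; durations and dimensions are illustrative). Nil request fields fall back to the hardcoded defaults of 1 image and 50 inference steps:

req := worker.TextToImageJSONRequestBody{} // NumImagesPerPrompt and NumInferenceSteps left nil
score := CalculateTextToImageLatencyScore(2600*time.Millisecond, req, 512*512)
// score = 2.6 / (512*512) / (1*50) ≈ 1.98e-07 seconds per pixel-step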

func processTextToImage(ctx context.Context, params aiRequestParams, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) {
resp, err := processAIRequest(ctx, params, req)
if err != nil {
@@ -125,27 +144,34 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess
// TODO: Refine this rough estimate in future iterations.
// TODO: Default values for the number of images and inference steps are currently hardcoded.
// These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details.
numImages := float64(1)
if req.NumImagesPerPrompt != nil {
numImages = float64(*req.NumImagesPerPrompt)
}
numInferenceSteps := float64(50)
if req.NumInferenceSteps != nil {
numInferenceSteps = float64(*req.NumInferenceSteps)
}
sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
sess.LatencyScore = CalculateTextToImageLatencyScore(took, req, outPixels)

if monitor.Enabled {
pricePerUnit := 0.0
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
pricePerUnit = float64(priceInfo.PricePerUnit)
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)

monitor.AIRequestFinished(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return resp.JSON200, nil
}

// CalculateImageToImageLatencyScore computes the time taken per pixel for an image-to-image request.
func CalculateImageToImageLatencyScore(took time.Duration, req worker.ImageToImageMultipartRequestBody, outPixels int64) float64 {
if outPixels <= 0 {
return 0
}

numImages := float64(1)
if req.NumImagesPerPrompt != nil {
numImages = math.Max(1, float64(*req.NumImagesPerPrompt))
}

return took.Seconds() / float64(outPixels) / numImages
}

func processImageToImage(ctx context.Context, params aiRequestParams, req worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error) {
resp, err := processAIRequest(ctx, params, req)
if err != nil {
@@ -241,23 +267,29 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes
// TODO: Refine this rough estimate in future iterations.
// TODO: Default values for the number of images is currently hardcoded.
// These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details.
numImages := float64(1)
if req.NumImagesPerPrompt != nil {
numImages = float64(*req.NumImagesPerPrompt)
}
sess.LatencyScore = took.Seconds() / float64(outPixels) / numImages
sess.LatencyScore = CalculateImageToImageLatencyScore(took, req, outPixels)

if monitor.Enabled {
pricePerUnit := 0.0
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
pricePerUnit = float64(priceInfo.PricePerUnit)
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)

monitor.AIRequestFinished(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return resp.JSON200, nil
}

// CalculateImageToVideoLatencyScore computes the time taken per pixel for an image-to-video request.
func CalculateImageToVideoLatencyScore(took time.Duration, outPixels int64) float64 {
if outPixels <= 0 {
return 0
}

return took.Seconds() / float64(outPixels)
}

func processImageToVideo(ctx context.Context, params aiRequestParams, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) {
resp, err := processAIRequest(ctx, params, req)
if err != nil {
@@ -366,19 +398,29 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes
}

// TODO: Refine this rough estimate in future iterations
sess.LatencyScore = took.Seconds() / float64(outPixels)
sess.LatencyScore = CalculateImageToVideoLatencyScore(took, outPixels)

if monitor.Enabled {
pricePerUnit := 0.0
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
pricePerUnit = float64(priceInfo.PricePerUnit)
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)

monitor.AIRequestFinished(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return &res, nil
}

// CalculateUpscaleLatencyScore computes the time taken per pixel for an upscale request.
func CalculateUpscaleLatencyScore(took time.Duration, outPixels int64) float64 {
if outPixels <= 0 {
return 0
}

return took.Seconds() / float64(outPixels)
}

func processUpscale(ctx context.Context, params aiRequestParams, req worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error) {
resp, err := processAIRequest(ctx, params, req)
if err != nil {
@@ -469,14 +511,15 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession,
}

// TODO: Refine this rough estimate in future iterations
sess.LatencyScore = took.Seconds() / float64(outPixels)
sess.LatencyScore = CalculateUpscaleLatencyScore(took, outPixels)

if monitor.Enabled {
pricePerUnit := 0.0
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
pricePerUnit = float64(priceInfo.PricePerUnit)
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
monitor.AiJobProcessed(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)

monitor.AIRequestFinished(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}

return resp.JSON200, nil
