feat: add initial POC AI gateway metrics
This commit adds the initial AI gateway metrics so that they can be
reviewed by others. The code still needs to be cleaned up and the
buckets adjusted.
rickstaa committed Jun 26, 2024
1 parent 7be1bbd commit e152e12
Showing 3 changed files with 194 additions and 62 deletions.
148 changes: 100 additions & 48 deletions monitor/census.go
@@ -115,8 +115,6 @@ type (
 		kFVErrorType               tag.Key
 		kPipeline                  tag.Key
 		kModelName                 tag.Key
-		mAIRoundtripTime           *stats.Float64Measure
-		mModelsRequested           *stats.Int64Measure
 		mSegmentSourceAppeared     *stats.Int64Measure
 		mSegmentEmerged            *stats.Int64Measure
 		mSegmentEmergedUnprocessed *stats.Int64Measure
@@ -194,6 +192,12 @@ type (
 		mSegmentClassProb    *stats.Float64Measure
 		mSceneClassification *stats.Int64Measure
 
+		// Metrics for AI jobs
+		mAIModelsRequested *stats.Int64Measure
+		mAILatencyScore    *stats.Float64Measure
+		mAIPricePerUnit    *stats.Float64Measure
+		mAIRequestError    *stats.Int64Measure
+
 		lock        sync.Mutex
 		emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
 		success     map[uint64]*segmentsAverager
@@ -221,6 +225,11 @@ type (
 		removedAt time.Time
 		tries     map[uint64]tryData // seqNo:try
 	}
+
+	AIJobInfo struct {
+		LatencyScore float64
+		PricePerUnit float64
+	}
 )
 
 // Exporter Prometheus exporter that handles `/metrics` endpoint
@@ -295,8 +304,6 @@ func InitCensus(nodeType NodeType, version string) {
 	census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per")
 	census.mSuccessRatePerStream = stats.Float64("success_rate_per_stream", "Success rate, per stream", "per")
 	census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec")
-	census.mModelsRequested = stats.Int64("ai_models_requested", "Number of models requested over time", "tot")
-	census.mAIRoundtripTime = stats.Float64("ai_roundtrip_time_seconds", "AI Roundtrip time", "sec")
 	census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds",
 		"Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec")
 	census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec")
@@ -347,6 +354,12 @@ func InitCensus(nodeType NodeType, version string) {
 	census.mSegmentClassProb = stats.Float64("segment_class_prob", "SegmentClassProb", "tot")
 	census.mSceneClassification = stats.Int64("scene_classification_done", "SceneClassificationDone", "tot")
 
+	// Metrics for AI jobs
+	census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot")
+	census.mAILatencyScore = stats.Float64("ai_latency_score", "Orchestrator AI request latency score, based on smallest pipeline unit", "")
+	census.mAIPricePerUnit = stats.Float64("ai_price_per_unit", "Price paid per AI pipeline unit", "")
+	census.mAIRequestError = stats.Int64("ai_request_errors", "AIRequestErrors", "tot")
+
 	glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
 	glog.Infof("Livepeer version: %s", version)
 	glog.Infof("Node type %s node ID %s", nodeType, NodeID)
@@ -556,20 +569,6 @@ func InitCensus(nodeType NodeType, version string) {
 			TagKeys:     append([]tag.Key{census.kProfiles, census.kTrusted, census.kVerified}, baseTags...),
 			Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
 		},
-		{
-			Name:        "ai_roundtrip_time_seconds",
-			Measure:     census.mAIRoundtripTime,
-			Description: "AI Roundtrip time, seconds",
-			TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...),
-			Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
-		},
-		{
-			Name:        "ai_models_requested",
-			Measure:     census.mModelsRequested,
-			Description: "Count of Models Requested over time",
-			TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...),
-			Aggregation: view.LastValue(),
-		},
 		{
 			Name:        "transcode_overall_latency_seconds",
 			Measure:     census.mTranscodeOverallLatency,
@@ -877,6 +876,36 @@ func InitCensus(nodeType NodeType, version string) {
 			TagKeys:     baseTags,
 			Aggregation: view.Count(),
 		},
+
+		// Metrics for AI jobs
+		{
+			Name:        "ai_models_requested",
+			Measure:     census.mAIModelsRequested,
+			Description: "Count of AI model requests over time",
+			TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...),
+			Aggregation: view.LastValue(),
+		},
+		{
+			Name:        "ai_latency_score",
+			Measure:     census.mAILatencyScore,
+			Description: "Orchestrator AI request latency score",
+			TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...),
+			Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
+		},
+		{
+			Name:        "ai_price_per_unit",
+			Measure:     census.mAIPricePerUnit,
+			Description: "Price paid per AI pipeline unit",
+			TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...),
+			Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000),
+		},
+		{
+			Name:        "ai_request_errors",
+			Measure:     census.mAIRequestError,
+			Description: "Errors processing AI requests",
+			TagKeys:     baseTags,
+			Aggregation: view.Sum(),
+		},
 	}
 
 	// Register the views
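The commit message flags the buckets as provisional, and the hunk above shows why: `ai_price_per_unit` reuses the same 0–10 distribution as the latency views, which only fits if prices are normalized into that range. A minimal sketch of what a dedicated view entry could look like — the bucket boundaries below are illustrative assumptions for review, not values from this commit:

	// Illustrative alternative for the ai_price_per_unit view above; the
	// log-spaced boundaries are assumptions, chosen for small per-unit prices.
	{
		Name:        "ai_price_per_unit",
		Measure:     census.mAIPricePerUnit,
		Description: "Price paid per AI pipeline unit",
		TagKeys:     append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...),
		Aggregation: view.Distribution(0, 1e-9, 1e-8, 1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1),
	},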
@@ -1378,36 +1407,6 @@ func SegmentTranscoded(ctx context.Context, nonce, seqNo uint64, sourceDur time.
 
 	census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles, trusted, verified)
 }
-func AiJobProcessed(ctx context.Context, Pipeline string, Model string, responseDuration time.Duration) {
-
-	census.aiJobProcessed(Pipeline, Model, responseDuration)
-}
-
-func RecordModelRequested(Pipeline string, Model string) {
-	census.recordModelRequested(Pipeline, Model)
-}
-
-func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, responseDuration time.Duration) {
-	cen.lock.Lock()
-	defer cen.lock.Unlock()
-
-	ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model))
-	if err != nil {
-		glog.Error("Error creating context", err)
-		return
-	}
-
-	stats.Record(ctx, census.mAIRoundtripTime.M(responseDuration.Seconds()))
-}
-
-func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
-	ctx, err := tag.New(cen.ctx, tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName))
-	if err != nil {
-		glog.Errorf("Failed to create context with tags: %v", err)
-		return
-	}
-	stats.Record(ctx, census.mModelsRequested.M(1))
-}
 
 func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration,
 	profiles string, trusted, verified bool) {
@@ -1762,6 +1761,59 @@ func RewardCallError(sender string) {
 	}
 }
 
+// AiJobProcessed records metrics from AI jobs.
+func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
+	census.modelRequested(pipeline, model, orchInfo)
+	census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
+	census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
+}
+
+func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string, orchInfo *lpnet.OrchestratorInfo) {
+	ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, modelName), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()))
+	if err != nil {
+		glog.Errorf("Failed to create context with tags: %v", err)
+		return
+	}
+
+	stats.Record(ctx, cen.mAIModelsRequested.M(1))
+}
+
+func (cen *censusMetricsCounter) recordAILatencyScore(pipeline string, model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
+	cen.lock.Lock()
+	defer cen.lock.Unlock()
+
+	ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()))
+	if err != nil {
+		glog.Error("Error creating context", err)
+		return
+	}
+
+	stats.Record(ctx, cen.mAILatencyScore.M(latencyScore))
+}
+
+func (cen *censusMetricsCounter) recordAIPricePerUnit(pipeline string, model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
+	cen.lock.Lock()
+	defer cen.lock.Unlock()
+
+	ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()))
+	if err != nil {
+		glog.Error("Error creating context", err)
+		return
+	}
+
+	stats.Record(ctx, cen.mAIPricePerUnit.M(pricePerUnit))
+}
+
+// AIRequestError records an error during an AI job request.
+func AIRequestError(sender string) {
+	if err := stats.RecordWithTags(census.ctx,
+		[]tag.Mutator{tag.Insert(census.kSender, sender)},
+		census.mAIRequestError.M(1)); err != nil {
+
+		glog.Errorf("Error recording metrics err=%q", err)
+	}
+}
+
 // Convert wei to gwei
 func wei2gwei(wei *big.Int) float64 {
 	gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
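Taken together, the new exported surface of monitor/census.go is `monitor.AiJobProcessed` and `monitor.AIRequestError`. The call site is not part of the hunks shown here (the third changed file did not load), so the following is only a sketch of how a gateway code path might record a completed job; `modelID`, `latencyScore`, `pricePerUnit`, and `orchInfo` are assumed to be available from the surrounding request handling:

	// Hypothetical call site, not part of this diff: record metrics once an
	// AI job finishes and the orchestrator's response is known.
	if monitor.Enabled {
		monitor.AiJobProcessed(ctx, "text-to-image", modelID, monitor.AIJobInfo{
			LatencyScore: latencyScore, // e.g. roundtrip seconds per pipeline unit
			PricePerUnit: pricePerUnit, // price actually paid per pipeline unit
		}, orchInfo)
	}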
15 changes: 1 addition & 14 deletions server/ai_mediaserver.go
@@ -13,7 +13,6 @@ import (
 	"github.com/livepeer/go-livepeer/clog"
 	"github.com/livepeer/go-livepeer/common"
 	"github.com/livepeer/go-livepeer/core"
-	"github.com/livepeer/go-livepeer/monitor"
 	"github.com/livepeer/go-tools/drivers"
 	middleware "github.com/oapi-codegen/nethttp-middleware"
 	"github.com/oapi-codegen/runtime"
@@ -87,7 +86,6 @@ func (ls *LivepeerServer) TextToImage() http.Handler {
 		}
 
 		clog.V(common.VERBOSE).Infof(r.Context(), "Received TextToImage request prompt=%v model_id=%v", req.Prompt, *req.ModelId)
-		monitor.RecordModelRequested("text-to-image", *req.ModelId)
 
 		params := aiRequestParams{
 			node: ls.LivepeerNode,
@@ -110,10 +108,6 @@ func (ls *LivepeerServer) TextToImage() http.Handler {
 		took := time.Since(start)
 		clog.Infof(ctx, "Processed TextToImage request prompt=%v model_id=%v took=%v", req.Prompt, *req.ModelId, took)
 
-		if monitor.Enabled {
-			monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, took)
-		}
-
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusOK)
 		_ = json.NewEncoder(w).Encode(resp)
Expand All @@ -140,7 +134,6 @@ func (ls *LivepeerServer) ImageToImage() http.Handler {
}

clog.V(common.VERBOSE).Infof(ctx, "Received ImageToImage request imageSize=%v prompt=%v model_id=%v", req.Image.FileSize(), req.Prompt, *req.ModelId)
monitor.RecordModelRequested("image-to-image", *req.ModelId)

params := aiRequestParams{
node: ls.LivepeerNode,
@@ -162,9 +155,6 @@ func (ls *LivepeerServer) ImageToImage() http.Handler {
 
 		took := time.Since(start)
 		clog.V(common.VERBOSE).Infof(ctx, "Processed ImageToImage request imageSize=%v prompt=%v model_id=%v took=%v", req.Image.FileSize(), req.Prompt, *req.ModelId, took)
-		if monitor.Enabled {
-			monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, took)
-		}
 
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusOK)
@@ -198,7 +188,6 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler {
 		}
 
 		clog.V(common.VERBOSE).Infof(ctx, "Received ImageToVideo request imageSize=%v model_id=%v async=%v", req.Image.FileSize(), *req.ModelId, async)
-		monitor.RecordModelRequested("image-to-video", *req.ModelId)
 
 		params := aiRequestParams{
 			node: ls.LivepeerNode,
@@ -223,9 +212,7 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler {
 
 		took := time.Since(start)
 		clog.Infof(ctx, "Processed ImageToVideo request imageSize=%v model_id=%v took=%v", req.Image.FileSize(), *req.ModelId, took)
-		if monitor.Enabled {
-			monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, took)
-		}
+
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusOK)
 		_ = json.NewEncoder(w).Encode(resp)
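With `OrchestratorInfo` now required by `AiJobProcessed`, the handlers above can no longer record metrics themselves, so the per-handler calls are removed and recording has to move to wherever the orchestrator response is available. For failures, the commit also adds `AIRequestError`, which is not yet wired up in the hunks shown; a hedged sketch of how an error path could use it (the `sender` value and the helper name are assumptions, not code from this commit):

	// Hypothetical error-path hook, not part of this diff; names are assumptions.
	resp, err := processTextToImage(ctx, params, req)
	if err != nil {
		if monitor.Enabled {
			monitor.AIRequestError(sender) // sender: assumed requester identifier
		}
		// ...existing JSON error response...
		return
	}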
