Skip to content

Commit

Permalink
New Selection Algorithm (#2872)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Sep 29, 2023
1 parent ecc2839 commit c9f47f4
Show file tree
Hide file tree
Showing 12 changed files with 740 additions and 414 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ jobs:
sudo apt purge -yqq dotnet-* mono-* llvm-* libllvm* powershell* openjdk-* \
temurin-* mongodb-* firefox mysql-* \
hhvm google-chrome-stable \
libgl1-mesa-dri microsoft-edge-stable \
google-cloud-sdk azure-cli
libgl1-mesa-dri microsoft-edge-stable azure-cli
sudo apt autoremove -y
sudo rm -rf /usr/share/dotnet /usr/local/lib/android
Expand Down Expand Up @@ -119,8 +118,7 @@ jobs:
sudo apt purge -yqq dotnet-* mono-* llvm-* libllvm* powershell* openjdk-* \
temurin-* mongodb-* firefox mysql-* \
hhvm google-chrome-stable \
libgl1-mesa-dri microsoft-edge-stable \
google-cloud-sdk azure-cli
libgl1-mesa-dri microsoft-edge-stable azure-cli
sudo apt autoremove -y
sudo rm -rf /usr/share/dotnet /usr/local/lib/android
Expand Down
17 changes: 12 additions & 5 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,23 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.CliAddr = flag.String("cliAddr", *cfg.CliAddr, "Address to bind for CLI commands")
cfg.HttpAddr = flag.String("httpAddr", *cfg.HttpAddr, "Address to bind for HTTP commands")
cfg.ServiceAddr = flag.String("serviceAddr", *cfg.ServiceAddr, "Orchestrator only. Overrides the on-chain serviceURI that broadcasters can use to contact this node; may be an IP or hostname.")
cfg.OrchAddr = flag.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.VerifierURL = flag.String("verifierUrl", *cfg.VerifierURL, "URL of the verifier to use")
cfg.VerifierPath = flag.String("verifierPath", *cfg.VerifierPath, "Path to verifier shared volume")
cfg.LocalVerify = flag.Bool("localVerify", *cfg.LocalVerify, "Set to true to enable local verification i.e. pixel count and signature verification.")
cfg.HttpIngest = flag.Bool("httpIngest", *cfg.HttpIngest, "Set to true to enable HTTP ingest")

// Broadcaster's Selection Algorithm
cfg.OrchAddr = flag.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.OrchWebhookURL = flag.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.OrchBlacklist = flag.String("orchBlocklist", "", "Comma-separated list of blocklisted orchestrators")
cfg.SelectRandWeight = flag.Float64("selectRandFreq", *cfg.SelectRandWeight, "Weight of the random factor in the orchestrator selection algorithm")
cfg.SelectStakeWeight = flag.Float64("selectStakeWeight", *cfg.SelectStakeWeight, "Weight of the stake factor in the orchestrator selection algorithm")
cfg.SelectPriceWeight = flag.Float64("selectPriceWeight", *cfg.SelectPriceWeight, "Weight of the price factor in the orchestrator selection algorithm")
cfg.SelectPriceExpFactor = flag.Float64("selectPriceExpFactor", *cfg.SelectPriceExpFactor, "Expresses how significant a small change of price is for the selection algorithm; default 100")
cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester")
cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats")
cfg.MaxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")

// Transcoding:
cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator")
Expand All @@ -132,7 +143,6 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchSecret = flag.String("orchSecret", *cfg.OrchSecret, "Shared secret with the orchestrator as a standalone transcoder or path to file")
cfg.TranscodingOptions = flag.String("transcodingOptions", *cfg.TranscodingOptions, "Transcoding options for broadcast job, or path to json config")
cfg.MaxAttempts = flag.Int("maxAttempts", *cfg.MaxAttempts, "Maximum transcode attempts")
cfg.SelectRandFreq = flag.Float64("selectRandFreq", *cfg.SelectRandFreq, "Frequency to randomly select unknown orchestrators (on-chain mode only)")
cfg.MaxSessions = flag.String("maxSessions", *cfg.MaxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator or 'auto' for dynamic limit, maximum number of RTMP streams for Broadcaster, or maximum capacity for transcoder.")
cfg.CurrentManifest = flag.Bool("currentManifest", *cfg.CurrentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.Nvidia = flag.String("nvidia", *cfg.Nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
Expand Down Expand Up @@ -163,8 +173,6 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.DepositMultiplier = flag.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets")
// Orchestrator base pricing info
cfg.PricePerUnit = flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels")
// Broadcaster max acceptable price
cfg.MaxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")
// Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice
cfg.PixelsPerUnit = flag.Int("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
cfg.AutoAdjustPrice = flag.Bool("autoAdjustPrice", *cfg.AutoAdjustPrice, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets")
Expand Down Expand Up @@ -195,7 +203,6 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.FVfailGsKey = flag.String("FVfailGskey", *cfg.FVfailGsKey, "Google Cloud Storage private key file name or key in JSON format for accessing FVfailGsBucket")
// API
cfg.AuthWebhookURL = flag.String("authWebhookUrl", *cfg.AuthWebhookURL, "RTMP authentication webhook URL")
cfg.OrchWebhookURL = flag.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.DetectionWebhookURL = flag.String("detectionWebhookUrl", *cfg.DetectionWebhookURL, "(Experimental) Detection results callback URL")

return cfg
Expand Down
118 changes: 105 additions & 13 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"math/big"
Expand Down Expand Up @@ -59,12 +60,16 @@ var (
smTTL = 60 // 1 minute
)

const BroadcasterRpcPort = "9935"
const BroadcasterCliPort = "5935"
const BroadcasterRtmpPort = "1935"
const OrchestratorRpcPort = "8935"
const OrchestratorCliPort = "7935"
const TranscoderCliPort = "6935"
const (
BroadcasterRpcPort = "9935"
BroadcasterCliPort = "5935"
BroadcasterRtmpPort = "1935"
OrchestratorRpcPort = "8935"
OrchestratorCliPort = "7935"
TranscoderCliPort = "6935"

RefreshPerfScoreInterval = 10 * time.Minute
)

type LivepeerConfig struct {
Network *string
Expand All @@ -84,7 +89,14 @@ type LivepeerConfig struct {
OrchSecret *string
TranscodingOptions *string
MaxAttempts *int
SelectRandFreq *float64
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *int
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Expand All @@ -109,7 +121,6 @@ type LivepeerConfig struct {
MaxTicketEV *string
DepositMultiplier *int
PricePerUnit *int
MaxPricePerUnit *int
PixelsPerUnit *int
AutoAdjustPrice *bool
PricePerBroadcaster *string
Expand Down Expand Up @@ -153,8 +164,14 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultOrchSecret := ""
defaultTranscodingOptions := "P240p30fps16x9,P360p30fps16x9"
defaultMaxAttempts := 3
defaultSelectRandFreq := 0.3
defaultSelectRandWeight := 0.3
defaultSelectStakeWeight := 0.7
defaultSelectPriceWeight := 0.0
defaultSelectPriceExpFactor := 100.0
defaultMaxSessions := strconv.Itoa(10)
defaultOrchPerfStatsURL := ""
defaultRegion := ""
defaultMinPerfScore := 0.0
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -231,8 +248,14 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchSecret: &defaultOrchSecret,
TranscodingOptions: &defaultTranscodingOptions,
MaxAttempts: &defaultMaxAttempts,
SelectRandFreq: &defaultSelectRandFreq,
SelectRandWeight: &defaultSelectRandWeight,
SelectStakeWeight: &defaultSelectStakeWeight,
SelectPriceWeight: &defaultSelectPriceWeight,
SelectPriceExpFactor: &defaultSelectPriceExpFactor,
MaxSessions: &defaultMaxSessions,
OrchPerfStatsURL: &defaultOrchPerfStatsURL,
Region: &defaultRegion,
MinPerfScore: &defaultMinPerfScore,
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Expand Down Expand Up @@ -542,6 +565,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

} else {
n.SelectionAlgorithm, err = createSelectionAlgorithm(cfg)
if err != nil {
exit("Incorrect parameters for selection algorithm, err=%v", err)
}

var keystoreDir = filepath.Join(*cfg.Datadir, "keystore")
keystoreInfo, err := parseEthKeystorePath(*cfg.EthKeystorePath)
if err == nil {
Expand Down Expand Up @@ -827,7 +855,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

}

if n.NodeType == core.BroadcasterNode {
ev, _ := new(big.Rat).SetString(*cfg.MaxTicketEV)
if ev == nil {
Expand Down Expand Up @@ -1028,8 +1055,12 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", BroadcasterCliPort)

bcast := core.NewBroadcaster(n)

orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist)
if *cfg.OrchPerfStatsURL != "" && *cfg.Region != "" {
glog.Infof("Using Performance Stats, region=%s, URL=%s, minPerfScore=%v", *cfg.Region, *cfg.OrchPerfStatsURL, *cfg.MinPerfScore)
n.OrchPerfScore = &common.PerfScore{Scores: make(map[ethcommon.Address]float64)}
go refreshOrchPerfScoreLoop(ctx, strings.ToUpper(*cfg.Region), *cfg.OrchPerfStatsURL, n.OrchPerfScore)
}

// When the node is on-chain mode always cache the on-chain orchestrators and poll for updates
// Right now we rely on the DBOrchestratorPoolCache constructor to do this. Consider separating the logic
Expand Down Expand Up @@ -1106,7 +1137,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

// Set max transcode attempts. <=0 is OK; it just means "don't transcode"
server.MaxAttempts = *cfg.MaxAttempts
server.SelectRandFreq = *cfg.SelectRandFreq

} else if n.NodeType == core.OrchestratorNode {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", OrchestratorCliPort)
Expand Down Expand Up @@ -1452,6 +1482,22 @@ func getBroadcasterPrices(broadcasterPrices string) []BroadcasterPrice {
return pricesSet.Prices
}

func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) {
sumWeight := *cfg.SelectStakeWeight + *cfg.SelectPriceWeight + *cfg.SelectRandWeight
if math.Abs(sumWeight-1.0) > 0.0001 {
return nil, fmt.Errorf(
"sum of selection algorithm weights must be 1.0, stakeWeight=%v, priceWeight=%v, randWeight=%v",
*cfg.SelectStakeWeight, *cfg.SelectPriceWeight, *cfg.SelectRandWeight)
}
return server.ProbabilitySelectionAlgorithm{
MinPerfScore: *cfg.MinPerfScore,
StakeWeight: *cfg.SelectStakeWeight,
PriceWeight: *cfg.SelectPriceWeight,
RandWeight: *cfg.SelectRandWeight,
PriceExpFactor: *cfg.SelectPriceExpFactor,
}, nil
}

type keystorePath struct {
path string
address ethcommon.Address
Expand Down Expand Up @@ -1485,6 +1531,52 @@ func parseEthKeystorePath(ethKeystorePath string) (keystorePath, error) {
return keystore, nil
}

func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) {
for {
refreshOrchPerfScore(region, orchPerfScoreURL, score)

select {
case <-ctx.Done():
return
case <-time.After(RefreshPerfScoreInterval):
}
}
}

func refreshOrchPerfScore(region string, scoreURL string, score *common.PerfScore) {
resp, err := http.Get(scoreURL)
if err != nil {
glog.Warning("Cannot fetch Orchestrator Performance Stats from URL: %s", scoreURL)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
glog.Warning("Cannot fetch Orchestrator Performance Stats from URL: %s", scoreURL)
return
}
updatePerfScore(region, body, score)
}

func updatePerfScore(region string, respBody []byte, score *common.PerfScore) {
respMap := map[ethcommon.Address]map[string]map[string]float64{}
if err := json.Unmarshal(respBody, &respMap); err != nil {
glog.Warning("Cannot unmarshal response from Orchestrator Performance Stats URL, err=%v", err)
return
}

score.Mu.Lock()
defer score.Mu.Unlock()
for orchAddr, regions := range respMap {
if stats, ok := regions[region]; ok {
if sc, ok := stats["score"]; ok {
score.Scores[orchAddr] = sc
}
}
}
glog.Infof("Scores: %v", score.Scores)
}

func exit(msg string, args ...any) {
glog.Errorf(msg, args...)
os.Exit(2)
Expand Down
102 changes: 102 additions & 0 deletions cmd/livepeer/starter/starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,105 @@ func TestParse_ParseEthKeystorePathFileNotFound(t *testing.T) {
assert.Empty(keystoreInfo.address)
assert.True(err.Error() == "provided -ethKeystorePath was not found")
}

func TestUpdatePerfScore(t *testing.T) {
perfStatsResp := `
{
"0x001ffe939761eea3f37dd2223bd08401a3848bf3": {
"FRA": {
"success_rate": 0,
"round_trip_score": 0,
"score": 0
},
"LAX": {
"success_rate": 0.3333333333333333,
"round_trip_score": 0.978674309814987,
"score": 0.326224769938329
},
"LON": {
"success_rate": 0.3333333333333333,
"round_trip_score": 0.9999999981139247,
"score": 0.33333333270464155
},
"MDW": {
"success_rate": 1,
"round_trip_score": 0.8356601580708897,
"score": 0.8356601580708897
},
"NYC": {
"success_rate": 0.6666666666666666,
"round_trip_score": 0.9564037252220472,
"score": 0.6376024834813647
},
"PRG": {
"success_rate": 0.6666666666666666,
"round_trip_score": 0.9988698987407547,
"score": 0.6659132658271698
},
"SAO": {
"success_rate": 0.3333333333333333,
"round_trip_score": 0.8955986338422629,
"score": 0.29853287794742095
},
"SIN": {
"success_rate": 1,
"round_trip_score": 0.9969482179442755,
"score": 0.9969482179442755
}
},
"0x00803b76dc924ceabf4380a6f9edc2ddd3c90f38": {
"FRA": {
"success_rate": 1,
"round_trip_score": 0.6646347113088987,
"score": 0.6646347113088987
},
"LAX": {
"success_rate": 0.8222222222222223,
"round_trip_score": 0.381062716451423,
"score": 0.3133182335267256
},
"LON": {
"success_rate": 1,
"round_trip_score": 0.7694480079804097,
"score": 0.7694480079804097
},
"MDW": {
"success_rate": 0.6222222222222222,
"round_trip_score": 0.36531156012968535,
"score": 0.22730497074735978
},
"NYC": {
"success_rate": 1,
"round_trip_score": 0.543865046753563,
"score": 0.543865046753563
},
"PRG": {
"success_rate": 1,
"round_trip_score": 0.6681529487891555,
"score": 0.6681529487891555
},
"SAO": {
"success_rate": 0.6888888888888888,
"round_trip_score": 0.33652629465036343,
"score": 0.23182922520358365
},
"SIN": {
"success_rate": 0.6,
"round_trip_score": 0.3958106005746348,
"score": 0.23748636034478088
}
}
}`
scores := &common.PerfScore{Scores: map[ethcommon.Address]float64{
// some previous data
ethcommon.HexToAddress("0x001ffe939761eea3f37dd2223bd08401a3848bf3"): 0.11,
}}

updatePerfScore("LAX", []byte(perfStatsResp), scores)

expScores := map[ethcommon.Address]float64{
ethcommon.HexToAddress("0x001ffe939761eea3f37dd2223bd08401a3848bf3"): 0.326224769938329,
ethcommon.HexToAddress("0x00803b76dc924ceabf4380a6f9edc2ddd3c90f38"): 0.3133182335267256,
}
require.Equal(t, expScores, scores.Scores)
}
Loading

0 comments on commit c9f47f4

Please sign in to comment.