Skip to content

Commit

Permalink
cmd,core,server: Support dynamic updates to price in USD
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Mar 23, 2024
1 parent 8b7c4d1 commit 644e680
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 203 deletions.
99 changes: 22 additions & 77 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/blockwatch"
"github.com/livepeer/go-livepeer/eth/watchers"
"github.com/livepeer/go-livepeer/monitor"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/server"
Expand All @@ -59,11 +60,6 @@ var (
cleanupInterval = 10 * time.Minute
// The time to live for cached max float values for PM senders (else they will be cleaned up) in seconds
smTTL = 172800 // 2 days

// Regular expression used to parse the price per unit CLI flags
pricePerUnitRex = regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`)
// Number of wei in 1 ETH
weiPerETH = big.NewRat(1e18, 1)
)

const (
Expand Down Expand Up @@ -721,7 +717,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
go serviceRegistryWatcher.Watch()
defer serviceRegistryWatcher.Stop()

priceFeedWatcher, err := watchers.NewPriceFeedWatcher(ctx, backend, *cfg.PriceFeedAddr)
core.PriceFeedWatcher, err = watchers.NewPriceFeedWatcher(backend, *cfg.PriceFeedAddr)
if err != nil {
glog.Errorf("Failed to set up price feed watcher: %v", err)
return
Expand Down Expand Up @@ -764,26 +760,26 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
} else if pricePerUnit.Sign() < 0 {
panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %s", pricePerUnit))
}
err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) {
defaultPrice := toWeiPricePerPixel(pricePerUnit, multiplier, pixelsPerUnit)
n.SetBasePrice("default", defaultPrice)
glog.Infof("Price: %v wei per pixel", defaultPrice.RatString())
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel\n ", price.RatString())
})
if err != nil {
panic(fmt.Errorf("Error starting price update loop: %v", err))
panic(fmt.Errorf("Error converting price: %v", err))
}
n.SetBasePrice("default", autoPrice)

broadcasterPrices := getBroadcasterPrices(*cfg.PricePerBroadcaster)
for _, p := range broadcasterPrices {
currency, pricePerUnit, ethAddress := p.Currency, p.PricePerUnit, p.EthAddress
err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) {
price := toWeiPricePerPixel(pricePerUnit, multiplier, pixelsPerUnit)
n.SetBasePrice(ethAddress, price)
glog.Infof("Price: %v wei per pixel for broadcaster %v", price.RatString(), ethAddress)
p := p
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel for broadcaster %v", price.RatString(), p.EthAddress)
})
if err != nil {
panic(fmt.Errorf("Error starting broadcaster price update loop: %v", err))
panic(fmt.Errorf("Error converting price for broadcaster %s: %v", p.EthAddress, err))
}
n.SetBasePrice(p.EthAddress, autoPrice)
}

n.AutoSessionLimit = *cfg.MaxSessions == "auto"
Expand Down Expand Up @@ -890,14 +886,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit))
}
if maxPricePerUnit.Sign() > 0 {
err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) {
price := toWeiPricePerPixel(maxPricePerUnit, multiplier, pixelsPerUnit)
server.BroadcastCfg.SetMaxPrice(price)
glog.Infof("Price: %v wei per pixel\n ", price.RatString())
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxTranscodingPrice(price)
}
glog.Infof("Maximum transcoding price:: %v wei per pixel\n ", price.RatString())
})
if err != nil {
panic(fmt.Errorf("Error starting price update loop: %v", err))
panic(fmt.Errorf("Error converting price: %v", err))
}
server.BroadcastCfg.SetMaxPrice(autoPrice)
} else {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
Expand Down Expand Up @@ -1560,6 +1559,7 @@ func parseEthKeystorePath(ethKeystorePath string) (keystorePath, error) {
}

func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) {
pricePerUnitRex := regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`)
match := pricePerUnitRex.FindStringSubmatch(pricePerUnitStr)
if match == nil {
return nil, "", fmt.Errorf("price must be in the format of <price><currency>, provided %v", pricePerUnitStr)
Expand All @@ -1569,8 +1569,6 @@ func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) {
pricePerUnit, ok := new(big.Rat).SetString(price)
if !ok {
return nil, "", fmt.Errorf("price must be a valid number, provided %v", match[1])
} else if pricePerUnit.Sign() < 0 {
return nil, "", fmt.Errorf("price must be >= 0, provided %v", pricePerUnit)
}
if currency == "" {
currency = "wei"
Expand All @@ -1579,59 +1577,6 @@ func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) {
return pricePerUnit, currency, nil
}

func watchPriceUpdates(ctx context.Context, watcher *watchers.PriceFeedWatcher, currency string, updatePrice func(multiplier *big.Rat)) error {
if strings.ToLower(currency) == "wei" {
updatePrice(big.NewRat(1, 1))
return nil
} else if strings.ToUpper(currency) == "ETH" {
updatePrice(weiPerETH)
return nil
}

base, quote := watcher.Currencies()
if base != "ETH" && quote != "ETH" {
return fmt.Errorf("price feed does not have ETH as a currency (%v/%v)", base, quote)
}
if base != currency && quote != currency {
return fmt.Errorf("price feed does not have %v as a currency (%v/%v)", currency, base, quote)
}

price, err := watcher.Current()
if err != nil {
return fmt.Errorf("error fetching price data: %v", err)
}
updatePrice(currencyToWeiMultiplier(price, base))

go func() {
priceUpdated := make(chan eth.PriceData, 1)
sub := watcher.Subscribe(priceUpdated)
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
return
case data := <-priceUpdated:
updatePrice(currencyToWeiMultiplier(data, base))
}
}
}()
return nil
}

func currencyToWeiMultiplier(data eth.PriceData, baseCurrency string) *big.Rat {
ethMultipler := data.Price
if baseCurrency == "ETH" {
// Invert the multiplier if the quote is in the form ETH / X
ethMultipler = new(big.Rat).Inv(ethMultipler)
}
return new(big.Rat).Mul(ethMultipler, weiPerETH)
}

func toWeiPricePerPixel(pricePerUnit *big.Rat, currencyMultiplier *big.Rat, pixelsPerUnit *big.Rat) *big.Rat {
weiPricePerUnit := new(big.Rat).Mul(pricePerUnit, currencyMultiplier)
return new(big.Rat).Quo(weiPricePerUnit, pixelsPerUnit)
}

func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) {
for {
refreshOrchPerfScore(region, orchPerfScoreURL, score)
Expand Down
29 changes: 0 additions & 29 deletions cmd/livepeer/starter/starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,32 +383,3 @@ func TestParsePricePerUnit(t *testing.T) {
})
}
}

func TestPriceDataToWei(t *testing.T) {
tests := []struct {
name string
data eth.PriceData
baseCurrency string
expectedWei *big.Rat
}{
{
name: "Base currency is ETH",
data: eth.PriceData{Price: big.NewRat(500, 1)}, // 500 USD per ETH
baseCurrency: "ETH",
expectedWei: big.NewRat(1e18, 500), // (1 / 500 USD/ETH) * 1e18 wei/ETH
},
{
name: "Base currency is not ETH",
data: eth.PriceData{Price: big.NewRat(1, 2000)}, // 1/2000 ETH per USD
baseCurrency: "USD",
expectedWei: big.NewRat(5e14, 1), // (1 * 1/2000 ETH/USD) * 1e18 wei/ETH
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := currencyToWeiMultiplier(tt.data, tt.baseCurrency)
assert.Equal(t, 0, tt.expectedWei.Cmp(result))
})
}
}
100 changes: 100 additions & 0 deletions core/autoconvertedprice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package core

import (
"context"
"fmt"
"math/big"
"strings"

"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/watchers"
)

// PriceFeedWatcher is a global instance of a PriceFeedWatcher. It must be
// initialized before creating an AutoConvertedPrice instance.
var PriceFeedWatcher *watchers.PriceFeedWatcher

// Number of wei in 1 ETH
var weiPerETH = big.NewRat(1e18, 1)

type AutoConvertedPrice struct {
cancelSubscription func()
onUpdate func(*big.Rat)
basePrice *big.Rat

current *big.Rat
}

func NewFixedPrice(price *big.Rat) (*AutoConvertedPrice) {
return &AutoConvertedPrice{current: price}
}

func NewAutoConvertedPrice(currency string, basePrice *big.Rat, onUpdate func(*big.Rat)) (*AutoConvertedPrice, error) {
if PriceFeedWatcher == nil {
return nil, fmt.Errorf("PriceFeedWatcher is not initialized")
}
if strings.ToLower(currency) == "wei" {
return NewFixedPrice(basePrice), nil
} else if strings.ToUpper(currency) == "ETH" {
return NewFixedPrice(new(big.Rat).Mul(basePrice, weiPerETH)), nil
}

base, quote := PriceFeedWatcher.Currencies()
if base != "ETH" && quote != "ETH" {
return nil, fmt.Errorf("price feed does not have ETH as a currency (%v/%v)", base, quote)
}
if base != currency && quote != currency {
return nil, fmt.Errorf("price feed does not have %v as a currency (%v/%v)", currency, base, quote)
}

currencyPrice, err := PriceFeedWatcher.Current()
if err != nil {
return nil, fmt.Errorf("error getting current price data: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
price := &AutoConvertedPrice{
cancelSubscription: cancel,
onUpdate: onUpdate,
basePrice: basePrice,
current: new(big.Rat).Mul(basePrice, currencyToWeiMultiplier(currencyPrice, base)),
}
go price.watch(ctx)
onUpdate(price.current)

return price, nil
}

func (a *AutoConvertedPrice) Value() *big.Rat {
return a.current
}

func (a *AutoConvertedPrice) Stop() {
if a.cancelSubscription != nil {
a.cancelSubscription()
}
}

func (a *AutoConvertedPrice) watch(ctx context.Context) {
base, _ := PriceFeedWatcher.Currencies()
priceUpdated := make(chan eth.PriceData, 1)
PriceFeedWatcher.Subscribe(ctx, priceUpdated)
for {
select {
case <-ctx.Done():
return
case currencyPrice := <-priceUpdated:
a.current = new(big.Rat).Mul(a.basePrice, currencyToWeiMultiplier(currencyPrice, base))
a.onUpdate(a.current)
}
}
}

func currencyToWeiMultiplier(data eth.PriceData, baseCurrency string) *big.Rat {
ethMultipler := data.Price
if baseCurrency == "ETH" {
// Invert the multiplier if the quote is in the form ETH / X
ethMultipler = new(big.Rat).Inv(ethMultipler)
}
return new(big.Rat).Mul(ethMultipler, weiPerETH)
}
38 changes: 38 additions & 0 deletions core/autoconvertedprice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package core

import (
"math/big"
"testing"

"github.com/livepeer/go-livepeer/eth"
"github.com/stretchr/testify/assert"
)

func TestCurrencyToWeiMultiplier(t *testing.T) {
tests := []struct {
name string
data eth.PriceData
baseCurrency string
expectedWei *big.Rat
}{
{
name: "Base currency is ETH",
data: eth.PriceData{Price: big.NewRat(500, 1)}, // 500 USD per ETH
baseCurrency: "ETH",
expectedWei: big.NewRat(1e18, 500), // (1 / 500 USD/ETH) * 1e18 wei/ETH
},
{
name: "Base currency is not ETH",
data: eth.PriceData{Price: big.NewRat(1, 2000)}, // 1/2000 ETH per USD
baseCurrency: "USD",
expectedWei: big.NewRat(5e14, 1), // (1 * 1/2000 ETH/USD) * 1e18 wei/ETH
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := currencyToWeiMultiplier(tt.data, tt.baseCurrency)
assert.Equal(t, 0, tt.expectedWei.Cmp(result))
})
}
}
19 changes: 14 additions & 5 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type LivepeerNode struct {
StorageConfigs map[string]*transcodeConfig
storageMutex *sync.RWMutex
// Transcoder private fields
priceInfo map[string]*big.Rat
priceInfo map[string]*AutoConvertedPrice
serviceURI url.URL
segmentMutex *sync.RWMutex
}
Expand All @@ -109,7 +109,7 @@ func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*Livep
SegmentChans: make(map[ManifestID]SegmentChan),
segmentMutex: &sync.RWMutex{},
Capabilities: &Capabilities{capacities: map[Capability]int{}},
priceInfo: make(map[string]*big.Rat),
priceInfo: make(map[string]*AutoConvertedPrice),
StorageConfigs: make(map[string]*transcodeConfig),
storageMutex: &sync.RWMutex{},
}, nil
Expand All @@ -128,12 +128,17 @@ func (n *LivepeerNode) SetServiceURI(newUrl *url.URL) {
}

// SetBasePrice sets the base price for an orchestrator on the node
func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *big.Rat) {
func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *AutoConvertedPrice) {
addr := strings.ToLower(b_eth_addr)
n.mu.Lock()
defer n.mu.Unlock()


prevPrice := n.priceInfo[addr]
n.priceInfo[addr] = price
if prevPrice != nil {
prevPrice.Stop()
}
}

// GetBasePrice gets the base price for an orchestrator
Expand All @@ -142,14 +147,18 @@ func (n *LivepeerNode) GetBasePrice(b_eth_addr string) *big.Rat {
n.mu.RLock()
defer n.mu.RUnlock()

return n.priceInfo[addr]
return n.priceInfo[addr].Value()
}

func (n *LivepeerNode) GetBasePrices() map[string]*big.Rat {
n.mu.RLock()
defer n.mu.RUnlock()

return n.priceInfo
prices := make(map[string]*big.Rat)
for addr, price := range n.priceInfo {
prices[addr] = price.Value()
}
return prices
}

// SetMaxFaceValue sets the faceValue upper limit for tickets received
Expand Down
Loading

0 comments on commit 644e680

Please sign in to comment.