From 644e680fe47d142bda83365fcf7f8cbed4194a14 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Sat, 23 Mar 2024 02:46:20 -0300 Subject: [PATCH] cmd,core,server: Support dynamic updates to price in USD --- cmd/livepeer/starter/starter.go | 99 ++++++------------------- cmd/livepeer/starter/starter_test.go | 29 -------- core/autoconvertedprice.go | 100 ++++++++++++++++++++++++++ core/autoconvertedprice_test.go | 38 ++++++++++ core/livepeernode.go | 19 +++-- core/livepeernode_test.go | 12 ++-- core/orch_test.go | 44 ++++++------ discovery/discovery_test.go | 8 +-- eth/watchers/pricefeedwatcher.go | 55 ++++++++++---- eth/watchers/pricefeedwatcher_test.go | 24 +++---- server/broadcast.go | 15 ++-- server/handlers.go | 45 ++++++++---- server/handlers_test.go | 16 ++--- server/rpc_test.go | 8 +-- server/segment_rpc_test.go | 2 +- test/e2e/e2e.go | 2 +- 16 files changed, 313 insertions(+), 203 deletions(-) create mode 100644 core/autoconvertedprice.go create mode 100644 core/autoconvertedprice_test.go diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 85b923248..03dd99a86 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -33,6 +33,7 @@ import ( "github.com/livepeer/go-livepeer/eth" "github.com/livepeer/go-livepeer/eth/blockwatch" "github.com/livepeer/go-livepeer/eth/watchers" + "github.com/livepeer/go-livepeer/monitor" lpmon "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/go-livepeer/server" @@ -59,11 +60,6 @@ var ( cleanupInterval = 10 * time.Minute // The time to live for cached max float values for PM senders (else they will be cleaned up) in seconds smTTL = 172800 // 2 days - - // Regular expression used to parse the price per unit CLI flags - pricePerUnitRex = regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`) - // Number of wei in 1 ETH - weiPerETH = big.NewRat(1e18, 1) ) const ( @@ -721,7 +717,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { go serviceRegistryWatcher.Watch() defer serviceRegistryWatcher.Stop() - priceFeedWatcher, err := watchers.NewPriceFeedWatcher(ctx, backend, *cfg.PriceFeedAddr) + core.PriceFeedWatcher, err = watchers.NewPriceFeedWatcher(backend, *cfg.PriceFeedAddr) if err != nil { glog.Errorf("Failed to set up price feed watcher: %v", err) return @@ -764,26 +760,26 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { } else if pricePerUnit.Sign() < 0 { panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %s", pricePerUnit)) } - err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) { - defaultPrice := toWeiPricePerPixel(pricePerUnit, multiplier, pixelsPerUnit) - n.SetBasePrice("default", defaultPrice) - glog.Infof("Price: %v wei per pixel", defaultPrice.RatString()) + pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + glog.Infof("Price: %v wei per pixel\n ", price.RatString()) }) if err != nil { - panic(fmt.Errorf("Error starting price update loop: %v", err)) + panic(fmt.Errorf("Error converting price: %v", err)) } + n.SetBasePrice("default", autoPrice) broadcasterPrices := getBroadcasterPrices(*cfg.PricePerBroadcaster) for _, p := range broadcasterPrices { - currency, pricePerUnit, ethAddress := p.Currency, p.PricePerUnit, p.EthAddress - err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) { - price := toWeiPricePerPixel(pricePerUnit, multiplier, pixelsPerUnit) - n.SetBasePrice(ethAddress, price) - glog.Infof("Price: %v wei per pixel for broadcaster %v", price.RatString(), ethAddress) + p := p + pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + glog.Infof("Price: %v wei per pixel for broadcaster %v", price.RatString(), p.EthAddress) }) if err != nil { - panic(fmt.Errorf("Error starting broadcaster price update loop: %v", err)) + panic(fmt.Errorf("Error converting price for broadcaster %s: %v", p.EthAddress, err)) } + n.SetBasePrice(p.EthAddress, autoPrice) } n.AutoSessionLimit = *cfg.MaxSessions == "auto" @@ -890,14 +886,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit)) } if maxPricePerUnit.Sign() > 0 { - err = watchPriceUpdates(ctx, priceFeedWatcher, currency, func(multiplier *big.Rat) { - price := toWeiPricePerPixel(maxPricePerUnit, multiplier, pixelsPerUnit) - server.BroadcastCfg.SetMaxPrice(price) - glog.Infof("Price: %v wei per pixel\n ", price.RatString()) + pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if monitor.Enabled { + monitor.MaxTranscodingPrice(price) + } + glog.Infof("Maximum transcoding price:: %v wei per pixel\n ", price.RatString()) }) if err != nil { - panic(fmt.Errorf("Error starting price update loop: %v", err)) + panic(fmt.Errorf("Error converting price: %v", err)) } + server.BroadcastCfg.SetMaxPrice(autoPrice) } else { glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit) glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values") @@ -1560,6 +1559,7 @@ func parseEthKeystorePath(ethKeystorePath string) (keystorePath, error) { } func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) { + pricePerUnitRex := regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`) match := pricePerUnitRex.FindStringSubmatch(pricePerUnitStr) if match == nil { return nil, "", fmt.Errorf("price must be in the format of , provided %v", pricePerUnitStr) @@ -1569,8 +1569,6 @@ func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) { pricePerUnit, ok := new(big.Rat).SetString(price) if !ok { return nil, "", fmt.Errorf("price must be a valid number, provided %v", match[1]) - } else if pricePerUnit.Sign() < 0 { - return nil, "", fmt.Errorf("price must be >= 0, provided %v", pricePerUnit) } if currency == "" { currency = "wei" @@ -1579,59 +1577,6 @@ func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) { return pricePerUnit, currency, nil } -func watchPriceUpdates(ctx context.Context, watcher *watchers.PriceFeedWatcher, currency string, updatePrice func(multiplier *big.Rat)) error { - if strings.ToLower(currency) == "wei" { - updatePrice(big.NewRat(1, 1)) - return nil - } else if strings.ToUpper(currency) == "ETH" { - updatePrice(weiPerETH) - return nil - } - - base, quote := watcher.Currencies() - if base != "ETH" && quote != "ETH" { - return fmt.Errorf("price feed does not have ETH as a currency (%v/%v)", base, quote) - } - if base != currency && quote != currency { - return fmt.Errorf("price feed does not have %v as a currency (%v/%v)", currency, base, quote) - } - - price, err := watcher.Current() - if err != nil { - return fmt.Errorf("error fetching price data: %v", err) - } - updatePrice(currencyToWeiMultiplier(price, base)) - - go func() { - priceUpdated := make(chan eth.PriceData, 1) - sub := watcher.Subscribe(priceUpdated) - defer sub.Unsubscribe() - for { - select { - case <-ctx.Done(): - return - case data := <-priceUpdated: - updatePrice(currencyToWeiMultiplier(data, base)) - } - } - }() - return nil -} - -func currencyToWeiMultiplier(data eth.PriceData, baseCurrency string) *big.Rat { - ethMultipler := data.Price - if baseCurrency == "ETH" { - // Invert the multiplier if the quote is in the form ETH / X - ethMultipler = new(big.Rat).Inv(ethMultipler) - } - return new(big.Rat).Mul(ethMultipler, weiPerETH) -} - -func toWeiPricePerPixel(pricePerUnit *big.Rat, currencyMultiplier *big.Rat, pixelsPerUnit *big.Rat) *big.Rat { - weiPricePerUnit := new(big.Rat).Mul(pricePerUnit, currencyMultiplier) - return new(big.Rat).Quo(weiPricePerUnit, pixelsPerUnit) -} - func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) { for { refreshOrchPerfScore(region, orchPerfScoreURL, score) diff --git a/cmd/livepeer/starter/starter_test.go b/cmd/livepeer/starter/starter_test.go index 07a6a819c..3cb0baa88 100644 --- a/cmd/livepeer/starter/starter_test.go +++ b/cmd/livepeer/starter/starter_test.go @@ -383,32 +383,3 @@ func TestParsePricePerUnit(t *testing.T) { }) } } - -func TestPriceDataToWei(t *testing.T) { - tests := []struct { - name string - data eth.PriceData - baseCurrency string - expectedWei *big.Rat - }{ - { - name: "Base currency is ETH", - data: eth.PriceData{Price: big.NewRat(500, 1)}, // 500 USD per ETH - baseCurrency: "ETH", - expectedWei: big.NewRat(1e18, 500), // (1 / 500 USD/ETH) * 1e18 wei/ETH - }, - { - name: "Base currency is not ETH", - data: eth.PriceData{Price: big.NewRat(1, 2000)}, // 1/2000 ETH per USD - baseCurrency: "USD", - expectedWei: big.NewRat(5e14, 1), // (1 * 1/2000 ETH/USD) * 1e18 wei/ETH - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := currencyToWeiMultiplier(tt.data, tt.baseCurrency) - assert.Equal(t, 0, tt.expectedWei.Cmp(result)) - }) - } -} diff --git a/core/autoconvertedprice.go b/core/autoconvertedprice.go new file mode 100644 index 000000000..0a4eb4f6f --- /dev/null +++ b/core/autoconvertedprice.go @@ -0,0 +1,100 @@ +package core + +import ( + "context" + "fmt" + "math/big" + "strings" + + "github.com/livepeer/go-livepeer/eth" + "github.com/livepeer/go-livepeer/eth/watchers" +) + +// PriceFeedWatcher is a global instance of a PriceFeedWatcher. It must be +// initialized before creating an AutoConvertedPrice instance. +var PriceFeedWatcher *watchers.PriceFeedWatcher + +// Number of wei in 1 ETH +var weiPerETH = big.NewRat(1e18, 1) + +type AutoConvertedPrice struct { + cancelSubscription func() + onUpdate func(*big.Rat) + basePrice *big.Rat + + current *big.Rat +} + +func NewFixedPrice(price *big.Rat) (*AutoConvertedPrice) { + return &AutoConvertedPrice{current: price} +} + +func NewAutoConvertedPrice(currency string, basePrice *big.Rat, onUpdate func(*big.Rat)) (*AutoConvertedPrice, error) { + if PriceFeedWatcher == nil { + return nil, fmt.Errorf("PriceFeedWatcher is not initialized") + } + if strings.ToLower(currency) == "wei" { + return NewFixedPrice(basePrice), nil + } else if strings.ToUpper(currency) == "ETH" { + return NewFixedPrice(new(big.Rat).Mul(basePrice, weiPerETH)), nil + } + + base, quote := PriceFeedWatcher.Currencies() + if base != "ETH" && quote != "ETH" { + return nil, fmt.Errorf("price feed does not have ETH as a currency (%v/%v)", base, quote) + } + if base != currency && quote != currency { + return nil, fmt.Errorf("price feed does not have %v as a currency (%v/%v)", currency, base, quote) + } + + currencyPrice, err := PriceFeedWatcher.Current() + if err != nil { + return nil, fmt.Errorf("error getting current price data: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + price := &AutoConvertedPrice{ + cancelSubscription: cancel, + onUpdate: onUpdate, + basePrice: basePrice, + current: new(big.Rat).Mul(basePrice, currencyToWeiMultiplier(currencyPrice, base)), + } + go price.watch(ctx) + onUpdate(price.current) + + return price, nil +} + +func (a *AutoConvertedPrice) Value() *big.Rat { + return a.current +} + +func (a *AutoConvertedPrice) Stop() { + if a.cancelSubscription != nil { + a.cancelSubscription() + } +} + +func (a *AutoConvertedPrice) watch(ctx context.Context) { + base, _ := PriceFeedWatcher.Currencies() + priceUpdated := make(chan eth.PriceData, 1) + PriceFeedWatcher.Subscribe(ctx, priceUpdated) + for { + select { + case <-ctx.Done(): + return + case currencyPrice := <-priceUpdated: + a.current = new(big.Rat).Mul(a.basePrice, currencyToWeiMultiplier(currencyPrice, base)) + a.onUpdate(a.current) + } + } +} + +func currencyToWeiMultiplier(data eth.PriceData, baseCurrency string) *big.Rat { + ethMultipler := data.Price + if baseCurrency == "ETH" { + // Invert the multiplier if the quote is in the form ETH / X + ethMultipler = new(big.Rat).Inv(ethMultipler) + } + return new(big.Rat).Mul(ethMultipler, weiPerETH) +} diff --git a/core/autoconvertedprice_test.go b/core/autoconvertedprice_test.go new file mode 100644 index 000000000..fbc71bc67 --- /dev/null +++ b/core/autoconvertedprice_test.go @@ -0,0 +1,38 @@ +package core + +import ( + "math/big" + "testing" + + "github.com/livepeer/go-livepeer/eth" + "github.com/stretchr/testify/assert" +) + +func TestCurrencyToWeiMultiplier(t *testing.T) { + tests := []struct { + name string + data eth.PriceData + baseCurrency string + expectedWei *big.Rat + }{ + { + name: "Base currency is ETH", + data: eth.PriceData{Price: big.NewRat(500, 1)}, // 500 USD per ETH + baseCurrency: "ETH", + expectedWei: big.NewRat(1e18, 500), // (1 / 500 USD/ETH) * 1e18 wei/ETH + }, + { + name: "Base currency is not ETH", + data: eth.PriceData{Price: big.NewRat(1, 2000)}, // 1/2000 ETH per USD + baseCurrency: "USD", + expectedWei: big.NewRat(5e14, 1), // (1 * 1/2000 ETH/USD) * 1e18 wei/ETH + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := currencyToWeiMultiplier(tt.data, tt.baseCurrency) + assert.Equal(t, 0, tt.expectedWei.Cmp(result)) + }) + } +} diff --git a/core/livepeernode.go b/core/livepeernode.go index 57b1055e3..1f653f0a9 100644 --- a/core/livepeernode.go +++ b/core/livepeernode.go @@ -93,7 +93,7 @@ type LivepeerNode struct { StorageConfigs map[string]*transcodeConfig storageMutex *sync.RWMutex // Transcoder private fields - priceInfo map[string]*big.Rat + priceInfo map[string]*AutoConvertedPrice serviceURI url.URL segmentMutex *sync.RWMutex } @@ -109,7 +109,7 @@ func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*Livep SegmentChans: make(map[ManifestID]SegmentChan), segmentMutex: &sync.RWMutex{}, Capabilities: &Capabilities{capacities: map[Capability]int{}}, - priceInfo: make(map[string]*big.Rat), + priceInfo: make(map[string]*AutoConvertedPrice), StorageConfigs: make(map[string]*transcodeConfig), storageMutex: &sync.RWMutex{}, }, nil @@ -128,12 +128,17 @@ func (n *LivepeerNode) SetServiceURI(newUrl *url.URL) { } // SetBasePrice sets the base price for an orchestrator on the node -func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *big.Rat) { +func (n *LivepeerNode) SetBasePrice(b_eth_addr string, price *AutoConvertedPrice) { addr := strings.ToLower(b_eth_addr) n.mu.Lock() defer n.mu.Unlock() + + prevPrice := n.priceInfo[addr] n.priceInfo[addr] = price + if prevPrice != nil { + prevPrice.Stop() + } } // GetBasePrice gets the base price for an orchestrator @@ -142,14 +147,18 @@ func (n *LivepeerNode) GetBasePrice(b_eth_addr string) *big.Rat { n.mu.RLock() defer n.mu.RUnlock() - return n.priceInfo[addr] + return n.priceInfo[addr].Value() } func (n *LivepeerNode) GetBasePrices() map[string]*big.Rat { n.mu.RLock() defer n.mu.RUnlock() - return n.priceInfo + prices := make(map[string]*big.Rat) + for addr, price := range n.priceInfo { + prices[addr] = price.Value() + } + return prices } // SetMaxFaceValue sets the faceValue upper limit for tickets received diff --git a/core/livepeernode_test.go b/core/livepeernode_test.go index 259992f89..230f8dd42 100644 --- a/core/livepeernode_test.go +++ b/core/livepeernode_test.go @@ -162,8 +162,8 @@ func TestSetAndGetBasePrice(t *testing.T) { price := big.NewRat(1, 1) - n.SetBasePrice("default", price) - assert.Zero(n.priceInfo["default"].Cmp(price)) + n.SetBasePrice("default", NewFixedPrice(price)) + assert.Zero(n.priceInfo["default"].Value().Cmp(price)) assert.Zero(n.GetBasePrice("default").Cmp(price)) assert.Zero(n.GetBasePrices()["default"].Cmp(price)) @@ -172,10 +172,10 @@ func TestSetAndGetBasePrice(t *testing.T) { price1 := big.NewRat(2, 1) price2 := big.NewRat(3, 1) - n.SetBasePrice(addr1, price1) - n.SetBasePrice(addr2, price2) - assert.Zero(n.priceInfo[addr1].Cmp(price1)) - assert.Zero(n.priceInfo[addr2].Cmp(price2)) + n.SetBasePrice(addr1, NewFixedPrice(price1)) + n.SetBasePrice(addr2, NewFixedPrice(price2)) + assert.Zero(n.priceInfo[addr1].Value().Cmp(price1)) + assert.Zero(n.priceInfo[addr2].Value().Cmp(price2)) assert.Zero(n.GetBasePrices()[addr1].Cmp(price1)) assert.Zero(n.GetBasePrices()[addr2].Cmp(price2)) } diff --git a/core/orch_test.go b/core/orch_test.go index 981661433..dcc8a6c2d 100644 --- a/core/orch_test.go +++ b/core/orch_test.go @@ -704,7 +704,7 @@ func TestProcessPayment_GivenRecipientError_ReturnsNil(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) recipient.On("TxCostMultiplier", mock.Anything).Return(big.NewRat(1, 1), nil) recipient.On("ReceiveTicket", mock.Anything, mock.Anything, mock.Anything).Return("", false, nil) @@ -785,7 +785,7 @@ func TestProcessPayment_ActiveOrchestrator(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) // orchestrator inactive -> error err := orch.ProcessPayment(context.Background(), defaultPayment(t), ManifestID("some manifest")) @@ -856,7 +856,7 @@ func TestProcessPayment_GivenLosingTicket_DoesNotRedeem(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) recipient.On("TxCostMultiplier", mock.Anything).Return(big.NewRat(1, 1), nil) recipient.On("ReceiveTicket", mock.Anything, mock.Anything, mock.Anything).Return("some sessionID", false, nil) @@ -888,7 +888,7 @@ func TestProcessPayment_GivenWinningTicket_RedeemError(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -928,7 +928,7 @@ func TestProcessPayment_GivenWinningTicket_Redeems(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -968,7 +968,7 @@ func TestProcessPayment_GivenMultipleWinningTickets_RedeemsAll(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") sessionID := "some sessionID" @@ -1038,7 +1038,7 @@ func TestProcessPayment_GivenConcurrentWinningTickets_RedeemsAll(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestIDs := make([]string, 5) @@ -1097,7 +1097,7 @@ func TestProcessPayment_GivenReceiveTicketError_ReturnsError(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1165,7 +1165,7 @@ func TestProcessPayment_PaymentError_DoesNotIncreaseCreditBalance(t *testing.T) } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") paymentError := errors.New("ReceiveTicket error") @@ -1227,7 +1227,7 @@ func TestSufficientBalance_IsSufficient_ReturnsTrue(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1265,7 +1265,7 @@ func TestSufficientBalance_IsNotSufficient_ReturnsFalse(t *testing.T) { } orch := NewOrchestrator(n, rm) orch.address = addr - orch.node.SetBasePrice("default", big.NewRat(0, 1)) + orch.node.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) manifestID := ManifestID("some manifest") @@ -1307,7 +1307,7 @@ func TestSufficientBalance_OffChainMode_ReturnsTrue(t *testing.T) { func TestTicketParams(t *testing.T) { n, _ := NewLivepeerNode(nil, "", nil) - n.priceInfo["default"] = big.NewRat(1, 1) + n.priceInfo["default"] = NewFixedPrice(big.NewRat(1, 1)) priceInfo := &net.PriceInfo{PricePerUnit: 1, PixelsPerUnit: 1} recipient := new(pm.MockRecipient) n.Recipient = recipient @@ -1388,7 +1388,7 @@ func TestPriceInfo(t *testing.T) { expPricePerPixel := big.NewRat(101, 100) n, _ := NewLivepeerNode(nil, "", nil) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient := new(pm.MockRecipient) n.Recipient = recipient @@ -1406,7 +1406,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1, txMultiplier = 100/1 => expPricePerPixel = 1010/100 basePrice = big.NewRat(10, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(1010, 100) @@ -1421,7 +1421,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 1/10, txMultiplier = 100 => expPricePerPixel = 101/1000 basePrice = big.NewRat(1, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(101, 1000) @@ -1435,7 +1435,7 @@ func TestPriceInfo(t *testing.T) { assert.Equal(priceInfo.PixelsPerUnit, expPrice.Denom().Int64()) // basePrice = 25/10 , txMultiplier = 100 => expPricePerPixel = 2525/1000 basePrice = big.NewRat(25, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) orch = NewOrchestrator(n, nil) expPricePerPixel = big.NewRat(2525, 1000) @@ -1451,7 +1451,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1 , txMultiplier = 100/10 => expPricePerPixel = 11 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(100, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1470,7 +1470,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10/1 , txMultiplier = 1/10 => expPricePerPixel = 110 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(1, 10) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1489,7 +1489,7 @@ func TestPriceInfo(t *testing.T) { // basePrice = 10, txMultiplier = 1 => expPricePerPixel = 20 basePrice = big.NewRat(10, 1) txMultiplier = big.NewRat(1, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) recipient = new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(txMultiplier, nil) @@ -1506,7 +1506,7 @@ func TestPriceInfo(t *testing.T) { assert.Equal(priceInfo.PixelsPerUnit, expPrice.Denom().Int64()) // basePrice = 0 => expPricePerPixel = 0 - n.SetBasePrice("default", big.NewRat(0, 1)) + n.SetBasePrice("default", NewFixedPrice(big.NewRat(0, 1))) orch = NewOrchestrator(n, nil) priceInfo, err = orch.PriceInfo(ethcommon.Address{}, "") @@ -1516,7 +1516,7 @@ func TestPriceInfo(t *testing.T) { // test no overflows basePrice = big.NewRat(25000, 1) - n.SetBasePrice("default", basePrice) + n.SetBasePrice("default", NewFixedPrice(basePrice)) faceValue, _ := new(big.Int).SetString("22245599237119512", 10) txCost := new(big.Int).Mul(big.NewInt(100000), big.NewInt(7500000000)) txMultiplier = new(big.Rat).SetFrac(faceValue, txCost) // 926899968213313/31250000000000 @@ -1572,7 +1572,7 @@ func TestPriceInfo_TxMultiplierError_ReturnsError(t *testing.T) { expError := errors.New("TxMultiplier Error") n, _ := NewLivepeerNode(nil, "", nil) - n.SetBasePrice("default", big.NewRat(1, 1)) + n.SetBasePrice("default", NewFixedPrice(big.NewRat(1, 1))) recipient := new(pm.MockRecipient) n.Recipient = recipient recipient.On("TxCostMultiplier", mock.Anything).Return(nil, expError) diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index c52ce7ce6..c7e2b1d3a 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -608,11 +608,11 @@ func TestNewOrchestratorPoolWithPred_TestPredicate(t *testing.T) { assert.True(t, pool.pred(oInfo)) // Set server.BroadcastCfg.maxPrice higher than PriceInfo , should return true - server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(10, 1))) assert.True(t, pool.pred(oInfo)) // Set MaxBroadcastPrice lower than PriceInfo, should return false - server.BroadcastCfg.SetMaxPrice(big.NewRat(1, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 1))) assert.False(t, pool.pred(oInfo)) // PixelsPerUnit is 0 , return false @@ -629,7 +629,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsEmptyList(t *testing.T) expTranscoder := "transcoderFromTest" expPricePerPixel, _ := common.PriceToFixed(big.NewRat(999, 1)) - server.BroadcastCfg.SetMaxPrice(big.NewRat(1, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 1))) gmp := runtime.GOMAXPROCS(50) defer runtime.GOMAXPROCS(gmp) var mu sync.Mutex @@ -823,7 +823,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing. }, } - server.BroadcastCfg.SetMaxPrice(big.NewRat(10, 1)) + server.BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(10, 1))) gmp := runtime.GOMAXPROCS(50) defer runtime.GOMAXPROCS(gmp) var mu sync.Mutex diff --git a/eth/watchers/pricefeedwatcher.go b/eth/watchers/pricefeedwatcher.go index 2eb14e157..feb01c672 100644 --- a/eth/watchers/pricefeedwatcher.go +++ b/eth/watchers/pricefeedwatcher.go @@ -23,7 +23,6 @@ const ( // allows fetching the current price as well as listening for updates on the // PriceUpdated channel. type PriceFeedWatcher struct { - ctx context.Context baseRetryDelay time.Duration priceFeed eth.PriceFeedEthClient @@ -32,12 +31,15 @@ type PriceFeedWatcher struct { mu sync.RWMutex running bool current eth.PriceData + cancelWatch func() + priceEventFeed event.Feed + subscriptions event.SubscriptionScope } // NewPriceFeedWatcher creates a new PriceFeedWatcher instance. It will already // fetch the current price and start a goroutine to watch for updates. -func NewPriceFeedWatcher(ctx context.Context, ethClient *ethclient.Client, priceFeedAddr string) (*PriceFeedWatcher, error) { +func NewPriceFeedWatcher(ethClient *ethclient.Client, priceFeedAddr string) (*PriceFeedWatcher, error) { priceFeed, err := eth.NewPriceFeedEthClient(ethClient, priceFeedAddr) if err != nil { return nil, fmt.Errorf("failed to create price feed client: %w", err) @@ -54,7 +56,6 @@ func NewPriceFeedWatcher(ctx context.Context, ethClient *ethclient.Client, price } w := &PriceFeedWatcher{ - ctx: ctx, baseRetryDelay: priceUpdateBaseRetryDelay, priceFeed: priceFeed, currencyBase: currencyFrom, @@ -85,10 +86,31 @@ func (w *PriceFeedWatcher) Current() (eth.PriceData, error) { // To unsubscribe, simply call `Unsubscribe` on the returned subscription. // The sink channel should have ample buffer space to avoid blocking other // subscribers. Slow subscribers are not dropped. -func (w *PriceFeedWatcher) Subscribe(sink chan<- eth.PriceData) event.Subscription { - sub := w.priceEventFeed.Subscribe(sink) +func (w *PriceFeedWatcher) Subscribe(ctx context.Context, sink chan<- eth.PriceData) { + w.mu.Lock() + sub := w.subscriptions.Track(w.priceEventFeed.Subscribe(sink)) + w.mu.Unlock() + + go func() { + loop: for { + select { + case <-ctx.Done(): + break loop + case <-sub.Err(): + clog.Errorf(ctx, "PriceFeedWatcher subscription error: %v", sub.Err()) + } + } + sub.Unsubscribe() + + w.mu.Lock() + defer w.mu.Unlock() + if w.subscriptions.Count() == 0 { + w.cancelWatch() + w.cancelWatch = nil + } + }() + w.ensureWatch() - return sub } // updatePrice fetches the latest price data from the price feed and updates the @@ -116,19 +138,22 @@ func (w *PriceFeedWatcher) updatePrice() (eth.PriceData, error) { func (w *PriceFeedWatcher) ensureWatch() { w.mu.Lock() defer w.mu.Unlock() - if w.running { + + if w.cancelWatch != nil { + // already running return } - w.running = true + ctx, cancel := context.WithCancel(context.Background()) + w.cancelWatch = cancel - ticker := newTruncatedTicker(w.ctx, priceUpdatePeriod) - go w.watchTicker(ticker) + ticker := newTruncatedTicker(ctx, priceUpdatePeriod) + go w.watchTicker(ctx, ticker) } -func (w *PriceFeedWatcher) watchTicker(ticker <-chan time.Time) { +func (w *PriceFeedWatcher) watchTicker(ctx context.Context, ticker <-chan time.Time) { for { select { - case <-w.ctx.Done(): + case <-ctx.Done(): return case <-ticker: attempt, retryDelay := 1, w.baseRetryDelay @@ -137,13 +162,13 @@ func (w *PriceFeedWatcher) watchTicker(ticker <-chan time.Time) { if err == nil { break } else if attempt >= priceUpdateMaxRetries { - clog.Errorf(w.ctx, "Failed to fetch updated price from PriceFeed attempts=%d err=%q", attempt, err) + clog.Errorf(ctx, "Failed to fetch updated price from PriceFeed attempts=%d err=%q", attempt, err) break } - clog.Warningf(w.ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%q", retryDelay, attempt, err) + clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%q", retryDelay, attempt, err) select { - case <-w.ctx.Done(): + case <-ctx.Done(): return case <-time.After(retryDelay): } diff --git a/eth/watchers/pricefeedwatcher_test.go b/eth/watchers/pricefeedwatcher_test.go index 6c0e01687..13496f710 100644 --- a/eth/watchers/pricefeedwatcher_test.go +++ b/eth/watchers/pricefeedwatcher_test.go @@ -43,12 +43,14 @@ func TestPriceFeedWatcher_UpdatePrice(t *testing.T) { currencyQuote: "USD", } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) - require.NoError(t, w.updatePrice()) - require.Equal(t, priceData, w.current) + newPrice, err := w.updatePrice() + require.NoError(t, err) + require.Equal(t, priceData, newPrice) select { case updatedPrice := <-priceUpdated: @@ -69,9 +71,10 @@ func TestPriceFeedWatcher_Watch(t *testing.T) { currencyQuote: "USD", } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) priceData := eth.PriceData{ RoundID: 10, @@ -100,8 +103,6 @@ func TestPriceFeedWatcher_Watch(t *testing.T) { // Start the watch loop fakeTicker := make(chan time.Time, 10) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { w.watchTicker(ctx, fakeTicker) }() @@ -157,14 +158,13 @@ func TestPriceFeedWatcher_WatchErrorRetries(t *testing.T) { currencyQuote: "USD", } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() priceUpdated := make(chan eth.PriceData, 1) - sub := w.Subscribe(priceUpdated) - defer sub.Unsubscribe() + w.Subscribe(ctx, priceUpdated) // Start watch loop fakeTicker := make(chan time.Time, 10) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { w.watchTicker(ctx, fakeTicker) }() diff --git a/server/broadcast.go b/server/broadcast.go index 5658f0cda..2edcd4f58 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -56,7 +56,7 @@ var submitMultiSession = func(ctx context.Context, sess *BroadcastSession, seg * var maxTranscodeAttempts = errors.New("hit max transcode attempts") type BroadcastConfig struct { - maxPrice *big.Rat + maxPrice *core.AutoConvertedPrice mu sync.RWMutex } @@ -68,16 +68,19 @@ type SegFlightMetadata struct { func (cfg *BroadcastConfig) MaxPrice() *big.Rat { cfg.mu.RLock() defer cfg.mu.RUnlock() - return cfg.maxPrice + if cfg.maxPrice == nil { + return nil + } + return cfg.maxPrice.Value() } -func (cfg *BroadcastConfig) SetMaxPrice(price *big.Rat) { +func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) { cfg.mu.Lock() defer cfg.mu.Unlock() + prevPrice := cfg.maxPrice cfg.maxPrice = price - - if monitor.Enabled { - monitor.MaxTranscodingPrice(price) + if prevPrice != nil { + prevPrice.Stop() } } diff --git a/server/handlers.go b/server/handlers.go index 7461065fc..ece713245 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -20,6 +20,7 @@ import ( "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-livepeer/eth" "github.com/livepeer/go-livepeer/eth/types" + "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-livepeer/pm" "github.com/livepeer/lpms/ffmpeg" "github.com/pkg/errors" @@ -125,6 +126,7 @@ func setBroadcastConfigHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { pricePerUnit := r.FormValue("maxPricePerUnit") pixelsPerUnit := r.FormValue("pixelsPerUnit") + currency := r.FormValue("currency") transcodingOptions := r.FormValue("transcodingOptions") if (pricePerUnit == "" || pixelsPerUnit == "") && transcodingOptions == "" { @@ -149,13 +151,21 @@ func setBroadcastConfigHandler() http.Handler { return } - var price *big.Rat + var autoPrice *core.AutoConvertedPrice if pr > 0 { - price = big.NewRat(pr, px) + pricePerPixel := big.NewRat(pr, px) + autoPrice, err = core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if monitor.Enabled { + monitor.MaxTranscodingPrice(price) + } + glog.Infof("Maximum transcoding price: %v wei per pixel\n", price.RatString()) + }) + if err != nil { + panic(fmt.Errorf("Error converting price: %v", err)) + } } - BroadcastCfg.SetMaxPrice(price) - glog.Infof("Maximum transcoding price: %d per %q pixels\n", pr, px) + BroadcastCfg.SetMaxPrice(autoPrice) } // set broadcast profiles @@ -291,7 +301,8 @@ func (s *LivepeerServer) activateOrchestratorHandler(client eth.LivepeerEthClien return } - if err := s.setOrchestratorPriceInfo("default", r.FormValue("pricePerUnit"), r.FormValue("pixelsPerUnit")); err != nil { + pricePerUnit, pixelsPerUnit, currency := r.FormValue("pricePerUnit"), r.FormValue("pixelsPerUnit"), r.FormValue("currency") + if err := s.setOrchestratorPriceInfo("default", pricePerUnit, pixelsPerUnit, currency); err != nil { respond400(w, err.Error()) return } @@ -385,8 +396,9 @@ func (s *LivepeerServer) setOrchestratorConfigHandler(client eth.LivepeerEthClie return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { pixels := r.FormValue("pixelsPerUnit") price := r.FormValue("pricePerUnit") + currency := r.FormValue("currency") if pixels != "" && price != "" { - if err := s.setOrchestratorPriceInfo("default", price, pixels); err != nil { + if err := s.setOrchestratorPriceInfo("default", price, pixels, currency); err != nil { respond400(w, err.Error()) return } @@ -458,7 +470,7 @@ func (s *LivepeerServer) setOrchestratorConfigHandler(client eth.LivepeerEthClie })) } -func (s *LivepeerServer) setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr string) error { +func (s *LivepeerServer) setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr, currency string) error { ok, err := regexp.MatchString("^[0-9]+$", pricePerUnitStr) if err != nil { return err @@ -499,12 +511,18 @@ func (s *LivepeerServer) setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUn return fmt.Errorf("pixels per unit must be greater than 0, provided %d", pixelsPerUnit) } - s.LivepeerNode.SetBasePrice(broadcasterEthAddr, big.NewRat(pricePerUnit, pixelsPerUnit)) - if broadcasterEthAddr == "default" { - glog.Infof("Price per pixel set to %d wei for %d pixels\n", pricePerUnit, pixelsPerUnit) - } else { - glog.Infof("Price per pixel set to %d wei for %d pixels for broadcaster %s\n", pricePerUnit, pixelsPerUnit, broadcasterEthAddr) + pricePerPixel := big.NewRat(pricePerUnit, pixelsPerUnit) + autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) { + if broadcasterEthAddr == "default" { + glog.Infof("Price: %v wei per pixel\n ", price.RatString()) + } else { + glog.Infof("Price: %v wei per pixel for broadcaster %v", price.RatString(), broadcasterEthAddr) + } + }) + if err != nil { + panic(fmt.Errorf("Error converting price: %v", err)) } + s.LivepeerNode.SetBasePrice(broadcasterEthAddr, autoPrice) return nil } @@ -564,9 +582,10 @@ func (s *LivepeerServer) setPriceForBroadcaster() http.Handler { if s.LivepeerNode.NodeType == core.OrchestratorNode { pricePerUnitStr := r.FormValue("pricePerUnit") pixelsPerUnitStr := r.FormValue("pixelsPerUnit") + currency := r.FormValue("currency") broadcasterEthAddr := r.FormValue("broadcasterEthAddr") - err := s.setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr) + err := s.setOrchestratorPriceInfo(broadcasterEthAddr, pricePerUnitStr, pixelsPerUnitStr, currency) if err == nil { respondOk(w, []byte(fmt.Sprintf("Price per pixel set to %s wei for %s pixels for broadcaster %s\n", pricePerUnitStr, pixelsPerUnitStr, broadcasterEthAddr))) } else { diff --git a/server/handlers_test.go b/server/handlers_test.go index aa01fe88c..e1fa99ba5 100644 --- a/server/handlers_test.go +++ b/server/handlers_test.go @@ -116,7 +116,7 @@ func TestOrchestratorInfoHandler_Success(t *testing.T) { s := &LivepeerServer{LivepeerNode: n} price := big.NewRat(1, 2) - s.LivepeerNode.SetBasePrice("default", price) + s.LivepeerNode.SetBasePrice("default", core.NewFixedPrice(price)) trans := &types.Transcoder{ ServiceURI: "127.0.0.1:8935", @@ -259,7 +259,7 @@ func TestSetBroadcastConfigHandler_Success(t *testing.T) { func TestGetBroadcastConfigHandler(t *testing.T) { assert := assert.New(t) - BroadcastCfg.maxPrice = big.NewRat(1, 2) + BroadcastCfg.maxPrice = core.NewFixedPrice(big.NewRat(1, 2)) BroadcastJobVideoProfiles = []ffmpeg.VideoProfile{ ffmpeg.VideoProfileLookup["P240p25fps16x9"], } @@ -501,26 +501,26 @@ func TestSetOrchestratorPriceInfo(t *testing.T) { s := stubServer() // pricePerUnit is not an integer - err := s.setOrchestratorPriceInfo("default", "nil", "1") + err := s.setOrchestratorPriceInfo("default", "nil", "1", "") assert.Error(t, err) assert.True(t, strings.Contains(err.Error(), "pricePerUnit is not a valid integer")) // pixelsPerUnit is not an integer - err = s.setOrchestratorPriceInfo("default", "1", "nil") + err = s.setOrchestratorPriceInfo("default", "1", "nil", "") assert.Error(t, err) assert.True(t, strings.Contains(err.Error(), "pixelsPerUnit is not a valid integer")) - err = s.setOrchestratorPriceInfo("default", "1", "1") + err = s.setOrchestratorPriceInfo("default", "1", "1", "") assert.Nil(t, err) assert.Zero(t, s.LivepeerNode.GetBasePrice("default").Cmp(big.NewRat(1, 1))) - err = s.setOrchestratorPriceInfo("default", "-5", "1") + err = s.setOrchestratorPriceInfo("default", "-5", "1", "") assert.EqualErrorf(t, err, err.Error(), "price unit must be greater than or equal to 0, provided %d\n", -5) // pixels per unit <= 0 - err = s.setOrchestratorPriceInfo("default", "1", "0") + err = s.setOrchestratorPriceInfo("default", "1", "0", "") assert.EqualErrorf(t, err, err.Error(), "pixels per unit must be greater than 0, provided %d\n", 0) - err = s.setOrchestratorPriceInfo("default", "1", "-5") + err = s.setOrchestratorPriceInfo("default", "1", "-5", "") assert.EqualErrorf(t, err, err.Error(), "pixels per unit must be greater than 0, provided %d\n", -5) } diff --git a/server/rpc_test.go b/server/rpc_test.go index 712568c20..ab35bb087 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -550,7 +550,7 @@ func TestGenPayment(t *testing.T) { s.Sender = sender // Test invalid price - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) payment, err = genPayment(context.TODO(), s, 1) assert.Equal("", payment) assert.Errorf(err, err.Error(), "Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5)) @@ -687,12 +687,12 @@ func TestValidatePrice(t *testing.T) { defer BroadcastCfg.SetMaxPrice(nil) // B MaxPrice > O Price - BroadcastCfg.SetMaxPrice(big.NewRat(5, 1)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(5, 1))) err = validatePrice(s) assert.Nil(err) // B MaxPrice == O Price - BroadcastCfg.SetMaxPrice(big.NewRat(1, 3)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 3))) err = validatePrice(s) assert.Nil(err) @@ -713,7 +713,7 @@ func TestValidatePrice(t *testing.T) { // B MaxPrice < O Price s.InitialPrice = nil - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) err = validatePrice(s) assert.EqualError(err, fmt.Sprintf("Orchestrator price higher than the set maximum price of %v wei per %v pixels", int64(1), int64(5))) diff --git a/server/segment_rpc_test.go b/server/segment_rpc_test.go index 6feb1a5ba..f8786202c 100644 --- a/server/segment_rpc_test.go +++ b/server/segment_rpc_test.go @@ -1679,7 +1679,7 @@ func TestSubmitSegment_GenPaymentError_ValidatePriceError(t *testing.T) { OrchestratorInfo: oinfo, } - BroadcastCfg.SetMaxPrice(big.NewRat(1, 5)) + BroadcastCfg.SetMaxPrice(core.NewFixedPrice(big.NewRat(1, 5))) defer BroadcastCfg.SetMaxPrice(nil) _, err := SubmitSegment(context.TODO(), s, &stream.HLSSegment{}, nil, 0, false, true) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index 59a77b5c8..efc723d60 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -122,7 +122,7 @@ func lpCfg() starter.LivepeerConfig { ethPassword := "" network := "devnet" blockPollingInterval := 1 - pricePerUnit := 1 + pricePerUnit := "1" initializeRound := true cfg := starter.DefaultLivepeerConfig()