Skip to content

Commit

Permalink
server: Gather transcode health info in broadcast
Browse files Browse the repository at this point in the history
 - Include both orch eth address and transcoder URI
 - transcodeAttemptInfo as an out parameter of transcodeSegment
 - named returns+defer trick to avoid repetitions of time&error setting
  • Loading branch information
victorges committed Jul 7, 2021
1 parent 226447a commit 617fe10
Showing 1 changed file with 36 additions and 4 deletions.
40 changes: 36 additions & 4 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/glog"

"github.com/livepeer/go-livepeer/common"
Expand Down Expand Up @@ -469,10 +470,21 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro
sv = verification.NewSegmentVerifier(Policy)
}

var (
attempts []transcodeAttemptInfo
urls []string
)
defer func() {
// TODO: Send saved attempts somewhere. Or better yet, avoid this defer and
// refactor the retry loop below to send the attempts after looping.
}()
for i := 0; i < MaxAttempts; i++ {
// if fails, retry; rudimentary
var urls []string
if urls, err = transcodeSegment(cxn, seg, name, sv); err == nil {
// if transcodeSegment fails, retry; rudimentary
info := transcodeAttemptInfo{}
urls, err = transcodeSegment(cxn, seg, name, sv, &info)
attempts = append(attempts, info)

if err == nil {
return urls, nil
}

Expand All @@ -495,8 +507,24 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro
return nil, err
}

// TODO: Declare this somewhere else, probably with func that sends to queue.
type orchShortInfo struct {
Address string
TranscoderUri string
}

type transcodeAttemptInfo struct {
Orchestrator orchShortInfo
LatencyMs int64
Error error
}

func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string,
verifier *verification.SegmentVerifier) ([]string, error) {
verifier *verification.SegmentVerifier, info *transcodeAttemptInfo) (urls []string, err error) {
defer func(startTime time.Time) {
info.LatencyMs = time.Since(startTime).Milliseconds()
info.Error = err
}(time.Now())

nonce := cxn.nonce
cpl := cxn.pl
Expand All @@ -513,6 +541,10 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string,
// similar to the orchestrator's RemoteTranscoderFatalError
return nil, nil
}
info.Orchestrator = orchShortInfo{
TranscoderUri: sess.OrchestratorInfo.Transcoder,
Address: hexutil.Encode(sess.OrchestratorInfo.Address),
}

glog.Infof("Trying to transcode segment manifestID=%v nonce=%d seqNo=%d", cxn.mid, nonce, seg.SeqNo)
if monitor.Enabled {
Expand Down

0 comments on commit 617fe10

Please sign in to comment.