diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 0e0c47618..506e53646 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,17 +15,18 @@ #### Transcoder ### Bug Fixes 🐞 -- [#2586](https://github.com/livepeer/go-livepeer/pull/2586) Broadcaster: Don't pass a nil context into grpc call or it panics #### CLI #### General - [#2583](https://github.com/livepeer/go-livepeer/pull/2583) eth: Set tx GasFeeCap to min(gasPriceEstimate, current GasFeeCap) (@yondonfu) +- [#2586](https://github.com/livepeer/go-livepeer/pull/2586) Broadcaster: Don't pass a nil context into grpc call or it panics (@thomshutt, @cyberj0g) #### Broadcaster - [#2573](https://github.com/livepeer/go-livepeer/pull/2573) server: Fix timeout for stream recording background jobs (@victorges) - [#2586](https://github.com/livepeer/go-livepeer/pull/2586) Refactor RTMP connection object management to prevent race conditions (@cyberj0g) #### Orchestrator +- [#2591](https://github.com/livepeer/go-livepeer/pull/2591) Return from transcode loop if transcode session is ended by B (@yondonfu) #### Transcoder diff --git a/core/core_test.go b/core/core_test.go index eb73647ce..671c56c91 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "io/ioutil" + "runtime" "testing" "time" @@ -154,6 +155,39 @@ func TestTranscodeLoop_GivenNoSegmentsPastTimeout_CleansSegmentChan(t *testing.T assert.Nil(segChan) } +func TestTranscodeLoop_CleanupForBroadcasterEndTranscodingSession(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + drivers.NodeStorage = drivers.NewMemoryDriver(nil) + oldTranscodeLoopTimeout := transcodeLoopTimeout + defer func() { transcodeLoopTimeout = oldTranscodeLoopTimeout }() + transcodeLoopTimeout = 100 * time.Millisecond + + tmp := t.TempDir() + + ffmpeg.InitFFmpeg() + n, _ := NewLivepeerNode(ð.StubClient{}, tmp, nil) + n.Transcoder = NewLocalTranscoder(tmp) + + md := &SegTranscodingMetadata{Profiles: videoProfiles, AuthToken: stubAuthToken()} + mid := ManifestID(md.AuthToken.SessionId) + + ss := StubSegment() + _, err := n.sendToTranscodeLoop(context.TODO(), md, ss) + require.Nil(err) + require.NotNil(getSegChan(n, mid)) + + startRoutines := runtime.NumGoroutine() + + n.endTranscodingSession(md.AuthToken.SessionId, context.TODO()) + waitForTranscoderLoopTimeout(n, mid) + + endRoutines := runtime.NumGoroutine() + + assert.Equal(endRoutines, startRoutines-1) +} + func waitForTranscoderLoopTimeout(n *LivepeerNode, m ManifestID) { for i := 0; i < 3; i++ { time.Sleep(transcodeLoopTimeout * 2) diff --git a/core/orchestrator.go b/core/orchestrator.go index a61486fc8..6d415ccf3 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -637,11 +637,13 @@ func (n *LivepeerNode) transcodeSegmentLoop(logCtx context.Context, md *SegTrans clog.V(common.DEBUG).Infof(logCtx, "Segment loop timed out; closing ") n.endTranscodingSession(md.AuthToken.SessionId, logCtx) return - case chanData := <-segChan: - // nil means channel is closed by endTranscodingSession called by B - if chanData != nil { - chanData.res <- n.transcodeSeg(chanData.ctx, *n.StorageConfig, chanData.seg, chanData.md) + case chanData, ok := <-segChan: + // Check if channel was closed due to endTranscodingSession being called by B + if !ok { + cancel() + return } + chanData.res <- n.transcodeSeg(chanData.ctx, *n.StorageConfig, chanData.seg, chanData.md) } cancel() } diff --git a/server/broadcast.go b/server/broadcast.go index 467fb392c..6ca337b35 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -646,8 +646,9 @@ func (bsm *BroadcastSessionsManager) collectResults(submitResultsCh chan *Submit // the caller needs to ensure bsm.sessLock is acquired before calling this. func (bsm *BroadcastSessionsManager) completeSessionUnsafe(ctx context.Context, sess *BroadcastSession, tearDown bool) { if tearDown { - err := EndTranscodingSession(ctx, sess) - clog.Errorf(ctx, "Error completing transcoding session: %q", err) + if err := EndTranscodingSession(ctx, sess); err != nil { + clog.Errorf(ctx, "Error completing transcoding session: %q", err) + } } if sess.OrchestratorScore == common.Score_Untrusted { bsm.untrustedPool.completeSession(sess)