Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control scene classification with T command line args #2686

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#### Orchestrator

#### Transcoder
- \#2686 Control non-stream specific scene classification with command line args

### Bug Fixes 🐞

Expand Down
3 changes: 2 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
cfg.TestTranscoder = flag.Bool("testTranscoder", *cfg.TestTranscoder, "Test Nvidia GPU transcoding at startup")
cfg.SceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.SceneClassificationModelPath, "Path to scene classification model")
cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Set to true to enable content type detection")
cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Enables content type detection capability and automatic detection. If not specified, transcoder won't advertise corresponding capabilities and receive such jobs.")
cfg.DetectionSampleRate = flag.Uint("detectionSampleRate", *cfg.DetectionSampleRate, "Run content detection automatically on every nth frame of each segment, independently of requested stream transcoding configuration.")

// Onchain:
cfg.EthAcctAddr = flag.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address")
Expand Down
10 changes: 10 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common/math"
"io/ioutil"
"math/big"
"net"
Expand Down Expand Up @@ -87,6 +88,7 @@ type LivepeerConfig struct {
TestTranscoder *bool
SceneClassificationModelPath *string
DetectContent *bool
DetectionSampleRate *uint
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
Expand Down Expand Up @@ -153,6 +155,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultNetint := ""
defaultTestTranscoder := true
defaultDetectContent := false
defaultDetectionSampleRate := uint(math.MaxUint32)
defaultSceneClassificationModelPath := "tasmodel.pb"

// Onchain:
Expand Down Expand Up @@ -231,6 +234,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,
SceneClassificationModelPath: &defaultSceneClassificationModelPath,
DetectContent: &defaultDetectContent,
DetectionSampleRate: &defaultDetectionSampleRate,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -295,6 +299,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

if *cfg.DetectionSampleRate <= 0 {
glog.Fatal("-detectionSampleRate must be greater than zero")
return
}

blockPollingTime := time.Duration(*cfg.BlockPollingInterval) * time.Second

type NetworkConfig struct {
Expand Down Expand Up @@ -432,6 +441,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if accel == ffmpeg.Nvidia && *cfg.DetectContent {
if _, err := os.Stat(*cfg.SceneClassificationModelPath); err == nil {
detectorProfile := ffmpeg.DSceneAdultSoccer
detectorProfile.SampleRate = *cfg.DetectionSampleRate
detectorProfile.ModelPath = *cfg.SceneClassificationModelPath
core.DetectorProfile = &detectorProfile
for _, d := range devices {
Expand Down
7 changes: 5 additions & 2 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func (lb *LoadBalancingTranscoder) createSession(ctx context.Context, md *SegTra
// Acquire transcode session. Map to job id + assigned transcoder
key := job + "_" + transcoder
costEstimate := calculateCost(md.Profiles)

// create the transcoder - with AI capabilities, if required by local or stream configuration
var lpmsSession TranscoderSession
if md.DetectorEnabled {
setEffectiveDetectorConfig(md)
if md.DetectorEnabled && len(md.DetectorProfiles) == 1 {
var err error
lpmsSession, err = lb.newDetectorT(DetectorProfile, transcoder)
lpmsSession, err = lb.newDetectorT(md.DetectorProfiles[0], transcoder)
if err != nil {
return nil, err
}
Expand Down
70 changes: 33 additions & 37 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -41,6 +42,36 @@ func NewUnrecoverableError(err error) UnrecoverableError {

var WorkDir string

func setEffectiveDetectorConfig(md *SegTranscodingMetadata) {
aiEnabled := DetectorProfile != nil
actualProfile := DetectorProfile
if aiEnabled {
presetSampleRate := DetectorProfile.(*ffmpeg.SceneClassificationProfile).SampleRate
if md.DetectorEnabled && len(md.DetectorProfiles) == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where there can be more than one DetectorProfiles?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, these structures were designed with an aim to have multiple AI capabilities some day.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep for sure, I was more checking that there was no chance we'd unintentionally hit md.DetectorEnabled && len(md.DetectorProfiles) > 1 at some point

actualProfile = md.DetectorProfiles[0]
requestedSampleRate := actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate
// 0 is not a valid value
if requestedSampleRate == 0 {
requestedSampleRate = math.MaxUint32
}
actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate = uint(math.Min(float64(presetSampleRate),
float64(requestedSampleRate)))
// copy other fields from default AI capability, as we don't yet support custom ones
actualProfile.(*ffmpeg.SceneClassificationProfile).ModelPath = DetectorProfile.(*ffmpeg.SceneClassificationProfile).ModelPath
actualProfile.(*ffmpeg.SceneClassificationProfile).Input = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Input
actualProfile.(*ffmpeg.SceneClassificationProfile).Output = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Output
actualProfile.(*ffmpeg.SceneClassificationProfile).Classes = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Classes
}
}
if actualProfile != nil && actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate < math.MaxUint32 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for comparison to MaxUint32 rather than having it be zero?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That value ends up in something like select=not(mod(Nframe, SampleRate)) in Ffmpeg filter, so we don't want to have 0 here. Hence, MAX as default value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Gotcha

md.DetectorProfiles = []ffmpeg.DetectorProfile{actualProfile}
md.DetectorEnabled = true
} else {
md.DetectorProfiles = []ffmpeg.DetectorProfile{}
md.DetectorEnabled = false
}
}

func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)
Expand All @@ -50,6 +81,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
Fname: md.Fname,
Accel: ffmpeg.Software,
}
setEffectiveDetectorConfig(md)
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
if md.DetectorEnabled {
Expand Down Expand Up @@ -141,6 +173,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
setEffectiveDetectorConfig(md)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
if md.DetectorEnabled {
out = append(out, detectorsToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md.DetectorProfiles)...)
Expand Down Expand Up @@ -354,43 +387,6 @@ func TestSoftwareTranscoderCapabilities(tmpdir string) (caps []Capability, fatal
return caps, fatalError
}

func TestNetintTranscoder(devices []string) error {
buf, _ := os.ReadFile("core/test.ts")
b := bytes.NewReader(buf)
z, err := gzip.NewReader(b)
if err != nil {
return err
}
mp4testSeg, err := ioutil.ReadAll(z)
z.Close()
if err != nil {
return err
}
fname := filepath.Join(WorkDir, "testseg.tempfile")
err = ioutil.WriteFile(fname, mp4testSeg, 0644)
if err != nil {
return err
}
defer os.Remove(fname)
for _, device := range devices {
t1 := NewNetintTranscoder(device)
// "145x1" is the minimal resolution that succeeds on Windows, so use "145x145"
p := ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1k", Format: ffmpeg.FormatMP4}
md := &SegTranscodingMetadata{Fname: fname, Profiles: []ffmpeg.VideoProfile{p, p, p, p}}
td, err := t1.Transcode(context.Background(), md)

t1.Stop()
if err != nil {
return err
}
if len(td.Segments) == 0 || td.Pixels == 0 {
return errors.New("Empty transcoded segment")
}
}

return nil
}

func GetTranscoderFactoryByAccel(acceleration ffmpeg.Acceleration) (func(device string) TranscoderSession, func(detector ffmpeg.DetectorProfile, gpu string) (TranscoderSession, error), error) {
switch acceleration {
case ffmpeg.Nvidia:
Expand Down
26 changes: 22 additions & 4 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,9 +1013,30 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
}
return nil, info, err
}
// log all received detection results
if monitor.Enabled {
if len(res.Detections) > 0 {
for _, detection := range res.Detections {
switch x := detection.Value.(type) {
case *net.DetectData_SceneClassification:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth logging in a default case to catch other cases?

probs := x.SceneClassification.ClassProbs
for id, prob := range probs {
className := "unknown"
for name, lookupId := range ffmpeg.DetectorClassIDLookup {
if id == uint32(lookupId) {
className = name
break
}
}
monitor.SegSceneClassificationResult(ctx, seg.SeqNo, className, prob)
}
}
}
}
}
// [EXPERIMENTAL] send content detection results to callback webhook
// for now use detection only in common path
if DetectionWebhookURL != nil && len(res.Detections) > 0 {
if DetectionWebhookURL != nil {
clog.V(common.DEBUG).Infof(ctx, "Got detection result %v", res.Detections)
if monitor.Enabled {
monitor.SegSceneClassificationDone(ctx, seg.SeqNo)
Expand All @@ -1035,9 +1056,6 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
Name: name,
Probability: prob,
})
if monitor.Enabled {
monitor.SegSceneClassificationResult(ctx, seg.SeqNo, name, prob)
}
}
}
}
Expand Down