Skip to content

Commit

Permalink
refactor: migrate exec command to use models (#4155)
Browse files Browse the repository at this point in the history
Co-authored-by: frrist <[email protected]>
  • Loading branch information
frrist and frrist committed Jul 2, 2024
1 parent 844b276 commit 064edc8
Showing 1 changed file with 19 additions and 124 deletions.
143 changes: 19 additions & 124 deletions cmd/cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/cmd/util/hook"
"github.com/bacalhau-project/bacalhau/cmd/util/parse"
"github.com/bacalhau-project/bacalhau/cmd/util/printer"
"github.com/bacalhau-project/bacalhau/pkg/lib/template"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
"github.com/bacalhau-project/bacalhau/pkg/storage/inline"
Expand Down Expand Up @@ -54,14 +52,16 @@ Supported job types:
)

type ExecOptions struct {
SpecSettings *cliflags.SpecFlagSettings
JobSettings *cliflags.JobSettings
TaskSettings *cliflags.TaskSettings
RunTimeSettings *cliflags.RunTimeSettings
Code string
}

func NewExecOptions() *ExecOptions {
return &ExecOptions{
SpecSettings: cliflags.NewSpecFlagDefaultSettings(),
JobSettings: cliflags.DefaultJobSettings(),
TaskSettings: cliflags.DefaultTaskSettings(),
RunTimeSettings: cliflags.DefaultRunTimeSettings(),
}
}
Expand Down Expand Up @@ -100,8 +100,10 @@ func NewCmdWithOptions(options *ExecOptions) *cobra.Command {
},
}

execCmd.PersistentFlags().AddFlagSet(cliflags.SpecFlags(options.SpecSettings))
execCmd.PersistentFlags().AddFlagSet(cliflags.NewRunTimeSettingsFlags(options.RunTimeSettings))
cliflags.RegisterJobFlags(execCmd, options.JobSettings)
cliflags.RegisterTaskFlags(execCmd, options.TaskSettings)

execCmd.Flags().AddFlagSet(cliflags.NewRunTimeSettingsFlags(options.RunTimeSettings))
execCmd.Flags().StringVar(&options.Code, "code", "", "Specifies the file, or directory of code to send with the request")

return execCmd
Expand Down Expand Up @@ -210,11 +212,6 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti
job.Tasks[0].Engine.Params["Command"] = jobType
job.Tasks[0].Engine.Params["Arguments"] = cmdArgs[1:]

// Attach any inputs the user specified to the job spec
if err := prepareInputs(options, job); err != nil {
return nil, err
}

// Process --code if anything was specified. In future we may want to try and determine this
// ourselves where it is not specified, but it will likely be dependent on job type.
if options.Code != "" {
Expand All @@ -223,129 +220,27 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti
}
}

publisherSpec := options.SpecSettings.Publisher.Value()
if publisherSpec != nil {
job.Tasks[0].Publisher = &models.SpecConfig{
Type: publisherSpec.Type.String(),
Params: publisherSpec.Params,
}
}

// Handle ResultPaths by using the legacy parser and converting.
if err := prepareJobOutputs(cmd.Context(), options, job); err != nil {
return nil, err
}

// Parse labels from flag, we expect key=value for the non-legacy models.Job
if err := prepareLabels(options, job); err != nil {
return nil, err
}

// Constraints for node selection
if err := prepareConstraints(options, job); err != nil {
return nil, err
job.Labels, err = options.JobSettings.Labels()
job.Task().Publisher = options.TaskSettings.Publisher.Value()
job.Task().ResultPaths = options.TaskSettings.ResultPaths
job.Task().Env = options.TaskSettings.EnvironmentVariables
job.Task().InputSources = options.TaskSettings.InputSources.Values()
if err != nil {
return nil, fmt.Errorf("parsing job labes: %w", err)
}

// Environment variables
if err := prepareEnvVars(options, job); err != nil {
return nil, err
job.Constraints, err = options.JobSettings.Constraints()
if err != nil {
return nil, fmt.Errorf("parsing job constraints: %w", err)
}

// Set the execution timeouts
job.Tasks[0].Timeouts = &models.TimeoutConfig{
TotalTimeout: options.SpecSettings.Timeout,
TotalTimeout: options.TaskSettings.Timeout,
}

// Unsupported in new job specifications (models.Job)
// options.SpecSettings.DoNotTrack

return job, nil
}

func prepareConstraints(options *ExecOptions, job *models.Job) error {
if nodeSelectorRequirements, err := parse.NodeSelector(options.SpecSettings.Selector); err != nil {
return err
} else {
constraints, err := legacy.FromLegacyLabelSelector(nodeSelectorRequirements)
if err != nil {
return err
}
job.Constraints = constraints
}

return nil
}

func prepareInputs(options *ExecOptions, job *models.Job) error {
for _, ss := range options.SpecSettings.Inputs.Values() {
src, err := legacy.FromLegacyStorageSpecToInputSource(ss)
if err != nil {
return fmt.Errorf("failed to process input %s: %w", ss.Name, err)
}

job.Tasks[0].InputSources = append(job.Tasks[0].InputSources, src)
}
return nil
}

func prepareLabels(options *ExecOptions, job *models.Job) error {
if len(options.SpecSettings.Labels) > 0 {
if labels, err := parse.StringSliceToMap(options.SpecSettings.Labels); err != nil {
return err
} else {
job.Labels = labels
}
}
return nil
}

func prepareEnvVars(options *ExecOptions, job *models.Job) error {
if len(options.SpecSettings.EnvVar) > 0 {
if env, err := parse.StringSliceToMap(options.SpecSettings.EnvVar); err != nil {
return err
} else {
job.Tasks[0].Env = env
}
}
return nil
}

func prepareJobOutputs(ctx context.Context, options *ExecOptions, job *models.Job) error {
legacyOutputs, err := parse.JobOutputs(ctx, options.SpecSettings.OutputVolumes)
if err != nil {
return err
}

if len(legacyOutputs) == 0 {
return nil
}

// If we only have the single legacy default output then we will only use it if we have a publisher
// configured. If no publisher then we can just return early.
if len(legacyOutputs) == 1 && legacyOutputs[0].Name == "outputs" && legacyOutputs[0].Path == "/outputs" {
if job.Tasks[0].Publisher == nil {
return nil
}
}

job.Tasks[0].ResultPaths = make([]*models.ResultPath, 0, len(legacyOutputs))
for _, output := range legacyOutputs {
rp := &models.ResultPath{
Name: output.Name,
Path: output.Path,
}

e := rp.Validate()
if e != nil {
return e
}

job.Tasks[0].ResultPaths = append(job.Tasks[0].ResultPaths, rp)
}

return nil
}

// addInlineContent will use codeLocation to determine if it is a single file or a
// directory and will attach to the job as an inline attachment.
func addInlineContent(ctx context.Context, codeLocation string, job *models.Job) error {
Expand Down

0 comments on commit 064edc8

Please sign in to comment.