Skip to content

Commit

Permalink
Migrate docker CLI to API v2 (#4020)
Browse files Browse the repository at this point in the history
# What
- Migrates `docker run` to use the models package and the V2 client
for job submission.
- adds several new flags to the docker commands which may be found in
`cmd/util/flags/cliflags/job.go`
- marks some existing flags (`--concurrency`, and `--selector`) as
deprecated.
---------

Co-authored-by: frrist <[email protected]>
  • Loading branch information
frrist and frrist committed Jun 6, 2024
1 parent 41970fe commit 9d78148
Show file tree
Hide file tree
Showing 32 changed files with 2,544 additions and 694 deletions.
210 changes: 62 additions & 148 deletions cmd/cli/docker/docker_run.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
package docker

import (
"context"
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"k8s.io/kubectl/pkg/util/i18n"
"sigs.k8s.io/yaml"

"github.com/bacalhau-project/bacalhau/cmd/cli/helpers"
"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/configflags"
"github.com/bacalhau-project/bacalhau/cmd/util/hook"
"github.com/bacalhau-project/bacalhau/cmd/util/parse"
"github.com/bacalhau-project/bacalhau/cmd/util/printer"
"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob"
"github.com/bacalhau-project/bacalhau/pkg/model"
clientv1 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client"
engine_docker "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
"github.com/bacalhau-project/bacalhau/pkg/userstrings"
"github.com/bacalhau-project/bacalhau/pkg/util/templates"
)

Expand Down Expand Up @@ -54,33 +51,23 @@ var (

// DockerRunOptions declares the arguments accepted by the `docker run` command
type DockerRunOptions struct {
Entrypoint []string
WorkingDirectory string // Working directory for docker

SpecSettings *cliflags.SpecFlagSettings // Setting for top level job spec fields.
ResourceSettings *cliflags.ResourceUsageSettings // Settings for the jobs resource requirements.
NetworkingSettings *cliflags.NetworkingFlagSettings // Settings for the jobs networking.
DealSettings *cliflags.DealFlagSettings // Settings for the jobs deal.
RunTimeSettings *cliflags.RunTimeSettingsWithDownload // Settings for running the job.
DownloadSettings *cliflags.DownloaderSettings // Settings for running Download.
Entrypoint []string
WorkingDirectory string
EnvironmentVariables []string

JobSettings *cliflags.JobSettings
TaskSettings *cliflags.TaskSettings
RunTimeSettings *cliflags.RunTimeSettings
}

const (
DefaultDockerRunWaitSeconds = 600
)

func NewDockerRunOptions() *DockerRunOptions {
return &DockerRunOptions{
Entrypoint: nil,
WorkingDirectory: "",

SpecSettings: cliflags.NewSpecFlagDefaultSettings(),
ResourceSettings: cliflags.NewDefaultResourceUsageSettings(),
NetworkingSettings: cliflags.NewDefaultNetworkingFlagSettings(),
DealSettings: cliflags.NewDefaultDealFlagSettings(),
DownloadSettings: cliflags.NewDefaultDownloaderSettings(),
RunTimeSettings: cliflags.DefaultRunTimeSettingsWithDownload(),
JobSettings: cliflags.DefaultJobSettings(),
TaskSettings: cliflags.DefaultTaskSettings(),
RunTimeSettings: cliflags.DefaultRunTimeSettings(),
}
}

Expand Down Expand Up @@ -116,164 +103,91 @@ func newDockerRunCmd() *cobra.Command { //nolint:funlen
if err != nil {
return fmt.Errorf("failed to setup repo: %w", err)
}
// create a v1 api client
apiV1, err := util.GetAPIClient(cfg)
if err != nil {
return fmt.Errorf("failed to create v1 api client: %w", err)
}
// create a v2 api client
apiV2, err := util.GetAPIClientV2(cmd, cfg)
api, err := util.GetAPIClientV2(cmd, cfg)
if err != nil {
return fmt.Errorf("failed to create v2 api client: %w", err)
}
return dockerRun(cmd, cmdArgs, apiV1, apiV2, cfg, opts)
return run(cmd, cmdArgs, api, opts)
},
}

dockerRunCmd.PersistentFlags().StringVarP(
&opts.WorkingDirectory, "workdir", "w", opts.WorkingDirectory,
`Working directory inside the container. Overrides the working directory shipped with the image (e.g. via WORKDIR in Dockerfile).`,
)

dockerRunCmd.PersistentFlags().StringSliceVar(
&opts.Entrypoint, "entrypoint", opts.Entrypoint,
`Override the default ENTRYPOINT of the image`,
)

dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.SpecFlags(opts.SpecSettings))
dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.DealFlags(opts.DealSettings))
dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.NewDownloadFlags(opts.DownloadSettings))
dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.NetworkingFlags(opts.NetworkingSettings))
dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.ResourceUsageFlags(opts.ResourceSettings))
dockerRunCmd.PersistentFlags().AddFlagSet(cliflags.NewRunTimeSettingsFlagsWithDownload(opts.RunTimeSettings))
cliflags.RegisterJobFlags(dockerRunCmd, opts.JobSettings)
cliflags.RegisterTaskFlags(dockerRunCmd, opts.TaskSettings)
dockerRunCmd.Flags().AddFlagSet(cliflags.NewRunTimeSettingsFlags(opts.RunTimeSettings))

if err := configflags.RegisterFlags(dockerRunCmd, dockerRunFlags); err != nil {
util.Fatal(dockerRunCmd, err, 1)
}
// register flags unique to docker.
dockerFlags := pflag.NewFlagSet("docker", pflag.ContinueOnError)
dockerFlags.StringVarP(&opts.WorkingDirectory, "workdir", "w", opts.WorkingDirectory,
`Working directory inside the container. Overrides the working directory shipped with the image (e.g. via WORKDIR in Dockerfile).`)
dockerFlags.StringSliceVar(&opts.Entrypoint, "entrypoint", opts.Entrypoint,
`Override the default ENTRYPOINT of the image`)
dockerFlags.StringSliceVarP(&opts.EnvironmentVariables, "env", "e", opts.EnvironmentVariables,
"The environment variables to supply to the job (e.g. --env FOO=bar --env BAR=baz)")

dockerRunCmd.Flags().AddFlagSet(dockerFlags)

return dockerRunCmd
}

func dockerRun(
cmd *cobra.Command,
cmdArgs []string,
apiV1 *clientv1.APIClient,
apiV2 clientv2.API,
cfg types.BacalhauConfig,
opts *DockerRunOptions,
) error {
func run(cmd *cobra.Command, args []string, api clientv2.API, opts *DockerRunOptions) error {
ctx := cmd.Context()

image := cmdArgs[0]
parameters := cmdArgs[1:]
j, err := CreateJob(ctx, image, parameters, opts)
job, err := build(args, opts)
if err != nil {
return fmt.Errorf("creating job: %w", err)
}

if err := legacy_job.VerifyJob(ctx, j); err != nil {
if _, ok := err.(*bacerrors.ImageNotFound); ok {
return fmt.Errorf("docker image '%s' not found in the registry, or needs authorization", image)
} else {
return fmt.Errorf("verifying job: %s", err)
}
}

quiet := opts.RunTimeSettings.PrintJobIDOnly
if !quiet {
containsTag := dockerImageContainsTag(image)
if !containsTag {
cmd.PrintErrln("Using default tag: latest. Please specify a tag/digest for better reproducibility.")
}
return err
}

if opts.RunTimeSettings.DryRun {
// Converting job to yaml
var yamlBytes []byte
yamlBytes, err = yaml.Marshal(j)
out, err := helpers.JobToYaml(job)
if err != nil {
return fmt.Errorf("converting job to yaml: %w", err)
return err
}
cmd.Print(string(yamlBytes))
cmd.Print(out)
return nil
}

if err := legacy_job.VerifyJob(ctx, j); err != nil {
return fmt.Errorf("verifying job for submission: %w", err)
}

executingJob, err := apiV1.Submit(ctx, j)
resp, err := api.Jobs().Put(ctx, &apimodels.PutJobRequest{Job: job})
if err != nil {
return fmt.Errorf("submitting job for execution: %w", err)
return fmt.Errorf("failed to submit job: %w", err)
}

return printer.PrintJobExecutionLegacy(ctx, executingJob, cmd, opts.DownloadSettings, opts.RunTimeSettings, apiV1, apiV2, cfg.Node.IPFS)
}

// CreateJob creates a job object from the given command line arguments and options.
func CreateJob(ctx context.Context, image string, parameters []string, opts *DockerRunOptions) (*model.Job, error) {
outputs, err := parse.JobOutputs(ctx, opts.SpecSettings.OutputVolumes)
if err != nil {
return nil, err
if len(resp.Warnings) > 0 {
helpers.PrintWarnings(cmd, resp.Warnings)
}

nodeSelectorRequirements, err := parse.NodeSelector(opts.SpecSettings.Selector)
if err != nil {
return nil, err
if err := printer.PrintJobExecution(ctx, resp.JobID, cmd, opts.RunTimeSettings, api); err != nil {
return fmt.Errorf("failed to print job execution: %w", err)
}

labels, err := parse.Labels(ctx, opts.SpecSettings.Labels)
return nil
}

func build(args []string, opts *DockerRunOptions) (*models.Job, error) {
image := args[0]
parameters := args[1:]
engineSpec, err := engine_docker.NewDockerEngineBuilder(image).
WithParameters(parameters...).
WithWorkingDirectory(opts.WorkingDirectory).
WithEntrypoint(opts.Entrypoint...).
WithEnvironmentVariables(opts.EnvironmentVariables...).
Build()
if err != nil {
return nil, err
}

spec, err := legacy_job.MakeDockerSpec(
image, opts.WorkingDirectory, opts.Entrypoint, opts.SpecSettings.EnvVar, parameters,
legacy_job.WithResources(
opts.ResourceSettings.CPU,
opts.ResourceSettings.Memory,
opts.ResourceSettings.Disk,
opts.ResourceSettings.GPU,
),
legacy_job.WithNetwork(
opts.NetworkingSettings.Network,
opts.NetworkingSettings.Domains,
),
legacy_job.WithTimeout(opts.SpecSettings.Timeout),
legacy_job.WithInputs(opts.SpecSettings.Inputs.Values()...),
legacy_job.WithOutputs(outputs...),
legacy_job.WithAnnotations(labels...),
legacy_job.WithNodeSelector(nodeSelectorRequirements),
legacy_job.WithDeal(
opts.DealSettings.TargetingMode,
opts.DealSettings.Concurrency,
),
)

// Publisher is optional and we won't provide it if not specified
p := opts.SpecSettings.Publisher.Value()
if p != nil {
spec.Publisher = p.Type //nolint:staticcheck
spec.PublisherSpec = *p
}

job, err := helpers.BuildJobFromFlags(engineSpec, opts.JobSettings, opts.TaskSettings)
if err != nil {
return nil, err
return nil, fmt.Errorf("building job spec: %w", err)
}

return &model.Job{
APIVersion: model.APIVersionLatest().String(),
Spec: spec,
}, nil
}

// dockerImageContainsTag checks if the image contains a tag or a digest
func dockerImageContainsTag(image string) bool {
if strings.Contains(image, ":") {
return true
}
if strings.Contains(image, "@") {
return true
// Normalize and validate the job spec
job.Normalize()
if err := job.ValidateSubmission(); err != nil {
return nil, fmt.Errorf("%s: %w", userstrings.JobSpecBad, err)
}
return false

return job, nil
}
Loading

0 comments on commit 9d78148

Please sign in to comment.