Skip to content

Commit

Permalink
fail jobs exceeding TotalTimeout (#4055)
Browse files Browse the repository at this point in the history
`TotalTimeout` is a new field that was introduced in
#4049 (comment)
that covers both ExecutionTimeout and QueueTimeout, but not taking
affect and not processed by Housekeeper or Scheduler

This PR fixes that and maps `--timeout` flag in `docker run` to
`TotalTimeout` instead of `ExecutionTimeout`
  • Loading branch information
wdbaruni committed Jun 5, 2024
1 parent 03e0d03 commit 0e0325b
Show file tree
Hide file tree
Showing 33 changed files with 339 additions and 182 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/docker/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (s *DockerRunSuite) TestRun_Timeout_DefaultValue() {

j := testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, out)

s.Require().Equal(node.TestRequesterConfig.JobDefaults.ExecutionTimeout, j.Spec.GetTimeout(),
s.Require().Equal(node.TestRequesterConfig.JobDefaults.TotalTimeout, j.Spec.GetTimeout(),
"Did not fall back to default timeout value")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func PrepareJob(cmd *cobra.Command, cmdArgs []string, unknownArgs []string, opti

// Set the execution timeouts
job.Tasks[0].Timeouts = &models.TimeoutConfig{
ExecutionTimeout: options.SpecSettings.Timeout,
TotalTimeout: options.SpecSettings.Timeout,
}

// Unsupported in new job specifications (models.Job)
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/serve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func GetRequesterConfig(ctx context.Context, cfg types.RequesterConfig, createJo

requesterConfig, err := node.NewRequesterConfigWith(node.RequesterConfigParams{
JobDefaults: transformer.JobDefaults{
TotalTimeout: time.Duration(cfg.JobDefaults.TotalTimeout),
ExecutionTimeout: time.Duration(cfg.JobDefaults.ExecutionTimeout),
QueueTimeout: time.Duration(cfg.JobDefaults.QueueTimeout),
},
Expand Down
24 changes: 7 additions & 17 deletions pkg/bidstrategy/semantic/timeout_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/model"
)

type TimeoutStrategyParams struct {
Expand Down Expand Up @@ -38,17 +37,9 @@ const (
)

func (s *TimeoutStrategy) ShouldBid(_ context.Context, request bidstrategy.BidStrategyRequest) (bidstrategy.BidStrategyResponse, error) {
timeoutSeconds := request.Job.Task().Timeouts.ExecutionTimeout
if timeoutSeconds <= 0 {
return bidstrategy.NewBidResponse(true, minReason, timeoutSeconds, 0), nil
}

// Timeout will be multiplied by 1000000000 (time.Second) when it gets
// converted to a time.Duration (which is an int64 underneath), so make sure
// that it can fit into it.
var maxTimeout = int64(model.NoJobTimeout.Seconds())
if request.Job.Task().Timeouts.ExecutionTimeout > maxTimeout {
return bidstrategy.NewBidResponse(false, maxReason, timeoutSeconds, maxTimeout), nil
timeout := request.Job.Task().Timeouts.GetExecutionTimeout()
if timeout <= 0 {
return bidstrategy.NewBidResponse(true, minReason, timeout.String(), 0), nil
}

for _, clientID := range s.jobExecutionTimeoutClientIDBypassList {
Expand All @@ -58,11 +49,10 @@ func (s *TimeoutStrategy) ShouldBid(_ context.Context, request bidstrategy.BidSt
}

// skip bidding if the job spec defined a timeout value higher or lower than what we are willing to accept
timeoutDuration := request.Job.Task().Timeouts.GetExecutionTimeout()
if timeoutDuration < s.minJobExecutionTimeout {
return bidstrategy.NewBidResponse(false, minReason, timeoutDuration.String(), s.minJobExecutionTimeout.String()), nil
if timeout < s.minJobExecutionTimeout {
return bidstrategy.NewBidResponse(false, minReason, timeout.String(), s.minJobExecutionTimeout.String()), nil
}

success := s.maxJobExecutionTimeout <= 0 || (s.maxJobExecutionTimeout > 0 && timeoutDuration <= s.maxJobExecutionTimeout)
return bidstrategy.NewBidResponse(success, maxReason, timeoutDuration.String(), s.maxJobExecutionTimeout.String()), nil
success := s.maxJobExecutionTimeout <= 0 || (s.maxJobExecutionTimeout > 0 && timeout <= s.maxJobExecutionTimeout)
return bidstrategy.NewBidResponse(success, maxReason, timeout.String(), s.maxJobExecutionTimeout.String()), nil
}
24 changes: 2 additions & 22 deletions pkg/bidstrategy/semantic/timeout_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"context"
"testing"

"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/bacalhau-project/bacalhau/pkg/models"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/bidstrategy/semantic"
)
Expand All @@ -23,26 +23,6 @@ func TestTimeoutStrategy(t *testing.T) {
shouldBid bool
reason string
}{
{
name: "timeout-too-large",
params: semantic.TimeoutStrategyParams{
JobExecutionTimeoutClientIDBypassList: []string{"client"},
},
request: bidstrategy.BidStrategyRequest{
Job: models.Job{
Namespace: "client",
Tasks: []*models.Task{
{
Timeouts: &models.TimeoutConfig{
ExecutionTimeout: int64(model.NoJobTimeout.Seconds()) + 1,
},
},
},
},
},
shouldBid: false,
reason: "this node does not accept jobs with timeout 9223372037 (the maximum allowed is 9223372036)",
},
{
name: "client-skip-list",
params: semantic.TimeoutStrategyParams{
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/configenv/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ var DevelopmentRequesterConfig = types.RequesterConfig{
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
TotalTimeout: types.Duration(30 * time.Minute),
},
StorageProvider: types.StorageProviderConfig{
S3: types.S3StorageProviderConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/configenv/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ var LocalRequesterConfig = types.RequesterConfig{
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
TotalTimeout: types.Duration(30 * time.Minute),
},
StorageProvider: types.StorageProviderConfig{
S3: types.S3StorageProviderConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/configenv/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ var ProductionRequesterConfig = types.RequesterConfig{
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
TotalTimeout: types.Duration(30 * time.Minute),
},
StorageProvider: types.StorageProviderConfig{
S3: types.S3StorageProviderConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/configenv/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ var StagingRequesterConfig = types.RequesterConfig{
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
TotalTimeout: types.Duration(30 * time.Minute),
},
StorageProvider: types.StorageProviderConfig{
S3: types.S3StorageProviderConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/configenv/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var TestingRequesterConfig = types.RequesterConfig{
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Second),
TotalTimeout: types.Duration(30 * time.Second),
},
StorageProvider: types.StorageProviderConfig{
S3: types.S3StorageProviderConfig{
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/types/gen_viper/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func main() {
defaultConfig := types.BacalhauConfig{}

// Adding the package name
fmt.Fprintf(file, `
// CODE GENERATED BY pkg/config/types/gen_viper DO NOT EDIT
fmt.Fprintf(file, `// CODE GENERATED BY pkg/config/types/gen_viper DO NOT EDIT
package types
Expand Down
1 change: 1 addition & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const NodeComputeControlPlaneSettingsHeartbeatFrequency = "Node.Compute.ControlP
const NodeComputeControlPlaneSettingsHeartbeatTopic = "Node.Compute.ControlPlaneSettings.HeartbeatTopic"
const NodeRequester = "Node.Requester"
const NodeRequesterJobDefaults = "Node.Requester.JobDefaults"
const NodeRequesterJobDefaultsTotalTimeout = "Node.Requester.JobDefaults.TotalTimeout"
const NodeRequesterJobDefaultsExecutionTimeout = "Node.Requester.JobDefaults.ExecutionTimeout"
const NodeRequesterJobDefaultsQueueTimeout = "Node.Requester.JobDefaults.QueueTimeout"
const NodeRequesterExternalVerifierHook = "Node.Requester.ExternalVerifierHook"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_viper_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.SetDefault(NodeComputeControlPlaneSettingsHeartbeatTopic, cfg.Node.Compute.ControlPlaneSettings.HeartbeatTopic)
p.Viper.SetDefault(NodeRequester, cfg.Node.Requester)
p.Viper.SetDefault(NodeRequesterJobDefaults, cfg.Node.Requester.JobDefaults)
p.Viper.SetDefault(NodeRequesterJobDefaultsTotalTimeout, cfg.Node.Requester.JobDefaults.TotalTimeout.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterJobDefaultsExecutionTimeout, cfg.Node.Requester.JobDefaults.ExecutionTimeout.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterJobDefaultsQueueTimeout, cfg.Node.Requester.JobDefaults.QueueTimeout.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterExternalVerifierHook, cfg.Node.Requester.ExternalVerifierHook)
Expand Down Expand Up @@ -314,6 +315,7 @@ func Set(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.Set(NodeComputeControlPlaneSettingsHeartbeatTopic, cfg.Node.Compute.ControlPlaneSettings.HeartbeatTopic)
p.Viper.Set(NodeRequester, cfg.Node.Requester)
p.Viper.Set(NodeRequesterJobDefaults, cfg.Node.Requester.JobDefaults)
p.Viper.Set(NodeRequesterJobDefaultsTotalTimeout, cfg.Node.Requester.JobDefaults.TotalTimeout.AsTimeDuration())
p.Viper.Set(NodeRequesterJobDefaultsExecutionTimeout, cfg.Node.Requester.JobDefaults.ExecutionTimeout.AsTimeDuration())
p.Viper.Set(NodeRequesterJobDefaultsQueueTimeout, cfg.Node.Requester.JobDefaults.QueueTimeout.AsTimeDuration())
p.Viper.Set(NodeRequesterExternalVerifierHook, cfg.Node.Requester.ExternalVerifierHook)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/types/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type S3StorageProviderConfig struct {
}

type JobDefaults struct {
TotalTimeout Duration `yaml:"TotalTimeout"`
ExecutionTimeout Duration `yaml:"ExecutionTimeout"`
QueueTimeout Duration `yaml:"QueueTimeout"`
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/models/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobCancel = "job-cancel"
EvalTriggerJobQueue = "job-queue"
EvalTriggerJobTimeout = "job-timeout"

EvalTriggerExecFailure = "exec-failure"
EvalTriggerExecUpdate = "exec-update"
EvalTriggerExecTimeout = "exec-timeout"
Expand Down
7 changes: 7 additions & 0 deletions pkg/models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,10 @@ func (j *Job) AllStorageTypes() []string {
func (j *Job) IsLongRunning() bool {
return j.Type == JobTypeService || j.Type == JobTypeDaemon
}

// IsExpired returns true if the job is still running beyond the expiration time
func (j *Job) IsExpired(expirationTime time.Time) bool {
return !j.IsTerminal() &&
j.Task().Timeouts.TotalTimeout > 0 &&
j.GetCreateTime().Before(expirationTime)
}
2 changes: 1 addition & 1 deletion pkg/models/migration/legacy/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func FromLegacyJobSpec(legacy model.Spec) (*models.Task, error) {
},
Network: network,
Timeouts: &models.TimeoutConfig{
ExecutionTimeout: legacy.Timeout,
TotalTimeout: legacy.Timeout,
},
}
return task, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/models/migration/legacy/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func ToLegacyJobSpec(job *models.Job) (*model.Spec, error) {
GPU: job.Task().ResourcesConfig.GPU,
},
Network: networkConfig,
Timeout: job.Task().Timeouts.ExecutionTimeout,
Timeout: job.Task().Timeouts.TotalTimeout,
Inputs: inputs,
Outputs: outputs,
Annotations: annotations,
Expand Down Expand Up @@ -210,7 +210,7 @@ func ToLegacyJobStatus(job models.Job, executions []models.Execution) (*model.Jo
Version: int(job.Revision),
CreateTime: time.Unix(0, job.CreateTime),
UpdateTime: time.Unix(0, job.ModifyTime),
TimeoutAt: time.Unix(job.Task().Timeouts.ExecutionTimeout, job.CreateTime),
TimeoutAt: time.Unix(job.Task().Timeouts.TotalTimeout, job.CreateTime),
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/node/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewDefaultComputeParam(storagePath string) ComputeConfigParams {

var DefaultRequesterConfig = RequesterConfigParams{
JobDefaults: transformer.JobDefaults{
ExecutionTimeout: model.NoJobTimeout,
TotalTimeout: model.NoJobTimeout,
},

HousekeepingBackgroundTaskInterval: 30 * time.Second,
Expand Down Expand Up @@ -82,7 +82,7 @@ var DefaultRequesterConfig = RequesterConfigParams{

var TestRequesterConfig = RequesterConfigParams{
JobDefaults: transformer.JobDefaults{
ExecutionTimeout: 30 * time.Second,
TotalTimeout: 30 * time.Second,
},
HousekeepingBackgroundTaskInterval: 30 * time.Second,
HousekeepingTimeoutBuffer: 100 * time.Millisecond,
Expand Down
14 changes: 7 additions & 7 deletions pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func NewRequesterNode(
}

endpoint := requester.NewBaseEndpoint(&requester.BaseEndpointParams{
ID: nodeID,
EventEmitter: eventEmitter,
ComputeEndpoint: computeProxy,
Store: jobStore,
StorageProviders: storageProvider,
DefaultJobExecutionTimeout: requesterConfig.JobDefaults.ExecutionTimeout,
DefaultPublisher: requesterConfig.DefaultPublisher,
ID: nodeID,
EventEmitter: eventEmitter,
ComputeEndpoint: computeProxy,
Store: jobStore,
StorageProviders: storageProvider,
DefaultJobTimeout: requesterConfig.JobDefaults.TotalTimeout,
DefaultPublisher: requesterConfig.DefaultPublisher,
})

var translationProvider translation.TranslatorProvider
Expand Down
14 changes: 12 additions & 2 deletions pkg/orchestrator/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ const (
EventTopicJobSubmission models.EventTopic = "Submission"
EventTopicJobScheduling models.EventTopic = "Scheduling"
EventTopicExecutionTimeout models.EventTopic = "Exec Timeout"
EventTopicJobTimeout models.EventTopic = "Job Timeout"
)

const (
jobSubmittedMessage = "Job submitted"
jobTranslatedMessage = "Job tasks translated to new type"
jobStopRequestedMessage = "Job requested to stop before completion"
jobExhaustedRetriesMessage = "Job failed because it has been retried too many times"
JobTimeoutMessage = "Job timed out"

execStoppedByJobStopMessage = "Execution stop requested because job has been stopped"
execStoppedByNodeUnhealthyMessage = "Execution stop requested because node has disappeared"
Expand All @@ -27,7 +29,7 @@ const (
execFailedMessage = "Execution did not complete successfully"

executionTimeoutMessage = "Execution timed out"
executionTimeoutHint = "Try increasing the task timeout or reducing the task size"
timeoutHint = "Try increasing the task timeout or reducing the task size"
)

func event(topic models.EventTopic, msg string, details map[string]string) models.Event {
Expand Down Expand Up @@ -60,6 +62,14 @@ func JobExhaustedRetriesEvent() models.Event {
return event(EventTopicJobScheduling, jobExhaustedRetriesMessage, map[string]string{})
}

func JobTimeoutEvent(timeout time.Duration) models.Event {
e := models.NewEvent(EventTopicJobTimeout).
WithError(fmt.Errorf("%s. Job took longer than %s", JobTimeoutMessage, timeout)).
WithHint(timeoutHint).
WithFailsExecution(true)
return *e
}

func ExecStoppedByJobStopEvent() models.Event {
return event(EventTopicJobScheduling, execStoppedByJobStopMessage, map[string]string{})
}
Expand All @@ -71,7 +81,7 @@ func ExecStoppedByNodeUnhealthyEvent() models.Event {
func ExecStoppedByExecutionTimeoutEvent(timeout time.Duration) models.Event {
e := models.NewEvent(EventTopicExecutionTimeout).
WithError(fmt.Errorf("%s. Execution took longer than %s", executionTimeoutMessage, timeout)).
WithHint(executionTimeoutHint).
WithHint(timeoutHint).
WithFailsExecution(true)
return *e
}
Expand Down
Loading

0 comments on commit 0e0325b

Please sign in to comment.