Skip to content

Commit

Permalink
add job Queued state (#4121)
Browse files Browse the repository at this point in the history
Improves visibility of queued jobs by adding a dedicated `Queued` state
  • Loading branch information
wdbaruni committed Jun 22, 2024
1 parent 41eec67 commit 5f9aeee
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const (
// yet scheduled.
JobStateTypePending

// JobStateTypeQueued is the state of a job that has been evaluated but no
// matching nodes are available yet.
JobStateTypeQueued

// JobStateTypeRunning is the state of a job that has been scheduled, with at
// least one active execution.
JobStateTypeRunning
Expand Down
13 changes: 7 additions & 6 deletions pkg/models/job_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/models/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (p *Plan) MarkJobRunningIfEligible() {
p.DesiredJobState = JobStateTypeRunning
}

// MarkJobQueued sets the plan's desired job state to JobStateTypeQueued and
// stores the given event on the plan.
func (p *Plan) MarkJobQueued(event Event) {
	p.Event = event
	p.DesiredJobState = JobStateTypeQueued
}

func (p *Plan) MarkJobFailed(event Event) {
p.DesiredJobState = JobStateTypeFailed
p.Event = event
Expand Down
10 changes: 10 additions & 0 deletions pkg/orchestrator/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
// Event topics (models.EventTopic values) defined by the orchestrator package.
const (
EventTopicJobSubmission models.EventTopic = "Submission"
// EventTopicJobScheduling is the topic used by ExecStoppedByJobStopEvent.
EventTopicJobScheduling models.EventTopic = "Scheduling"
// EventTopicJobQueueing is the topic used by JobQueueingEvent.
EventTopicJobQueueing models.EventTopic = "Queueing"
EventTopicExecutionTimeout models.EventTopic = "Exec Timeout"
EventTopicJobTimeout models.EventTopic = "Job Timeout"
)

const (
jobSubmittedMessage = "Job submitted"
jobTranslatedMessage = "Job tasks translated to new type"
jobQueuedMessage = "Job queued"
jobStopRequestedMessage = "Job requested to stop before completion"
jobExhaustedRetriesMessage = "Job failed because it has been retried too many times"
JobTimeoutMessage = "Job timed out"
Expand Down Expand Up @@ -70,6 +72,14 @@ func JobTimeoutEvent(timeout time.Duration) models.Event {
return *e
}

// JobQueueingEvent builds an event on the EventTopicJobQueueing topic.
// The message is jobQueuedMessage on its own when reason is empty; otherwise
// the reason is appended after a period and a space.
func JobQueueingEvent(reason string) models.Event {
	if reason == "" {
		return *models.NewEvent(EventTopicJobQueueing).WithMessage(jobQueuedMessage)
	}
	detailed := fmt.Sprintf("%s. %s", jobQueuedMessage, reason)
	return *models.NewEvent(EventTopicJobQueueing).WithMessage(detailed)
}

// ExecStoppedByJobStopEvent returns the event produced by the package's event
// helper for the EventTopicJobScheduling topic, using
// execStoppedByJobStopMessage and an empty string map.
func ExecStoppedByJobStopEvent() models.Event {
	empty := map[string]string{}
	return event(EventTopicJobScheduling, execStoppedByJobStopMessage, empty)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/orchestrator/scheduler/batch_service_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
plan.AppendEvaluation(delayedEvaluation)
log.Ctx(ctx).Debug().Msgf("Creating delayed evaluation %s to retry scheduling job %s in %s due to: %s",
delayedEvaluation.ID, job.ID, waitUntil.Sub(b.clock.Now()), comment)

// if not a single node was matched, then the job is fully queued and we should reflect that
// in the job state and events
if len(matching) == 0 {
// only update the state if the job is running, or pending and triggered by job registration
if job.State.StateType == models.JobStateTypeRunning ||
plan.Eval.TriggeredBy == models.EvalTriggerJobRegister {
plan.MarkJobQueued(orchestrator.JobQueueingEvent(comment))
}
}
}
return nil
}
Expand Down

0 comments on commit 5f9aeee

Please sign in to comment.