Skip to content

Commit

Permalink
api/v1/orchestrator/jobs/{id}/executions endpoint, move sorting to …
Browse files Browse the repository at this point in the history
…job store rather then having it at endpoint level (#4193)

## What is this PR about ?
With this PR we shifting the sorting for executions endpoint a the job
store level rather then having at endpoint level.


## GitHub Keywords
closes #3852
  • Loading branch information
udsamani committed Jul 4, 2024
1 parent 9c41fd7 commit 7290224
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 142 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/util/templates"
)

var executionsOrderByFields = []string{"modify_time", "create_time", "id", "state"}
var executionsOrderByFields = []string{"modified_at", "created_at"}

var (
executionShort = `List executions for a job by id.`
Expand Down
87 changes: 65 additions & 22 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"slices"
"sort"
"strings"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/util"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
)

Expand Down Expand Up @@ -145,7 +147,8 @@ func (b *BoltJobStore) BeginTx(ctx context.Context) (jobstore.TxContext, error)
func (b *BoltJobStore) Watch(ctx context.Context,
types jobstore.StoreWatcherType,
events jobstore.StoreEventType,
options ...jobstore.WatcherOption) *jobstore.Watcher {
options ...jobstore.WatcherOption,
) *jobstore.Watcher {
return b.watchersManager.NewWatcher(ctx, types, events, options...)
}

Expand Down Expand Up @@ -266,6 +269,33 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions
job = &j
}

// Sort By Given Order By
var sortFnc func(a, b models.Execution) int
switch options.OrderBy {
// create_time will eventually be deprectated. It is being used for backward compatibility.
case "create_time", "created_at", "": //nolint: goconst
sortFnc = func(a, b models.Execution) int { return util.Compare[int64]{}.Cmp(a.CreateTime, b.CreateTime) }
// modify_time will eventually be deprecated. It is being used for backward compatibility.
case "modify_time", "modified_at":
sortFnc = func(a, b models.Execution) int { return util.Compare[int64]{}.Cmp(a.ModifyTime, b.ModifyTime) }
default:
return nil, fmt.Errorf("OrderBy %s not supported for getExecutions", options.OrderBy)
}

if options.Reverse {
baseSortFnc := sortFnc
sortFnc = func(a, b models.Execution) int {
r := baseSortFnc(a, b)
if r == -1 {
return 1
}
if r == 1 {
return -1
}
return 0
}
}

bkt, err := NewBucketPath(BucketJobs, jobID, BucketJobExecutions).Get(tx, false)
if err != nil {
return nil, err
Expand All @@ -285,6 +315,14 @@ func (b *BoltJobStore) getExecutions(tx *bolt.Tx, options jobstore.GetExecutions
return nil
})

// sort executions
slices.SortFunc(execs, sortFnc)

// apply limit
if options.Limit > 0 && len(execs) > options.Limit {
execs = execs[:options.Limit]
}

return execs, err
}

Expand Down Expand Up @@ -325,26 +363,30 @@ func (b *BoltJobStore) getJobs(tx *bolt.Tx, query jobstore.JobQuery) (*jobstore.
}

// Sort the jobs according to the query.SortBy and query.SortOrder
listSorter := func(i, j int) bool {
switch query.SortBy {
case "modified_at":
if query.SortReverse {
return result[i].ModifyTime > result[j].ModifyTime
} else {
return result[i].ModifyTime < result[j].ModifyTime
var sortFunc func(a, b models.Job) int
switch query.SortBy {
case "created_at", "":
sortFunc = func(a, b models.Job) int { return util.Compare[int64]{}.Cmp(a.CreateTime, b.CreateTime) }
case "modified_at":
sortFunc = func(a, b models.Job) int { return util.Compare[int64]{}.Cmp(a.ModifyTime, b.ModifyTime) }
default:
return nil, fmt.Errorf("OrderBy %s not supported for listJobs", query.SortBy)
}
if query.SortReverse {
baseSortFnc := sortFunc
sortFunc = func(a, b models.Job) int {
r := baseSortFnc(a, b)
if r == -1 {
return 1
}
default:
// We apply created_at as a default sort so that we can use it for pagination.
// Without a known default we won't have a stable sort that makes sense for
// offsets/limits.
if query.SortReverse {
return result[i].CreateTime > result[j].CreateTime
} else {
return result[i].CreateTime < result[j].CreateTime
if r == 1 {
return -1
}
return 0
}
}
sort.Slice(result, listSorter)

slices.SortFunc(result, sortFunc)

// If we have a selector, filter the results to only those that match
if query.Selector != nil {
Expand Down Expand Up @@ -587,7 +629,8 @@ func createInProgressIndexKey(job *models.Job) string {
// GetJobHistory returns the job (and execution) history for the provided options
func (b *BoltJobStore) GetJobHistory(ctx context.Context,
jobID string,
options jobstore.JobHistoryFilterOptions) ([]models.JobHistory, error) {
options jobstore.JobHistoryFilterOptions,
) ([]models.JobHistory, error) {
var history []models.JobHistory
err := b.view(ctx, func(tx *bolt.Tx) (err error) {
history, err = b.getJobHistory(tx, jobID, options)
Expand All @@ -598,7 +641,8 @@ func (b *BoltJobStore) GetJobHistory(ctx context.Context,
}

func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string,
options jobstore.JobHistoryFilterOptions) ([]models.JobHistory, error) {
options jobstore.JobHistoryFilterOptions,
) ([]models.JobHistory, error) {
var history []models.JobHistory

jobID, err := b.reifyJobID(tx, jobID)
Expand All @@ -621,7 +665,6 @@ func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string,
history = append(history, item)
return nil
})

if err != nil {
return nil, err
}
Expand All @@ -644,7 +687,6 @@ func (b *BoltJobStore) getJobHistory(tx *bolt.Tx, jobID string,
history = append(history, item)
return nil
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1096,7 +1138,8 @@ func (b *BoltJobStore) updateExecution(tx *bolt.Tx, request jobstore.UpdateExecu
}

func (b *BoltJobStore) appendExecutionHistory(tx *bolt.Tx, updated models.Execution,
previous models.ExecutionStateType, event models.Event) error {
previous models.ExecutionStateType, event models.Event,
) error {
historyEntry := models.JobHistory{
Type: models.JobHistoryTypeExecutionLevel,
JobID: updated.JobID,
Expand Down
Loading

0 comments on commit 7290224

Please sign in to comment.