compute nodes no longer reject bids based on queued capacity (#4002)
This PR not only closes #3992, but also adds a node ranker based on
available and queued capacity, which we didn't have before. I had to
introduce the ranker along with this PR to keep some tests passing that
relied on bid rejection when resource limits were exceeded, specifically
`TestParallelGPU`.
wdbaruni committed May 20, 2024
1 parent 72b378f commit a81f56f
Showing 29 changed files with 701 additions and 275 deletions.
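
The ranker itself does not appear among the files shown below, so the following is a rough, hypothetical sketch of the idea only: order candidate nodes by the headroom left once both queued and requested work are counted, instead of rejecting overloaded nodes outright. The `resources` and `nodeInfo` types and the `rankByCapacity` function are invented for this illustration and are not the PR's actual API.

package ranking

import "sort"

// resources is a deliberately simplified stand-in for the project's
// models.Resources; only CPU matters in this sketch.
type resources struct {
	CPU float64
}

// nodeInfo is an invented stand-in for whatever node state the orchestrator
// exposes to its rankers.
type nodeInfo struct {
	ID        string
	Available resources // capacity currently free on the node
	Queued    resources // capacity already claimed by queued executions
}

// rankByCapacity orders nodes so that the ones with the most headroom
// (available capacity minus queued usage minus the job's request) come
// first. Overloaded nodes sink to the bottom instead of being rejected,
// mirroring the behaviour change in this commit.
func rankByCapacity(nodes []nodeInfo, jobCPU float64) []nodeInfo {
	ranked := make([]nodeInfo, len(nodes))
	copy(ranked, nodes)
	sort.SliceStable(ranked, func(i, j int) bool {
		hi := ranked[i].Available.CPU - ranked[i].Queued.CPU - jobCPU
		hj := ranked[j].Available.CPU - ranked[j].Queued.CPU - jobCPU
		return hi > hj
	})
	return ranked
}

A real ranker would presumably weigh memory, disk and GPU as well; the sketch keys on CPU only to stay short.
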
3 changes: 0 additions & 3 deletions cmd/cli/config/auto.go
@@ -126,9 +126,6 @@ func autoConfig(ctx context.Context, settings *autoSettings) error {
if err := setResources(types.NodeComputeCapacityDefaultJobResourceLimits, settings.DefaultPercentage, physicalResources); err != nil {
return err
}
if err := setResources(types.NodeComputeCapacityQueueResourceLimits, settings.QueuePercentage, physicalResources); err != nil {
return err
}

return nil
}
4 changes: 1 addition & 3 deletions cmd/cli/serve/util.go
@@ -37,10 +37,9 @@ func GetComputeConfig(ctx context.Context, createExecutionStore bool) (node.Comp
}

totalResources, totalErr := cfg.Capacity.TotalResourceLimits.ToResources()
queueResources, queueErr := cfg.Capacity.QueueResourceLimits.ToResources()
jobResources, jobErr := cfg.Capacity.JobResourceLimits.ToResources()
defaultResources, defaultErr := cfg.Capacity.DefaultJobResourceLimits.ToResources()
if err := errors.Join(totalErr, queueErr, jobErr, defaultErr); err != nil {
if err := errors.Join(totalErr, jobErr, defaultErr); err != nil {
return node.ComputeConfig{}, err
}

@@ -56,7 +55,6 @@ func GetComputeConfig(ctx context.Context, createExecutionStore bool) (node.Comp

return node.NewComputeConfigWith(node.ComputeConfigParams{
TotalResourceLimits: *totalResources,
QueueResourceLimits: *queueResources,
JobResourceLimits: *jobResources,
DefaultJobResourceLimits: *defaultResources,
IgnorePhysicalResourceLimits: cfg.Capacity.IgnorePhysicalResourceLimits,
10 changes: 0 additions & 10 deletions docs/docs/dev/cli-reference/all-flags.md
@@ -416,11 +416,6 @@ Config File:
disk: 568 GB
gpu: "0"
memory: 52 GB
queueresourcelimits:
cpu: 15000m
disk: 1.1 TB
gpu: "0"
memory: 103 GB
totalresourcelimits:
cpu: 7500m
disk: 568 GB
@@ -450,11 +445,6 @@ Config File:
disk: 568 GB
gpu: "0"
memory: 52 GB
queueresourcelimits:
cpu: 50000m
disk: 3.8 TB
gpu: "0"
memory: 344 GB
totalresourcelimits:
cpu: 7500m
disk: 568 GB
15 changes: 0 additions & 15 deletions docs/docs/dev/cli-reference/cli/config/auto-resources.md
@@ -60,11 +60,6 @@ bacalhau config auto-resources [flags]
disk: 568 GB
gpu: "0"
memory: 52 GB
queueresourcelimits:
cpu: 15000m
disk: 1.1 TB
gpu: "0"
memory: 103 GB
totalresourcelimits:
cpu: 7500m
disk: 568 GB
@@ -96,11 +91,6 @@ bacalhau config auto-resources [flags]
disk: 568 GB
gpu: "0"
memory: 52 GB
queueresourcelimits:
cpu: 50000m
disk: 3.8 TB
gpu: "0"
memory: 344 GB
totalresourcelimits:
cpu: 7500m
disk: 568 GB
@@ -132,11 +122,6 @@ bacalhau config auto-resources [flags]
disk: 190 GB
gpu: "0"
memory: 17 GB
queueresourcelimits:
cpu: 15000m
disk: 1.1 TB
gpu: "0"
memory: 103 GB
totalresourcelimits:
cpu: 2500m
disk: 190 GB
50 changes: 0 additions & 50 deletions pkg/bidstrategy/resource/capacity_available_strategy.go

This file was deleted.

36 changes: 20 additions & 16 deletions pkg/compute/bidder.go
@@ -274,14 +274,16 @@ func (b Bidder) runSemanticBidding(
strategyType := reflect.TypeOf(s).String()
resp, err := s.ShouldBid(ctx, request)

log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
Err(err).
Str("Job", request.Job.ID).
Str("Strategy", strategyType).
Bool("Bid", resp.ShouldBid).
Bool("Wait", resp.ShouldWait).
Str("Reason", resp.Reason).
Send()
if err != nil || !resp.ShouldBid {
log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
Err(err).
Str("Job", request.Job.ID).
Str("Strategy", strategyType).
Bool("Bid", resp.ShouldBid).
Bool("Wait", resp.ShouldWait).
Str("Reason", resp.Reason).
Send()
}

if err != nil {
// NB: failure here results in a callback to OnComputeFailure
@@ -335,14 +337,16 @@ func (b Bidder) runResourceBidding(
strategyType := reflect.TypeOf(s).String()
resp, err := s.ShouldBidBasedOnUsage(ctx, request, *resourceUsage)

log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
Err(err).
Str("Job", request.Job.ID).
Str("Strategy", strategyType).
Bool("Bid", resp.ShouldBid).
Bool("Wait", resp.ShouldWait).
Str("Reason", resp.Reason).
Send()
if err != nil || !resp.ShouldBid {
log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
Err(err).
Str("Job", request.Job.ID).
Str("Strategy", strategyType).
Bool("Bid", resp.ShouldBid).
Bool("Wait", resp.ShouldWait).
Str("Reason", resp.Reason).
Send()
}

if err != nil {
// NB: failure here results in a callback to OnComputeFailure
11 changes: 11 additions & 0 deletions pkg/compute/capacity/types.go
@@ -25,6 +25,17 @@ type Tracker interface {
Remove(ctx context.Context, usage models.Resources)
}

// UsageTracker keeps track of the current resource usage of the compute node.
// Useful for tracking jobs that are queued and haven't started yet.
type UsageTracker interface {
// Add adds the given resource usage to the tracker.
Add(ctx context.Context, usage models.Resources)
// Remove removes the given resource usage from the tracker.
Remove(ctx context.Context, usage models.Resources)
// GetUsedCapacity returns the current resource usage of the tracker
GetUsedCapacity(ctx context.Context) models.Resources
}

// UsageCalculator calculates the resource usage of a job.
// Can also be used to populate the resource usage of a job with default values if not defined
type UsageCalculator interface {
41 changes: 41 additions & 0 deletions pkg/compute/capacity/usag_tracker_test.go
@@ -0,0 +1,41 @@
//go:build unit || !integration

package capacity

import (
"context"
"testing"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/stretchr/testify/suite"
)

type LocalUsageTrackerTestSuite struct {
suite.Suite
tracker *LocalUsageTracker
}

func (s *LocalUsageTrackerTestSuite) SetupTest() {
s.tracker = NewLocalUsageTracker()
}

func (s *LocalUsageTrackerTestSuite) TestAddAndRemove() {
ctx := context.Background()
usage := models.Resources{CPU: 2, Memory: 1024, Disk: 10000, GPU: 1}

// Test Add
s.tracker.Add(ctx, usage)

usedCapacity := s.tracker.GetUsedCapacity(ctx)
s.Require().Equal(usage, usedCapacity)

// Test Remove
s.tracker.Remove(ctx, usage)

usedCapacity = s.tracker.GetUsedCapacity(ctx)
s.Require().True(usedCapacity.IsZero())
}

func TestLocalUsageTrackerTestSuite(t *testing.T) {
suite.Run(t, new(LocalUsageTrackerTestSuite))
}
38 changes: 38 additions & 0 deletions pkg/compute/capacity/usage_tracker.go
@@ -0,0 +1,38 @@
package capacity

import (
"context"
"sync"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

// LocalUsageTracker keeps track of the resources used regardless of the total capacity.
// It is useful for tracking jobs that are queued and haven't started yet.
type LocalUsageTracker struct {
usedCapacity models.Resources
mu sync.Mutex
}

func NewLocalUsageTracker() *LocalUsageTracker {
return &LocalUsageTracker{}
}

func (t *LocalUsageTracker) Add(ctx context.Context, usage models.Resources) {
t.mu.Lock()
defer t.mu.Unlock()
t.usedCapacity = *t.usedCapacity.Add(usage)
}

func (t *LocalUsageTracker) Remove(ctx context.Context, usage models.Resources) {
t.mu.Lock()
defer t.mu.Unlock()
t.usedCapacity = *t.usedCapacity.Sub(usage)
}

func (t *LocalUsageTracker) GetUsedCapacity(ctx context.Context) models.Resources {
return t.usedCapacity
}

// compile-time check that LocalUsageTracker implements UsageTracker
var _ UsageTracker = (*LocalUsageTracker)(nil)
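
To make the split of responsibilities explicit, here is a minimal sketch of how the two trackers cooperate after this change, based only on the interfaces and calls visible in this diff. The `tryStart` helper is illustrative and not part of the PR; the real wiring lives in `ExecutorBuffer` below.

package compute

import (
	"context"

	"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
	"github.com/bacalhau-project/bacalhau/pkg/models"
)

// tryStart records the work as queued, then attempts to start it if the
// bounded running-capacity tracker has room. Queued work is never rejected;
// it simply stays queued until capacity frees up.
func tryStart(ctx context.Context, running capacity.Tracker, queued capacity.UsageTracker,
	usage models.Resources) bool {
	// Enqueueing is now unconditional: Add always succeeds.
	queued.Add(ctx, usage)

	// Starting is still bounded: AddIfHasCapacity returns nil when the node
	// is full, and the caller retries later (see deque() below).
	allocated := running.AddIfHasCapacity(ctx, usage)
	if allocated == nil {
		return false
	}

	// The work is now running rather than merely queued.
	queued.Remove(ctx, usage)
	_ = allocated // the caller would start the execution with *allocated
	return true
}
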
31 changes: 10 additions & 21 deletions pkg/compute/executor_buffer.go
@@ -31,7 +31,7 @@ type ExecutorBufferParams struct {
DelegateExecutor Executor
Callback Callback
RunningCapacityTracker capacity.Tracker
EnqueuedCapacityTracker capacity.Tracker
EnqueuedUsageTracker capacity.UsageTracker
DefaultJobExecutionTimeout time.Duration
}

@@ -44,7 +44,7 @@ type ExecutorBufferParams struct {
type ExecutorBuffer struct {
ID string
runningCapacity capacity.Tracker
enqueuedCapacity capacity.Tracker
enqueuedCapacity capacity.UsageTracker
delegateService Executor
callback Callback
running map[string]*bufferTask
Expand All @@ -61,7 +61,7 @@ func NewExecutorBuffer(params ExecutorBufferParams) *ExecutorBuffer {
r := &ExecutorBuffer{
ID: params.ID,
runningCapacity: params.RunningCapacityTracker,
enqueuedCapacity: params.EnqueuedCapacityTracker,
enqueuedCapacity: params.EnqueuedUsageTracker,
delegateService: params.DelegateExecutor,
callback: params.Callback,
running: make(map[string]*bufferTask),
Expand Down Expand Up @@ -108,18 +108,7 @@ func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.Loca
err = models.NewBaseError("execution %s already running", execution.ID)
return err
}
if added := s.enqueuedCapacity.AddIfHasCapacity(ctx, *execution.TotalAllocatedResources()); added == nil {
err = models.NewBaseError("not enough capacity to enqueue job").WithRetryable()
return err
} else {
// Update the execution to include all the resources that have
// actually been allocated. Effectively this is picking the GPU(s)
// that the job will use. Note that this is not persisted here, as
// it was based on current usage information which would change
// under a restart, so it will only persist if the job starts
execution.AllocateResources(execution.Job.Task().Name, *added)
}

s.enqueuedCapacity.Add(ctx, *execution.TotalAllocatedResources())
s.queuedTasks.Enqueue(newBufferTask(localExecutionState), int64(execution.Job.Priority))
s.deque()
return err
@@ -182,21 +171,21 @@ func (s *ExecutorBuffer) deque() {
for i := 0; i < max; i++ {
qitem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool {
// If we don't have enough resources to run this task, then we will skip it
queued := task.localExecutionState.Execution.TotalAllocatedResources()
added := s.runningCapacity.AddIfHasCapacity(ctx, *queued)
if added == nil {
queuedResources := task.localExecutionState.Execution.TotalAllocatedResources()
allocatedResources := s.runningCapacity.AddIfHasCapacity(ctx, *queuedResources)
if allocatedResources == nil {
return false
}

// Update the execution to include all the resources that have
// actually been allocated
task.localExecutionState.Execution.AllocateResources(
task.localExecutionState.Execution.Job.Task().Name,
*added,
*allocatedResources,
)

// Claim the resources now so that we don't count allocated resources
s.enqueuedCapacity.Remove(ctx, *queued)
// Claim the resources now so that we don't count queued resources
s.enqueuedCapacity.Remove(ctx, *queuedResources)
return true
})
