Skip to content

Commit

Permalink
filter out nodes with high queue capacity (#4051)
Browse files Browse the repository at this point in the history
This change is related to
#4049 where instead of
queueing locally in each compute node, we try to queue in the requester
instead so that jobs are scheduled to new nodes that join, or to the
first node that frees up its resources.

The current state is we don't filter out nodes if they don't have
immediate available capacity or if their queue is growing large. We rank
nodes with more capacity higher, but we don't filter out nodes with no
capacity. This change allows operators to define
`NodeOverSubscriptionFactor` in the requester node to allow it to filter
out any compute node with total active and queue capacity beyond the
factor. The default is `1.5` which means the compute node can queue
locally half of its total capacity in addition to the running capacity.

## Testing
This change has been tested with
#4049 in dev stack as
documented in that issue
  • Loading branch information
wdbaruni committed Jun 3, 2024
1 parent 09388c1 commit 067717d
Show file tree
Hide file tree
Showing 20 changed files with 700 additions and 11 deletions.
1 change: 1 addition & 0 deletions cmd/cli/serve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func GetRequesterConfig(ctx context.Context, cfg types.RequesterConfig, createJo
EvalBrokerSubsequentRetryDelay: time.Duration(cfg.EvaluationBroker.EvalBrokerSubsequentRetryDelay),
EvalBrokerMaxRetryCount: cfg.EvaluationBroker.EvalBrokerMaxRetryCount,
WorkerCount: cfg.Worker.WorkerCount,
NodeOverSubscriptionFactor: cfg.Scheduler.NodeOverSubscriptionFactor,
WorkerEvalDequeueTimeout: time.Duration(cfg.Worker.WorkerEvalDequeueTimeout),
WorkerEvalDequeueBaseBackoff: time.Duration(cfg.Worker.WorkerEvalDequeueBaseBackoff),
WorkerEvalDequeueMaxBackoff: time.Duration(cfg.Worker.WorkerEvalDequeueMaxBackoff),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configenv/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ var DevelopmentRequesterConfig = types.RequesterConfig{
WorkerEvalDequeueMaxBackoff: types.Duration(30 * time.Second),
},
Scheduler: types.SchedulerConfig{
QueueBackoff: types.Duration(30 * time.Second),
QueueBackoff: types.Duration(30 * time.Second),
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configenv/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ var LocalRequesterConfig = types.RequesterConfig{
WorkerEvalDequeueMaxBackoff: types.Duration(30 * time.Second),
},
Scheduler: types.SchedulerConfig{
QueueBackoff: types.Duration(30 * time.Second),
QueueBackoff: types.Duration(30 * time.Second),
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configenv/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ var ProductionRequesterConfig = types.RequesterConfig{
WorkerEvalDequeueMaxBackoff: types.Duration(30 * time.Second),
},
Scheduler: types.SchedulerConfig{
QueueBackoff: types.Duration(1 * time.Minute),
QueueBackoff: types.Duration(1 * time.Minute),
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configenv/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ var StagingRequesterConfig = types.RequesterConfig{
WorkerEvalDequeueMaxBackoff: types.Duration(30 * time.Second),
},
Scheduler: types.SchedulerConfig{
QueueBackoff: types.Duration(1 * time.Minute),
QueueBackoff: types.Duration(1 * time.Minute),
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Minute),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configenv/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ var TestingRequesterConfig = types.RequesterConfig{
WorkerEvalDequeueMaxBackoff: types.Duration(30 * time.Second),
},
Scheduler: types.SchedulerConfig{
QueueBackoff: types.Duration(5 * time.Second),
QueueBackoff: types.Duration(5 * time.Second),
NodeOverSubscriptionFactor: 1.5,
},
JobDefaults: types.JobDefaults{
ExecutionTimeout: types.Duration(30 * time.Second),
Expand Down
1 change: 1 addition & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const NodeRequesterWorkerWorkerEvalDequeueBaseBackoff = "Node.Requester.Worker.W
const NodeRequesterWorkerWorkerEvalDequeueMaxBackoff = "Node.Requester.Worker.WorkerEvalDequeueMaxBackoff"
const NodeRequesterScheduler = "Node.Requester.Scheduler"
const NodeRequesterSchedulerQueueBackoff = "Node.Requester.Scheduler.QueueBackoff"
const NodeRequesterSchedulerNodeOverSubscriptionFactor = "Node.Requester.Scheduler.NodeOverSubscriptionFactor"
const NodeRequesterStorageProvider = "Node.Requester.StorageProvider"
const NodeRequesterStorageProviderS3 = "Node.Requester.StorageProvider.S3"
const NodeRequesterStorageProviderS3PreSignedURLDisabled = "Node.Requester.StorageProvider.S3.PreSignedURLDisabled"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_viper_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.SetDefault(NodeRequesterWorkerWorkerEvalDequeueMaxBackoff, cfg.Node.Requester.Worker.WorkerEvalDequeueMaxBackoff.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterScheduler, cfg.Node.Requester.Scheduler)
p.Viper.SetDefault(NodeRequesterSchedulerQueueBackoff, cfg.Node.Requester.Scheduler.QueueBackoff.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterSchedulerNodeOverSubscriptionFactor, cfg.Node.Requester.Scheduler.NodeOverSubscriptionFactor)
p.Viper.SetDefault(NodeRequesterStorageProvider, cfg.Node.Requester.StorageProvider)
p.Viper.SetDefault(NodeRequesterStorageProviderS3, cfg.Node.Requester.StorageProvider.S3)
p.Viper.SetDefault(NodeRequesterStorageProviderS3PreSignedURLDisabled, cfg.Node.Requester.StorageProvider.S3.PreSignedURLDisabled)
Expand Down Expand Up @@ -343,6 +344,7 @@ func Set(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.Set(NodeRequesterWorkerWorkerEvalDequeueMaxBackoff, cfg.Node.Requester.Worker.WorkerEvalDequeueMaxBackoff.AsTimeDuration())
p.Viper.Set(NodeRequesterScheduler, cfg.Node.Requester.Scheduler)
p.Viper.Set(NodeRequesterSchedulerQueueBackoff, cfg.Node.Requester.Scheduler.QueueBackoff.AsTimeDuration())
p.Viper.Set(NodeRequesterSchedulerNodeOverSubscriptionFactor, cfg.Node.Requester.Scheduler.NodeOverSubscriptionFactor)
p.Viper.Set(NodeRequesterStorageProvider, cfg.Node.Requester.StorageProvider)
p.Viper.Set(NodeRequesterStorageProviderS3, cfg.Node.Requester.StorageProvider.S3)
p.Viper.Set(NodeRequesterStorageProviderS3PreSignedURLDisabled, cfg.Node.Requester.StorageProvider.S3.PreSignedURLDisabled)
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/types/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type WorkerConfig struct {
}

type SchedulerConfig struct {
QueueBackoff Duration `yaml:"QueueBackoff"`
QueueBackoff Duration `yaml:"QueueBackoff"`
NodeOverSubscriptionFactor float64 `yaml:"NodeOverSubscriptionFactor"`
}

type StorageProviderConfig struct {
Expand Down
26 changes: 25 additions & 1 deletion pkg/lib/validate/numbers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,31 @@ import (
// It returns an error if the value is not greater than zero, using the provided message and arguments.
// T is a generic type constrained to math.Number, allowing the function to work with various numeric types.
func IsGreaterThanZero[T math.Number](value T, msg string, args ...any) error {
if value <= 0 {
return IsGreaterThan(value, 0, msg, args...)
}

// IsGreaterOrEqualToZero checks if the provided numeric value (of type T) is greater or equal to zero.
// It returns an error if the value is less than zero, using the provided message and arguments.
// T is a generic type constrained to math.Number, allowing the function to work with various numeric types.
func IsGreaterOrEqualToZero[T math.Number](value T, msg string, args ...any) error {
return IsGreaterOrEqual(value, 0, msg, args...)
}

// IsGreaterThan checks if the first provided numeric value (of type T) is greater than the second.
// It returns an error if the first value is not greater than the second, using the provided message and arguments.
// T is a generic type constrained to math.Number, allowing the function to work with various numeric types.
func IsGreaterThan[T math.Number](value, other T, msg string, args ...any) error {
if value <= other {
return createError(msg, args...)
}
return nil
}

// IsGreaterOrEqual checks if the first provided numeric value (of type T) is greater or equal to the second.
// It returns an error if the first value is less than the second, using the provided message and arguments.
// T is a generic type constrained to math.Number, allowing the function to work with various numeric types.
func IsGreaterOrEqual[T math.Number](value, other T, msg string, args ...any) error {
if value < other {
return createError(msg, args...)
}
return nil
Expand Down
83 changes: 83 additions & 0 deletions pkg/lib/validate/numbers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,86 @@ func TestIsGreaterThanZero(t *testing.T) {
t.Errorf("IsGreaterThanZero failed: unexpected error for float value %v", floatValue)
}
}

func TestIsGreaterOrEqualToZero(t *testing.T) {
// Test with value less than zero
err := IsGreaterOrEqualToZero(-1, "value should be greater or equal to zero")
if err == nil || err.Error() != "value should be greater or equal to zero" {
t.Errorf("IsGreaterOrEqualToZero failed: expected error for value -1")
}

// Test with zero
err = IsGreaterOrEqualToZero(0, "value should be greater or equal to zero")
if err != nil {
t.Errorf("IsGreaterOrEqualToZero failed: unexpected error for value 0")
}

// Test with value greater than zero
err = IsGreaterOrEqualToZero(1, "value should be greater or equal to zero")
if err != nil {
t.Errorf("IsGreaterOrEqualToZero failed: unexpected error for value 1")
}

// Test with different numeric types
var floatValue float64 = 0.0
err = IsGreaterOrEqualToZero(floatValue, "value should be greater or equal to zero")
if err != nil {
t.Errorf("IsGreaterOrEqualToZero failed: unexpected error for float value %v", floatValue)
}
}

func TestIsGreaterThan(t *testing.T) {
// Test with value less than other
err := IsGreaterThan(1, 2, "value should be greater than other")
if err == nil || err.Error() != "value should be greater than other" {
t.Errorf("IsGreaterThan failed: expected error for values 1, 2")
}

// Test with value equal to other
err = IsGreaterThan(2, 2, "value should be greater than other")
if err == nil || err.Error() != "value should be greater than other" {
t.Errorf("IsGreaterThan failed: expected error for values 2, 2")
}

// Test with value greater than other
err = IsGreaterThan(3, 2, "value should be greater than other")
if err != nil {
t.Errorf("IsGreaterThan failed: unexpected error for values 3, 2")
}

// Test with different numeric types
var floatValue float64 = 2.5
var otherFloatValue float64 = 1.5
err = IsGreaterThan(floatValue, otherFloatValue, "value should be greater than other")
if err != nil {
t.Errorf("IsGreaterThan failed: unexpected error for float values %v, %v", floatValue, otherFloatValue)
}
}

func TestIsGreaterOrEqual(t *testing.T) {
// Test with value less than other
err := IsGreaterOrEqual(1, 2, "value should be greater or equal to other")
if err == nil || err.Error() != "value should be greater or equal to other" {
t.Errorf("IsGreaterOrEqual failed: expected error for values 1, 2")
}

// Test with value equal to other
err = IsGreaterOrEqual(2, 2, "value should be greater or equal to other")
if err != nil {
t.Errorf("IsGreaterOrEqual failed: unexpected error for values 2, 2")
}

// Test with value greater than other
err = IsGreaterOrEqual(3, 2, "value should be greater or equal to other")
if err != nil {
t.Errorf("IsGreaterOrEqual failed: unexpected error for values 3, 2")
}

// Test with different numeric types
var floatValue float64 = 1.5
var otherFloatValue float64 = 1.5
err = IsGreaterOrEqual(floatValue, otherFloatValue, "value should be greater or equal to other")
if err != nil {
t.Errorf("IsGreaterOrEqual failed: unexpected error for float values %v, %v", floatValue, otherFloatValue)
}
}
4 changes: 4 additions & 0 deletions pkg/models/node_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func stringToApproval(s string) membership {
return unknown
}

func (t membership) IsUndefined() bool {
return t == unknown
}

func (t membership) IsValid() bool {
return t >= membership(1) && t <= membership(len(strMembershipArray))
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/models/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ func (r *Resources) Sub(other Resources) *Resources {
return usage
}

// Multiply returns the product of the resources
func (r *Resources) Multiply(factor float64) *Resources {
return &Resources{
CPU: r.CPU * factor,
Memory: uint64(float64(r.Memory) * factor),
Disk: uint64(float64(r.Disk) * factor),
GPU: uint64(float64(r.GPU) * factor),
}
}

func (r *Resources) LessThan(other Resources) bool {
return r.CPU < other.CPU && r.Memory < other.Memory && r.Disk < other.Disk && r.GPU < other.GPU
}
Expand Down Expand Up @@ -307,7 +317,7 @@ func (r *Resources) IsZero() bool {
func (r *Resources) String() string {
mem := humanize.Bytes(r.Memory)
disk := humanize.Bytes(r.Disk)
return fmt.Sprintf("{CPU: %f, Memory: %s, Disk: %s, GPU: %d}", r.CPU, mem, disk, r.GPU)
return fmt.Sprintf("{CPU: %g, Memory: %s, Disk: %s, GPU: %d}", r.CPU, mem, disk, r.GPU)
}

// AllocatedResources is the set of resources to be used by an execution, which
Expand Down
2 changes: 2 additions & 0 deletions pkg/node/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ var TestRequesterConfig = RequesterConfigParams{
WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond,
WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond,

NodeOverSubscriptionFactor: 1.5,

TranslationEnabled: false,

S3PreSignedURLDisabled: false,
Expand Down
12 changes: 10 additions & 2 deletions pkg/node/config_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type RequesterConfigParams struct {
WorkerEvalDequeueMaxBackoff time.Duration

// scheduler config
SchedulerQueueBackoff time.Duration
SchedulerQueueBackoff time.Duration
NodeOverSubscriptionFactor float64

// Should the orchestrator attempt to translate jobs?
TranslationEnabled bool
Expand Down Expand Up @@ -76,10 +77,17 @@ func NewRequesterConfigWithDefaults() (RequesterConfig, error) {

//nolint:gosimple
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error) {
if err := mergo.Merge(&params, getRequesterConfigParams()); err != nil {
defaults := getRequesterConfigParams()
if err := mergo.Merge(&params, defaults); err != nil {
return RequesterConfig{}, fmt.Errorf("creating requester config: %w", err)
}

// TODO: move away from how we define approval states as they don't have clear
// zero value and don't play nicely with merge
if params.DefaultApprovalState.IsUndefined() {
params.DefaultApprovalState = defaults.DefaultApprovalState
}

log.Debug().Msgf("Requester config: %+v", params)
return RequesterConfig{
RequesterConfigParams: params,
Expand Down
5 changes: 5 additions & 0 deletions pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func NewRequesterNode(
Info().
Msgf("Nodes joining the cluster will be assigned approval state: %s", requesterConfig.DefaultApprovalState.String())

overSubscriptionNodeRanker, err := ranking.NewOverSubscriptionNodeRanker(requesterConfig.NodeOverSubscriptionFactor)
if err != nil {
return nil, err
}
// compute node ranker
nodeRankerChain := ranking.NewChain()
nodeRankerChain.Add(
Expand All @@ -94,6 +98,7 @@ func NewRequesterNode(
ranking.NewStoragesNodeRanker(),
ranking.NewLabelsNodeRanker(),
ranking.NewMaxUsageNodeRanker(),
overSubscriptionNodeRanker,
ranking.NewMinVersionNodeRanker(ranking.MinVersionNodeRankerParams{MinVersion: requesterConfig.MinBacalhauVersion}),
ranking.NewPreviousExecutionsNodeRanker(ranking.PreviousExecutionsNodeRankerParams{JobStore: jobStore}),
ranking.NewAvailableCapacityNodeRanker(),
Expand Down
76 changes: 76 additions & 0 deletions pkg/orchestrator/selection/ranking/over_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ranking

import (
"context"
"fmt"

"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
)

type OverSubscriptionNodeRanker struct {
factor float64
}

func NewOverSubscriptionNodeRanker(factor float64) (*OverSubscriptionNodeRanker, error) {
err := validate.IsGreaterOrEqual(factor, 1,
"over subscription factor %f must be greater or equal to 1", factor)
if err != nil {
return nil, err
}
return &OverSubscriptionNodeRanker{factor: factor}, nil
}

// RankNodes ranks nodes based on the ratio of queued capacity to total capacity.
// - Rank -1: If the ratio of is greater than the factor, the node is considered over-subscribed.
// - Rank 0: If the node is not over-subscribed.
func (s *OverSubscriptionNodeRanker) RankNodes(
ctx context.Context, job models.Job, nodes []models.NodeInfo) ([]orchestrator.NodeRank, error) {
jobResourceUsage, err := job.Task().ResourcesConfig.ToResources()
if err != nil {
return nil, fmt.Errorf("failed to convert job resources config to resources: %w", err)
}

ranks := make([]orchestrator.NodeRank, len(nodes))
for i, node := range nodes {
var rank int
var reason string

if node.ComputeNodeInfo == nil || node.ComputeNodeInfo.MaxCapacity.IsZero() {
rank = orchestrator.RankUnsuitable
reason = "node queue usage is unknown"
} else {
// overSubscriptionCapacity is the capacity at which the node can accept more jobs
overSubscriptionCapacity := node.ComputeNodeInfo.MaxCapacity.Multiply(s.factor)

// totalUsage is the sub of actively running capacity, queued capacity and new job resources
totalUsage := node.ComputeNodeInfo.MaxCapacity.
Sub(node.ComputeNodeInfo.AvailableCapacity).
Add(node.ComputeNodeInfo.QueueUsedCapacity).
Add(*jobResourceUsage)

if totalUsage.LessThanEq(*overSubscriptionCapacity) {
rank = orchestrator.RankPossible
reason = "node is not over-subscribed"
} else {
rank = orchestrator.RankUnsuitable
reason = "node busy with available capacity " + node.ComputeNodeInfo.AvailableCapacity.String()
if !node.ComputeNodeInfo.QueueUsedCapacity.IsZero() {
reason += " and queue capacity " + node.ComputeNodeInfo.QueueUsedCapacity.String()
}
}
}

ranks[i] = orchestrator.NodeRank{
NodeInfo: node,
Rank: rank,
Reason: reason,
Retryable: true,
}
log.Ctx(ctx).Trace().Object("Rank", ranks[i]).Msg("Ranked node")
}
return ranks, nil
}
Loading

0 comments on commit 067717d

Please sign in to comment.