Skip to content

Commit

Permalink
Merge pull request #3 from Azure/haitao/dependon-fix
Browse files Browse the repository at this point in the history
dependon fix
  • Loading branch information
haitch committed Oct 26, 2022
2 parents 011f502 + ee6ddb5 commit 2ed229d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 33 deletions.
86 changes: 59 additions & 27 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func NewJob(name string) *Job {
}

j.rootJob = &StepInfo[interface{}]{
name: "[Start]",
dependOn: []string{},
name: "[Start]",
task: asynctask.Start(context.Background(), func(fctx context.Context) (*interface{}, error) {
fmt.Println("RootJob Added")
// this will pause all steps from starting, until Start() method is called.
Expand All @@ -53,14 +52,14 @@ func NewJob(name string) *Job {
}),
}

j.Steps[j.rootJob.Name()] = j.rootJob
j.stepsDag.Add(j.rootJob.Name())
j.Steps[j.rootJob.GetName()] = j.rootJob
j.stepsDag.Add(j.rootJob.GetName())

return j
}

func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] {
step := newStepInfo[T](stepName, []string{j.rootJob.Name()})
step := newStepInfo[T](stepName)

instrumentedFunc := func(ctx context.Context) (*T, error) {
j.rootJob.Wait(ctx)
Expand All @@ -69,15 +68,18 @@ func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T)
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, j.rootJob.Name())
j.registerStepInGraph(stepName, j.rootJob.GetName())

return step
}

func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], dependOn []string, optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
// manually specified the dependencies, without consume the result.
func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
step := newStepInfo[T](stepName, optionDecorators...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range dependOn {
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
Expand All @@ -87,11 +89,10 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingStepNames = append(precedingStepNames, j.rootJob.GetName())
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}

step := newStepInfo[T](stepName, dependOn, optionDecorators...)

// instrument to :
// replaceRuntimeContext,
// trackStepState
Expand All @@ -115,30 +116,45 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
if len(dependOn) > 0 {
j.registerStepInGraph(stepName, dependOn...)
} else {
j.registerStepInGraph(stepName, j.rootJob.Name())
}
j.registerStepInGraph(stepName, precedingStepNames...)

return step, nil
}

func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentStep *StepInfo[T], stepFunc asynctask.ContinueFunc[T, S], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[S], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStep.Name()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.Name())
if get, ok := j.Steps[parentStep.GetName()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName())
}

step := newStepInfo[S](stepName, append(optionDecorators, ExecuteAfter(parentStep))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", stepName)
}
}

step := newStepInfo[S](stepName, []string{parentStep.Name()}, optionDecorators...)
// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(_ context.Context, t *T) (*S, error) {
instrumentedFunc := func(ctx context.Context, t *T) (*S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
return nil, err
}
step.state = StepStateRunning
result, err := stepFunc(j.runtimeCtx, t)
if err != nil {
Expand All @@ -152,7 +168,7 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt
step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, parentStep.Name())
j.registerStepInGraph(stepName, precedingStepNames...)
if err := j.stepsDag.Validate(); err != nil {
return nil, fmt.Errorf("cycle dependency detected: %s", err)
}
Expand All @@ -161,15 +177,30 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt

func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, parentStepT *StepInfo[T], parentStepS *StepInfo[S], stepFunc asynctask.AfterBothFunc[T, S, R], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[R], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStepT.Name()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.Name())
if get, ok := j.Steps[parentStepT.GetName()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.GetName())
}
if get, ok := j.Steps[parentStepS.Name()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.Name())
if get, ok := j.Steps[parentStepS.GetName()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName())
}

step := newStepInfo[R](stepName, []string{parentStepT.Name(), parentStepS.Name()}, optionDecorators...)
step := newStepInfo[R](stepName, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", stepName)
}
}

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}
// instrument to :
// replaceRuntimeContext
// trackStepState
Expand All @@ -190,7 +221,8 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p
step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, parentStepT.Name(), parentStepS.Name())
j.registerStepInGraph(stepName, precedingStepNames...)

return step, nil
}

Expand Down
21 changes: 15 additions & 6 deletions step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type StepExecutionOptions struct {
Timeout time.Duration
ErrorPolicy StepErrorPolicy
RetryPolicy StepRetryPolicy

// dependencies that are not input.
DependOn []string
}

type StepErrorPolicy struct{}
Expand All @@ -26,8 +29,15 @@ type StepRetryPolicy struct{}

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

func ExecuteAfter(step StepMeta) ExecutionOptionPreparer {
return func(options *StepExecutionOptions) *StepExecutionOptions {
options.DependOn = append(options.DependOn, step.GetName())
return options
}
}

type StepMeta interface {
Name() string
GetName() string
GetState() StepState
DependsOn() []string
Wait(context.Context) error
Expand All @@ -39,14 +49,13 @@ type StepInfo[T any] struct {
name string
task *asynctask.Task[T]
state StepState
dependOn []string
executionOptions *StepExecutionOptions
job *Job
}

func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] {
func newStepInfo[T any](stepName string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] {
step := &StepInfo[T]{
name: stepName,
dependOn: dependOn,
state: StepStatePending,
executionOptions: &StepExecutionOptions{},
}
Expand All @@ -61,7 +70,7 @@ func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ...
// compiler check
var _ StepMeta = &StepInfo[string]{}

func (si *StepInfo[T]) Name() string {
func (si *StepInfo[T]) GetName() string {
return si.name
}

Expand All @@ -70,7 +79,7 @@ func (si *StepInfo[T]) GetState() StepState {
}

func (si *StepInfo[T]) DependsOn() []string {
return si.dependOn
return si.executionOptions.DependOn
}

func (si *StepInfo[T]) Wait(ctx context.Context) error {
Expand Down

0 comments on commit 2ed229d

Please sign in to comment.