diff --git a/job.go b/job.go index d707581..d7343ca 100644 --- a/job.go +++ b/job.go @@ -42,8 +42,7 @@ func NewJob(name string) *Job { } j.rootJob = &StepInfo[interface{}]{ - name: "[Start]", - dependOn: []string{}, + name: "[Start]", task: asynctask.Start(context.Background(), func(fctx context.Context) (*interface{}, error) { fmt.Println("RootJob Added") // this will pause all steps from starting, until Start() method is called. @@ -53,14 +52,14 @@ func NewJob(name string) *Job { }), } - j.Steps[j.rootJob.Name()] = j.rootJob - j.stepsDag.Add(j.rootJob.Name()) + j.Steps[j.rootJob.GetName()] = j.rootJob + j.stepsDag.Add(j.rootJob.GetName()) return j } func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] { - step := newStepInfo[T](stepName, []string{j.rootJob.Name()}) + step := newStepInfo[T](stepName) instrumentedFunc := func(ctx context.Context) (*T, error) { j.rootJob.Wait(ctx) @@ -69,15 +68,18 @@ func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) step.task = asynctask.Start(bCtx, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(stepName, j.rootJob.Name()) + j.registerStepInGraph(stepName, j.rootJob.GetName()) return step } -func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], dependOn []string, optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) { - // manually specified the dependencies, without consume the result. +func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) { + step := newStepInfo[T](stepName, optionDecorators...) + + // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. + var precedingStepNames = step.DependsOn() var precedingTasks []asynctask.Waitable - for _, stepName := range dependOn { + for _, stepName := range precedingStepNames { if step, ok := j.Steps[stepName]; ok { precedingTasks = append(precedingTasks, step.Waitable()) } else { @@ -87,11 +89,10 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. if len(precedingTasks) == 0 { + precedingStepNames = append(precedingStepNames, j.rootJob.GetName()) precedingTasks = append(precedingTasks, j.rootJob.Waitable()) } - step := newStepInfo[T](stepName, dependOn, optionDecorators...) - // instrument to : // replaceRuntimeContext, // trackStepState @@ -115,22 +116,34 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn step.task = asynctask.Start(bCtx, instrumentedFunc) j.Steps[stepName] = step - if len(dependOn) > 0 { - j.registerStepInGraph(stepName, dependOn...) - } else { - j.registerStepInGraph(stepName, j.rootJob.Name()) - } + j.registerStepInGraph(stepName, precedingStepNames...) return step, nil } func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentStep *StepInfo[T], stepFunc asynctask.ContinueFunc[T, S], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[S], error) { // check parentStepT is in this job - if get, ok := j.Steps[parentStep.Name()]; !ok || get != parentStep { - return nil, fmt.Errorf("step [%s] not found in job", parentStep.Name()) + if get, ok := j.Steps[parentStep.GetName()]; !ok || get != parentStep { + return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName()) + } + + step := newStepInfo[S](stepName, append(optionDecorators, ExecuteAfter(parentStep))...) + + // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. + var precedingStepNames = step.DependsOn() + var precedingTasks []asynctask.Waitable + for _, stepName := range precedingStepNames { + if step, ok := j.Steps[stepName]; ok { + precedingTasks = append(precedingTasks, step.Waitable()) + } else { + return nil, fmt.Errorf("step [%s] not found", stepName) + } } - step := newStepInfo[S](stepName, []string{parentStep.Name()}, optionDecorators...) + // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. + if len(precedingTasks) == 0 { + precedingTasks = append(precedingTasks, j.rootJob.Waitable()) + } // instrument to : // replaceRuntimeContext @@ -138,7 +151,10 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt // retryHandling (TODO) // errorHandling (TODO) // timeoutHandling (TODO) - instrumentedFunc := func(_ context.Context, t *T) (*S, error) { + instrumentedFunc := func(ctx context.Context, t *T) (*S, error) { + if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + return nil, err + } step.state = StepStateRunning result, err := stepFunc(j.runtimeCtx, t) if err != nil { @@ -152,7 +168,7 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(stepName, parentStep.Name()) + j.registerStepInGraph(stepName, precedingStepNames...) if err := j.stepsDag.Validate(); err != nil { return nil, fmt.Errorf("cycle dependency detected: %s", err) } @@ -161,15 +177,30 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, parentStepT *StepInfo[T], parentStepS *StepInfo[S], stepFunc asynctask.AfterBothFunc[T, S, R], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[R], error) { // check parentStepT is in this job - if get, ok := j.Steps[parentStepT.Name()]; !ok || get != parentStepT { - return nil, fmt.Errorf("step [%s] not found in job", parentStepT.Name()) + if get, ok := j.Steps[parentStepT.GetName()]; !ok || get != parentStepT { + return nil, fmt.Errorf("step [%s] not found in job", parentStepT.GetName()) } - if get, ok := j.Steps[parentStepS.Name()]; !ok || get != parentStepS { - return nil, fmt.Errorf("step [%s] not found in job", parentStepS.Name()) + if get, ok := j.Steps[parentStepS.GetName()]; !ok || get != parentStepS { + return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName()) } - step := newStepInfo[R](stepName, []string{parentStepT.Name(), parentStepS.Name()}, optionDecorators...) + step := newStepInfo[R](stepName, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...) + + // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. + var precedingStepNames = step.DependsOn() + var precedingTasks []asynctask.Waitable + for _, stepName := range precedingStepNames { + if step, ok := j.Steps[stepName]; ok { + precedingTasks = append(precedingTasks, step.Waitable()) + } else { + return nil, fmt.Errorf("step [%s] not found", stepName) + } + } + // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. + if len(precedingTasks) == 0 { + precedingTasks = append(precedingTasks, j.rootJob.Waitable()) + } // instrument to : // replaceRuntimeContext // trackStepState @@ -190,7 +221,8 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(stepName, parentStepT.Name(), parentStepS.Name()) + j.registerStepInGraph(stepName, precedingStepNames...) + return step, nil } diff --git a/step.go b/step.go index 63fe42f..038ca5a 100644 --- a/step.go +++ b/step.go @@ -18,6 +18,9 @@ type StepExecutionOptions struct { Timeout time.Duration ErrorPolicy StepErrorPolicy RetryPolicy StepRetryPolicy + + // dependencies that are not input. + DependOn []string } type StepErrorPolicy struct{} @@ -26,8 +29,15 @@ type StepRetryPolicy struct{} type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions +func ExecuteAfter(step StepMeta) ExecutionOptionPreparer { + return func(options *StepExecutionOptions) *StepExecutionOptions { + options.DependOn = append(options.DependOn, step.GetName()) + return options + } +} + type StepMeta interface { - Name() string + GetName() string GetState() StepState DependsOn() []string Wait(context.Context) error @@ -39,14 +49,13 @@ type StepInfo[T any] struct { name string task *asynctask.Task[T] state StepState - dependOn []string executionOptions *StepExecutionOptions + job *Job } -func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] { +func newStepInfo[T any](stepName string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] { step := &StepInfo[T]{ name: stepName, - dependOn: dependOn, state: StepStatePending, executionOptions: &StepExecutionOptions{}, } @@ -61,7 +70,7 @@ func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ... // compiler check var _ StepMeta = &StepInfo[string]{} -func (si *StepInfo[T]) Name() string { +func (si *StepInfo[T]) GetName() string { return si.name } @@ -70,7 +79,7 @@ func (si *StepInfo[T]) GetState() StepState { } func (si *StepInfo[T]) DependsOn() []string { - return si.dependOn + return si.executionOptions.DependOn } func (si *StepInfo[T]) Wait(ctx context.Context) error {