Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dependon fix #3

Merged
merged 1 commit into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func NewJob(name string) *Job {
}

j.rootJob = &StepInfo[interface{}]{
name: "[Start]",
dependOn: []string{},
name: "[Start]",
task: asynctask.Start(context.Background(), func(fctx context.Context) (*interface{}, error) {
fmt.Println("RootJob Added")
// this will pause all steps from starting, until Start() method is called.
Expand All @@ -53,14 +52,14 @@ func NewJob(name string) *Job {
}),
}

j.Steps[j.rootJob.Name()] = j.rootJob
j.stepsDag.Add(j.rootJob.Name())
j.Steps[j.rootJob.GetName()] = j.rootJob
j.stepsDag.Add(j.rootJob.GetName())

return j
}

func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] {
step := newStepInfo[T](stepName, []string{j.rootJob.Name()})
step := newStepInfo[T](stepName)

instrumentedFunc := func(ctx context.Context) (*T, error) {
j.rootJob.Wait(ctx)
Expand All @@ -69,15 +68,18 @@ func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T)
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, j.rootJob.Name())
j.registerStepInGraph(stepName, j.rootJob.GetName())

return step
}

func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], dependOn []string, optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
// manually specified the dependencies, without consume the result.
func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
step := newStepInfo[T](stepName, optionDecorators...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range dependOn {
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
Expand All @@ -87,11 +89,10 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingStepNames = append(precedingStepNames, j.rootJob.GetName())
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}

step := newStepInfo[T](stepName, dependOn, optionDecorators...)

// instrument to :
// replaceRuntimeContext,
// trackStepState
Expand All @@ -115,30 +116,45 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
if len(dependOn) > 0 {
j.registerStepInGraph(stepName, dependOn...)
} else {
j.registerStepInGraph(stepName, j.rootJob.Name())
}
j.registerStepInGraph(stepName, precedingStepNames...)

return step, nil
}

func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentStep *StepInfo[T], stepFunc asynctask.ContinueFunc[T, S], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[S], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStep.Name()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.Name())
if get, ok := j.Steps[parentStep.GetName()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName())
}

step := newStepInfo[S](stepName, append(optionDecorators, ExecuteAfter(parentStep))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", stepName)
}
}

step := newStepInfo[S](stepName, []string{parentStep.Name()}, optionDecorators...)
// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(_ context.Context, t *T) (*S, error) {
instrumentedFunc := func(ctx context.Context, t *T) (*S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
return nil, err
}
step.state = StepStateRunning
result, err := stepFunc(j.runtimeCtx, t)
if err != nil {
Expand All @@ -152,7 +168,7 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt
step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, parentStep.Name())
j.registerStepInGraph(stepName, precedingStepNames...)
if err := j.stepsDag.Validate(); err != nil {
return nil, fmt.Errorf("cycle dependency detected: %s", err)
}
Expand All @@ -161,15 +177,30 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt

func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, parentStepT *StepInfo[T], parentStepS *StepInfo[S], stepFunc asynctask.AfterBothFunc[T, S, R], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[R], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStepT.Name()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.Name())
if get, ok := j.Steps[parentStepT.GetName()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.GetName())
}
if get, ok := j.Steps[parentStepS.Name()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.Name())
if get, ok := j.Steps[parentStepS.GetName()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName())
}

step := newStepInfo[R](stepName, []string{parentStepT.Name(), parentStepS.Name()}, optionDecorators...)
step := newStepInfo[R](stepName, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingStepNames = step.DependsOn()
var precedingTasks []asynctask.Waitable
for _, stepName := range precedingStepNames {
if step, ok := j.Steps[stepName]; ok {
precedingTasks = append(precedingTasks, step.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", stepName)
}
}

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingTasks = append(precedingTasks, j.rootJob.Waitable())
}
// instrument to :
// replaceRuntimeContext
// trackStepState
Expand All @@ -190,7 +221,8 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p
step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, parentStepT.Name(), parentStepS.Name())
j.registerStepInGraph(stepName, precedingStepNames...)

return step, nil
}

Expand Down
21 changes: 15 additions & 6 deletions step.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type StepExecutionOptions struct {
Timeout time.Duration
ErrorPolicy StepErrorPolicy
RetryPolicy StepRetryPolicy

// dependencies that are not input.
DependOn []string
}

type StepErrorPolicy struct{}
Expand All @@ -26,8 +29,15 @@ type StepRetryPolicy struct{}

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

func ExecuteAfter(step StepMeta) ExecutionOptionPreparer {
return func(options *StepExecutionOptions) *StepExecutionOptions {
options.DependOn = append(options.DependOn, step.GetName())
return options
}
}

type StepMeta interface {
Name() string
GetName() string
GetState() StepState
DependsOn() []string
Wait(context.Context) error
Expand All @@ -39,14 +49,13 @@ type StepInfo[T any] struct {
name string
task *asynctask.Task[T]
state StepState
dependOn []string
executionOptions *StepExecutionOptions
job *Job
}

func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] {
func newStepInfo[T any](stepName string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] {
step := &StepInfo[T]{
name: stepName,
dependOn: dependOn,
state: StepStatePending,
executionOptions: &StepExecutionOptions{},
}
Expand All @@ -61,7 +70,7 @@ func newStepInfo[T any](stepName string, dependOn []string, optionDecorators ...
// compiler check
var _ StepMeta = &StepInfo[string]{}

func (si *StepInfo[T]) Name() string {
func (si *StepInfo[T]) GetName() string {
return si.name
}

Expand All @@ -70,7 +79,7 @@ func (si *StepInfo[T]) GetState() StepState {
}

func (si *StepInfo[T]) DependsOn() []string {
return si.dependOn
return si.executionOptions.DependOn
}

func (si *StepInfo[T]) Wait(ctx context.Context) error {
Expand Down