Skip to content

Commit

Permalink
Merge pull request #7 from Azure/haitao/job-input
Browse files Browse the repository at this point in the history
simplify job with interface
  • Loading branch information
haitch committed Nov 7, 2022
2 parents ac85f3a + b6f55e6 commit b1b1221
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 220 deletions.
249 changes: 29 additions & 220 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ const JobStatePending JobState = "pending"
const JobStateRunning JobState = "running"
const JobStateCompleted JobState = "completed"

const rootStepName = "job"
const rootStepName = "$job"

type JobInterface interface {
GetStep(stepName string) (StepMeta, bool) // switch bool to error
AddStep(step StepMeta, precedingSteps ...StepMeta)
RootStep() *StepInfo[interface{}]

Start(ctx context.Context) error

RuntimeContext() context.Context
}

type Job struct {
Name string
Expand All @@ -32,6 +42,8 @@ type Job struct {
runtimeCtx context.Context
}

var _ JobInterface = &Job{}

func NewJob(name string) *Job {
jobStart := sync.WaitGroup{}
jobStart.Add(1)
Expand Down Expand Up @@ -64,220 +76,23 @@ func NewJob(name string) *Job {
return j
}

func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] {
step := newStepInfo[T](stepName, stepTypeParam)

instrumentedFunc := func(ctx context.Context) (*T, error) {
j.rootStep.Wait(ctx)
step.executionData.StartTime = time.Now()
step.state = StepStateCompleted
step.executionData.Duration = time.Since(j.rootStep.executionData.StartTime)
return value, nil
}
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, j.rootStep)

return step
func (j *Job) RootStep() *StepInfo[interface{}] {
return j.rootStep
}

func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
step := newStepInfo[T](stepName, stepTypeTask, optionDecorators...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingTasks []asynctask.Waitable
var precedingSteps []StepMeta
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingTasks = append(precedingTasks, depStep.Waitable())
precedingSteps = append(precedingSteps, depStep)
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingSteps = append(precedingSteps, j.rootStep)
precedingTasks = append(precedingTasks, j.rootStep.Waitable())
}

// instrument to :
// replaceRuntimeContext,
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context) (*T, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}
step.executionData.StartTime = time.Now()
step.state = StepStateRunning

var result *T
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*T, error) { return stepFunc(j.runtimeCtx) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)

return step, nil
func (j *Job) GetStep(stepName string) (StepMeta, bool) {
stepMeta, ok := j.Steps[stepName]
return stepMeta, ok
}

func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentStep *StepInfo[T], stepFunc asynctask.ContinueFunc[T, S], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[S], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStep.GetName()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName())
}

step := newStepInfo[S](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingSteps []StepMeta
var precedingTasks []asynctask.Waitable
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingSteps = append(precedingSteps, depStep)
precedingTasks = append(precedingTasks, depStep.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context, t *T) (*S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}
step.executionData.StartTime = time.Now()
step.state = StepStateRunning
var result *S
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*S, error) { return stepFunc(j.runtimeCtx, t) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx, t)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)
return step, nil
}

func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, parentStepT *StepInfo[T], parentStepS *StepInfo[S], stepFunc asynctask.AfterBothFunc[T, S, R], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[R], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStepT.GetName()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.GetName())
}
if get, ok := j.Steps[parentStepS.GetName()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName())
}

step := newStepInfo[R](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingSteps []StepMeta
var precedingTasks []asynctask.Waitable
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingSteps = append(precedingSteps, depStep)
precedingTasks = append(precedingTasks, depStep.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context, t *T, s *S) (*R, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}

step.executionData.StartTime = time.Now()
step.state = StepStateRunning

var result *R
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*R, error) { return stepFunc(j.runtimeCtx, t, s) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx, t, s)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
func (j *Job) AddStep(step StepMeta, precedingSteps ...StepMeta) {
// TODO: check conflict
j.Steps[step.GetName()] = step
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
}

step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)

return step, nil
}

func (j *Job) Start(ctx context.Context) error {
Expand All @@ -290,6 +105,10 @@ func (j *Job) Start(ctx context.Context) error {
return nil
}

func (j *Job) RuntimeContext() context.Context {
return j.runtimeCtx
}

func (j *Job) Wait(ctx context.Context) error {
var tasks []asynctask.Waitable
for _, step := range j.Steps {
Expand All @@ -298,16 +117,6 @@ func (j *Job) Wait(ctx context.Context) error {
return asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, tasks...)
}

func (j *Job) registerStepInGraph(step StepMeta, precedingSteps ...StepMeta) error {
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
}

return nil
}

// Visualize return a DAG of the job execution graph
func (j *Job) Visualize() (string, error) {
return j.stepsDag.ToDotGraph()
Expand Down
Loading

0 comments on commit b1b1221

Please sign in to comment.