Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify job with interface #7

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 29 additions & 220 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ const JobStatePending JobState = "pending"
const JobStateRunning JobState = "running"
const JobStateCompleted JobState = "completed"

const rootStepName = "job"
const rootStepName = "$job"

type JobInterface interface {
GetStep(stepName string) (StepMeta, bool) // switch bool to error
AddStep(step StepMeta, precedingSteps ...StepMeta)
RootStep() *StepInfo[interface{}]

Start(ctx context.Context) error

RuntimeContext() context.Context
}

type Job struct {
Name string
Expand All @@ -32,6 +42,8 @@ type Job struct {
runtimeCtx context.Context
}

var _ JobInterface = &Job{}

func NewJob(name string) *Job {
jobStart := sync.WaitGroup{}
jobStart.Add(1)
Expand Down Expand Up @@ -64,220 +76,23 @@ func NewJob(name string) *Job {
return j
}

func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] {
step := newStepInfo[T](stepName, stepTypeParam)

instrumentedFunc := func(ctx context.Context) (*T, error) {
j.rootStep.Wait(ctx)
step.executionData.StartTime = time.Now()
step.state = StepStateCompleted
step.executionData.Duration = time.Since(j.rootStep.executionData.StartTime)
return value, nil
}
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, j.rootStep)

return step
func (j *Job) RootStep() *StepInfo[interface{}] {
return j.rootStep
}

func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) {
step := newStepInfo[T](stepName, stepTypeTask, optionDecorators...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingTasks []asynctask.Waitable
var precedingSteps []StepMeta
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingTasks = append(precedingTasks, depStep.Waitable())
precedingSteps = append(precedingSteps, depStep)
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingTasks) == 0 {
precedingSteps = append(precedingSteps, j.rootStep)
precedingTasks = append(precedingTasks, j.rootStep.Waitable())
}

// instrument to :
// replaceRuntimeContext,
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context) (*T, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}
step.executionData.StartTime = time.Now()
step.state = StepStateRunning

var result *T
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*T, error) { return stepFunc(j.runtimeCtx) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)

return step, nil
func (j *Job) GetStep(stepName string) (StepMeta, bool) {
stepMeta, ok := j.Steps[stepName]
return stepMeta, ok
}

func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentStep *StepInfo[T], stepFunc asynctask.ContinueFunc[T, S], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[S], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStep.GetName()]; !ok || get != parentStep {
return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName())
}

step := newStepInfo[S](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingSteps []StepMeta
var precedingTasks []asynctask.Waitable
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingSteps = append(precedingSteps, depStep)
precedingTasks = append(precedingTasks, depStep.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context, t *T) (*S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}
step.executionData.StartTime = time.Now()
step.state = StepStateRunning
var result *S
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*S, error) { return stepFunc(j.runtimeCtx, t) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx, t)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)
return step, nil
}

func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, parentStepT *StepInfo[T], parentStepS *StepInfo[S], stepFunc asynctask.AfterBothFunc[T, S, R], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[R], error) {
// check parentStepT is in this job
if get, ok := j.Steps[parentStepT.GetName()]; !ok || get != parentStepT {
return nil, fmt.Errorf("step [%s] not found in job", parentStepT.GetName())
}
if get, ok := j.Steps[parentStepS.GetName()]; !ok || get != parentStepS {
return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName())
}

step := newStepInfo[R](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...)

// also consider specified the dependencies from ExecutionOptionPreparer, without consume the result.
var precedingSteps []StepMeta
var precedingTasks []asynctask.Waitable
for _, depStepName := range step.DependsOn() {
if depStep, ok := j.Steps[depStepName]; ok {
precedingSteps = append(precedingSteps, depStep)
precedingTasks = append(precedingTasks, depStep.Waitable())
} else {
return nil, fmt.Errorf("step [%s] not found", depStepName)
}
}

// instrument to :
// replaceRuntimeContext
// trackStepState
// retryHandling (TODO)
// errorHandling (TODO)
// timeoutHandling (TODO)
instrumentedFunc := func(ctx context.Context, t *T, s *S) (*R, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on how to set state of dependent step.
step.executionData.StartTime = time.Now()
step.state = StepStateFailed
step.executionData.Duration = 0 */
return nil, newJobError(ErrPrecedentStepFailure, "")
}

step.executionData.StartTime = time.Now()
step.state = StepStateRunning

var result *R
var err error
if step.executionOptions.RetryPolicy != nil {
step.executionData.Retried = &RetryReport{}
result, err = newRetryer(step.executionOptions.RetryPolicy, step.executionData.Retried, func() (*R, error) { return stepFunc(j.runtimeCtx, t, s) }).Run()
} else {
result, err = stepFunc(j.runtimeCtx, t, s)
}

if err != nil {
step.state = StepStateFailed
} else {
step.state = StepStateCompleted
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
func (j *Job) AddStep(step StepMeta, precedingSteps ...StepMeta) {
// TODO: check conflict
j.Steps[step.GetName()] = step
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
}

step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(step, precedingSteps...)

return step, nil
}

func (j *Job) Start(ctx context.Context) error {
Expand All @@ -290,6 +105,10 @@ func (j *Job) Start(ctx context.Context) error {
return nil
}

func (j *Job) RuntimeContext() context.Context {
return j.runtimeCtx
}

func (j *Job) Wait(ctx context.Context) error {
var tasks []asynctask.Waitable
for _, step := range j.Steps {
Expand All @@ -298,16 +117,6 @@ func (j *Job) Wait(ctx context.Context) error {
return asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, tasks...)
}

func (j *Job) registerStepInGraph(step StepMeta, precedingSteps ...StepMeta) error {
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
}

return nil
}

// Visualize return a DAG of the job execution graph
func (j *Job) Visualize() (string, error) {
return j.stepsDag.ToDotGraph()
Expand Down
Loading