diff --git a/error.go b/error.go index fd92ae9..738d410 100644 --- a/error.go +++ b/error.go @@ -8,16 +8,46 @@ import ( type JobErrorCode string const ( - ErrPrecedentStepFailure JobErrorCode = "precedent step failed" - ErrStepFailed JobErrorCode = "step failed" - ErrRefStepNotInJob JobErrorCode = "trying to reference to a step not registered in job" - ErrAddStepInSealedJob JobErrorCode = "trying to add step to a sealed job definition" + ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed" + ErrStepFailed JobErrorCode = "StepFailed" + + ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob" + MsgRefStepNotInJob string = "trying to reference to step %q, but it is not registered in job" + + ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob" + MsgAddStepInSealedJob string = "trying to add step %q to a sealed job definition" + + ErrAddExistingStep JobErrorCode = "AddExistingStep" + MsgAddExistingStep string = "trying to add step %q to job definition, but it already exists" + + ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep" + MsgDuplicateInputParentStep string = "at least 2 input parentSteps are same" + + ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound" + MsgRuntimeStepNotFound string = "runtime step %q not found, must be a bug in asyncjob" ) func (code JobErrorCode) Error() string { return string(code) } +func (code JobErrorCode) WithMessage(msg string) *MessageError { + return &MessageError{Code: code, Message: msg} +} + +type MessageError struct { + Code JobErrorCode + Message string +} + +func (me *MessageError) Error() string { + return me.Code.Error() + ": " + me.Message +} + +func (me *MessageError) Unwrap() error { + return me.Code +} + type JobError struct { Code JobErrorCode StepError error @@ -48,7 +78,7 @@ func (je *JobError) RootCause() error { } // precendent step failure, track to the root - if je.Code == ErrPrecedentStepFailure { + if je.Code == ErrPrecedentStepFailed { precedentStepErr := &JobError{} if !errors.As(je.StepError, &precedentStepErr) { return je.StepError diff --git a/job_definition.go b/job_definition.go index ad6520c..d5a28ff 100644 --- a/job_definition.go +++ b/job_definition.go @@ -2,6 +2,8 @@ package asyncjob import ( "context" + "errors" + "fmt" "github.com/Azure/go-asyncjob/graph" ) @@ -15,7 +17,7 @@ type JobDefinitionMeta interface { Visualize() (string, error) // not exposing for now. - addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) + addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error getRootStep() StepDefinitionMeta } @@ -87,12 +89,20 @@ func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) } // AddStep adds a step to the job definition, with optional preceding steps -func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) { +func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error { jd.steps[step.GetName()] = step jd.stepsDag.AddNode(step) for _, precedingStep := range precedingSteps { - jd.stepsDag.Connect(precedingStep, step) + if err := jd.stepsDag.Connect(precedingStep, step); err != nil { + if errors.Is(err, graph.ErrConnectNotExistingNode) { + return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName())) + } + + return err + } } + + return nil } // Visualize the job definition in graphviz dot format diff --git a/job_test.go b/job_test.go index 3c266b9..3988d0a 100644 --- a/job_test.go +++ b/job_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/Azure/go-asyncjob" - "github.com/Azure/go-asynctask" "github.com/stretchr/testify/assert" ) @@ -130,7 +129,11 @@ func TestJobPanic(t *testing.T) { func TestJobStepRetry(t *testing.T) { t.Parallel() - jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3)}) + jd, err := BuildJob(map[string]asyncjob.RetryPolicy{ + "GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3), + "QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3), + "Summarize": newLinearRetryPolicy(time.Millisecond*3, 3), + }) assert.NoError(t, err) invalidStep := &asyncjob.StepDefinition[string]{} @@ -141,7 +144,8 @@ func TestJobStepRetry(t *testing.T) { assert.False(t, jd.Sealed()) ctx := context.WithValue(context.Background(), testLoggingContextKey, t) - ctx = context.WithValue(ctx, "error-injection.server1.table1.query1", fmt.Errorf("query exeeded memory limit")) + + // gain code coverage on retry policy in StepAfter jobInstance := jd.Start(ctx, &SqlSummaryJobLib{ Params: &SqlSummaryJobParameters{ ServerName: "server1", @@ -165,50 +169,48 @@ func TestJobStepRetry(t *testing.T) { errors.As(err, &jobErr) assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName()) - exeData := jobErr.StepInstance.ExecutionData() assert.Equal(t, exeData.Retried.Count, 3) - renderGraph(t, jobInstance) -} - -func TestDefinitionBuilder(t *testing.T) { - t.Parallel() - - renderGraph(t, SqlSummaryAsyncJobDefinition) - - SqlSummaryAsyncJobDefinition.Seal() - - _, err := asyncjob.AddStep(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "EmailNotification2", emailNotificationStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) - assert.Error(t, err) - - qery2ResultTskMeta, ok := SqlSummaryAsyncJobDefinition.GetStep("QueryTable2") - assert.True(t, ok) - query2Task, ok := qery2ResultTskMeta.(*asyncjob.StepDefinition[SqlQueryResult]) - assert.True(t, ok) - - dummyStepFunc := func(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlQueryResult, any] { - return func(ctx context.Context, result *SqlQueryResult) (*any, error) { - return nil, nil - } - } - - _, err = asyncjob.StepAfter(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "dummyStep", query2Task, dummyStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + // gain code coverage on retry policy in AddStep + jobInstance1 := jd.Start(ctx, &SqlSummaryJobLib{ + Params: &SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") }, + }, + }, + }) + err = jobInstance1.Wait(context.Background()) assert.Error(t, err) + jobErr = &asyncjob.JobError{} + errors.As(err, &jobErr) + assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) + assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName()) - qery1ResultTskMeta, ok := SqlSummaryAsyncJobDefinition.GetStep("QueryTable1") - assert.True(t, ok) - query1Task, ok := qery1ResultTskMeta.(*asyncjob.StepDefinition[SqlQueryResult]) - assert.True(t, ok) - - advancedSummaryStepFunc := func(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[SqlQueryResult, SqlQueryResult, any] { - return func(ctx context.Context, result1 *SqlQueryResult, result2 *SqlQueryResult) (*any, error) { - return nil, nil - } - } - - _, err = asyncjob.StepAfterBoth(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "dummyStep", query1Task, query2Task, advancedSummaryStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + // gain code coverage on retry policy in AfterBoth + jobInstance2 := jd.Start(ctx, &SqlSummaryJobLib{ + Params: &SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") }, + }, + }, + }) + err = jobInstance2.Wait(context.Background()) assert.Error(t, err) + jobErr = &asyncjob.JobError{} + errors.As(err, &jobErr) + assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) + assert.Equal(t, "Summarize", jobErr.StepInstance.GetName()) } func renderGraph(t *testing.T, jb GraphRender) { diff --git a/step_builder.go b/step_builder.go index 1d53d9d..223b216 100644 --- a/step_builder.go +++ b/step_builder.go @@ -10,13 +10,13 @@ import ( ) // AddStep adds a step to the job definition. -func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob +func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } stepD := newStepDefinition[ST](stepName, stepTypeTask, optionDecorators...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } @@ -51,23 +51,20 @@ func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName st return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // StepAfter add a step after a preceding step, also take input from that preceding step -func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob - } - - // check parentStepT is in this job - if get, ok := j.GetStep(parentStep.GetName()); !ok || get != parentStep { - return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName()) +func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } stepD := newStepDefinition[ST](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep))...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } @@ -92,41 +89,35 @@ func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepN parentStepInstance := getStrongTypedStepInstance(parentStep, ji) stepInstance := newStepInstance[ST](stepD, ji) + // here ContinueWith may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error. stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepFuncWithPanicHandling)) ji.addStepInstance(stepInstance, precedingInstances...) return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // StepAfterBoth add a step after both preceding steps, also take input from both preceding steps -func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob +func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } - // check parentStepT is in this job - if get, ok := j.GetStep(parentStep1.GetName()); !ok || get != parentStep1 { - return nil, fmt.Errorf("step [%s] not found in job", parentStep1.GetName()) - } - if get, ok := j.GetStep(parentStep2.GetName()); !ok || get != parentStep2 { - return nil, fmt.Errorf("step [%s] not found in job", parentStep2.GetName()) + // compiler not allow me to compare parentStep1 and parentStep2 directly with different genericType + if parentStep1.GetName() == parentStep2.GetName() { + return nil, ErrDuplicateInputParentStep.WithMessage(MsgDuplicateInputParentStep) } stepD := newStepDefinition[ST](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep1), ExecuteAfter(parentStep2))...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } - // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. - if len(precedingDefSteps) == 0 { - precedingDefSteps = append(precedingDefSteps, j.getRootStep()) - stepD.executionOptions.DependOn = append(stepD.executionOptions.DependOn, j.getRootStep().GetName()) - } - stepD.instanceCreator = func(ctx context.Context, ji JobInstanceMeta) StepInstanceMeta { // TODO: error is ignored here precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji) @@ -147,39 +138,40 @@ func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[ parentStepInstance1 := getStrongTypedStepInstance(parentStep1, ji) parentStepInstance2 := getStrongTypedStepInstance(parentStep2, ji) stepInstance := newStepInstance[ST](stepD, ji) + // here AfterBoth may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error. stepInstance.task = asynctask.AfterBoth(ctx, parentStepInstance1.task, parentStepInstance2.task, instrumentedStepAfterBoth(stepInstance, precedingTasks, stepFuncWithPanicHandling)) ji.addStepInstance(stepInstance, precedingInstances...) return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func AddStepWithStaticFunc[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return AddStep(bCtx, j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) +func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return AddStep(j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) } // StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func StepAfterWithStaticFunc[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfter(bCtx, j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) +func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return StepAfter(j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) } // StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfterBoth(bCtx, j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) +func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) } func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (*T, error)) func(ctx context.Context) (*T, error) { return func(ctx context.Context) (*T, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -210,12 +202,10 @@ func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks [] func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T) (*S, error)) func(ctx context.Context, t *T) (*S, error) { return func(ctx context.Context, t *T) (*S, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -247,12 +237,10 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece return func(ctx context.Context, t *T, s *S) (*R, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -280,13 +268,25 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece } } -func getDependsOnSteps(step StepDefinitionMeta, j JobDefinitionMeta) ([]StepDefinitionMeta, error) { +func addStepPreCheck(j JobDefinitionMeta, stepName string) error { + if j.Sealed() { + return ErrAddStepInSealedJob.WithMessage(fmt.Sprintf(MsgAddStepInSealedJob, stepName)) + } + + if _, ok := j.GetStep(stepName); ok { + return ErrAddExistingStep.WithMessage(fmt.Sprintf(MsgAddExistingStep, stepName)) + } + + return nil +} + +func getDependsOnSteps(j JobDefinitionMeta, dependsOnSteps []string) ([]StepDefinitionMeta, error) { var precedingDefSteps []StepDefinitionMeta - for _, depStepName := range step.DependsOn() { + for _, depStepName := range dependsOnSteps { if depStep, ok := j.GetStep(depStepName); ok { precedingDefSteps = append(precedingDefSteps, depStep) } else { - return nil, fmt.Errorf("step [%s] not found", depStepName) + return nil, ErrRefStepNotInJob.WithMessage(fmt.Sprintf(MsgRefStepNotInJob, depStepName)) } } @@ -301,7 +301,7 @@ func getDependsOnStepInstances(stepD StepDefinitionMeta, ji JobInstanceMeta) ([] precedingInstances = append(precedingInstances, depStep) precedingTasks = append(precedingTasks, depStep.Waitable()) } else { - return nil, nil, fmt.Errorf("runtime step [%s] not found", depStepName) + return nil, nil, ErrRuntimeStepNotFound.WithMessage(fmt.Sprintf(MsgRuntimeStepNotFound, depStepName)) } } diff --git a/step_builder_test.go b/step_builder_test.go new file mode 100644 index 0000000..34ab269 --- /dev/null +++ b/step_builder_test.go @@ -0,0 +1,75 @@ +package asyncjob_test + +import ( + "testing" + + "github.com/Azure/go-asyncjob" + "github.com/stretchr/testify/assert" +) + +func TestDefinitionRendering(t *testing.T) { + t.Parallel() + + renderGraph(t, SqlSummaryAsyncJobDefinition) +} + +func TestDefinitionBuilder(t *testing.T) { + t.Parallel() + + job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") + notExistingTask := &asyncjob.StepDefinition[any]{} + + _, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"GetConnection\" to job definition, but it already exists") + + table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"GetTableClient1\" to job definition, but it already exists") + + table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + query1Task, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + query2Task, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfterBoth(job, "Summarize", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfterBoth(job, "Summarize", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"Summarize\" to job definition, but it already exists") + + _, err = asyncjob.StepAfterBoth(job, "Summarize1", query1Task, query1Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "DuplicateInputParentStep: at least 2 input parentSteps are same") + + query3Task := &asyncjob.StepDefinition[SqlQueryResult]{} + _, err = asyncjob.StepAfterBoth(job, "Summarize2", query1Task, query3Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + assert.False(t, job.Sealed()) + job.Seal() + assert.True(t, job.Sealed()) + job.Seal() + assert.True(t, job.Sealed()) + + _, err = asyncjob.AddStep(job, "GetConnectionAgain", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"GetConnectionAgain\" to a sealed job definition") + + _, err = asyncjob.StepAfter(job, "QueryTable1Again", table1ClientTsk, queryTable1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"QueryTable1Again\" to a sealed job definition") + + _, err = asyncjob.StepAfterBoth(job, "SummarizeAgain", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"SummarizeAgain\" to a sealed job definition") +} diff --git a/util_test.go b/util_test.go index a56b0b2..d3dd993 100644 --- a/util_test.go +++ b/util_test.go @@ -18,7 +18,7 @@ var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJob func init() { var err error - SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(context.Background(), map[string]asyncjob.RetryPolicy{}) + SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(map[string]asyncjob.RetryPolicy{}) if err != nil { panic(err) } @@ -108,53 +108,53 @@ func emailNotificationStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[interf }) } -func BuildJob(bCtx context.Context, retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) { +func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) { job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") - connTsk, err := asyncjob.AddStep(bCtx, job, "GetConnection", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetConnection: %w", err) } - checkAuthTask, err := asyncjob.AddStep(bCtx, job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + checkAuthTask, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step CheckAuth: %w", err) } - table1ClientTsk, err := asyncjob.StepAfter(bCtx, job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetTableClient1: %w", err) } - qery1ResultTsk, err := asyncjob.StepAfter(bCtx, job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) + qery1ResultTsk, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step QueryTable1: %w", err) } - table2ClientTsk, err := asyncjob.StepAfter(bCtx, job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetTableClient2: %w", err) } - qery2ResultTsk, err := asyncjob.StepAfter(bCtx, job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) + qery2ResultTsk, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step QueryTable2: %w", err) } - summaryTsk, err := asyncjob.StepAfterBoth(bCtx, job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + summaryTsk, err := asyncjob.StepAfterBoth(job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithRetry(retryPolicies["Summarize"]), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step Summarize: %w", err) } - _, err = asyncjob.AddStep(bCtx, job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext)) + _, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step EmailNotification: %w", err) } return job, nil } -func BuildJobWithResult(bCtx context.Context, retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult], error) { - job, err := BuildJob(bCtx, retryPolicies) +func BuildJobWithResult(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult], error) { + job, err := BuildJob(retryPolicies) if err != nil { return nil, err }