diff --git a/error.go b/error.go index fd92ae9..738d410 100644 --- a/error.go +++ b/error.go @@ -8,16 +8,46 @@ import ( type JobErrorCode string const ( - ErrPrecedentStepFailure JobErrorCode = "precedent step failed" - ErrStepFailed JobErrorCode = "step failed" - ErrRefStepNotInJob JobErrorCode = "trying to reference to a step not registered in job" - ErrAddStepInSealedJob JobErrorCode = "trying to add step to a sealed job definition" + ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed" + ErrStepFailed JobErrorCode = "StepFailed" + + ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob" + MsgRefStepNotInJob string = "trying to reference to step %q, but it is not registered in job" + + ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob" + MsgAddStepInSealedJob string = "trying to add step %q to a sealed job definition" + + ErrAddExistingStep JobErrorCode = "AddExistingStep" + MsgAddExistingStep string = "trying to add step %q to job definition, but it already exists" + + ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep" + MsgDuplicateInputParentStep string = "at least 2 input parentSteps are same" + + ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound" + MsgRuntimeStepNotFound string = "runtime step %q not found, must be a bug in asyncjob" ) func (code JobErrorCode) Error() string { return string(code) } +func (code JobErrorCode) WithMessage(msg string) *MessageError { + return &MessageError{Code: code, Message: msg} +} + +type MessageError struct { + Code JobErrorCode + Message string +} + +func (me *MessageError) Error() string { + return me.Code.Error() + ": " + me.Message +} + +func (me *MessageError) Unwrap() error { + return me.Code +} + type JobError struct { Code JobErrorCode StepError error @@ -48,7 +78,7 @@ func (je *JobError) RootCause() error { } // precendent step failure, track to the root - if je.Code == ErrPrecedentStepFailure { + if je.Code == ErrPrecedentStepFailed { precedentStepErr := &JobError{} if !errors.As(je.StepError, &precedentStepErr) { return je.StepError diff --git a/go.mod b/go.mod index c319393..896b58b 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b1aea8a..5236496 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/Azure/go-asynctask v1.3.1/go.mod h1:S1Ee5SVnt6ZUJ84brodPiHvoNfN2wgDyV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/job_definition.go b/job_definition.go new file mode 100644 index 0000000..d5a28ff --- /dev/null +++ b/job_definition.go @@ -0,0 +1,111 @@ +package asyncjob + +import ( + "context" + "errors" + "fmt" + + "github.com/Azure/go-asyncjob/graph" +) + +// Interface for a job definition +type JobDefinitionMeta interface { + GetName() string + GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error + Seal() + Sealed() bool + Visualize() (string, error) + + // not exposing for now. + addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error + getRootStep() StepDefinitionMeta +} + +// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG). +type JobDefinition[T any] struct { + name string + + sealed bool + steps map[string]StepDefinitionMeta + stepsDag *graph.Graph[StepDefinitionMeta] + rootStep *StepDefinition[T] +} + +// Create new JobDefinition +// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance. +func NewJobDefinition[T any](name string) *JobDefinition[T] { + j := &JobDefinition[T]{ + name: name, + steps: make(map[string]StepDefinitionMeta), + stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition), + } + + rootStep := newStepDefinition[T](name, stepTypeRoot) + j.rootStep = rootStep + + j.steps[j.rootStep.GetName()] = j.rootStep + j.stepsDag.AddNode(j.rootStep) + + return j +} + +// Start execution of the job definition. +// this will create and return new instance of the job +// caller will then be able to wait for the job instance +func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] { + if !jd.Sealed() { + jd.Seal() + } + + ji := newJobInstance(jd, input, jobOptions...) + ji.start(ctx) + + return ji +} + +func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta { + return jd.rootStep +} + +func (jd *JobDefinition[T]) GetName() string { + return jd.name +} + +func (jd *JobDefinition[T]) Seal() { + if jd.sealed { + return + } + jd.sealed = true +} + +func (jd *JobDefinition[T]) Sealed() bool { + return jd.sealed +} + +// GetStep returns the stepDefinition by name +func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) { + stepMeta, ok := jd.steps[stepName] + return stepMeta, ok +} + +// AddStep adds a step to the job definition, with optional preceding steps +func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error { + jd.steps[step.GetName()] = step + jd.stepsDag.AddNode(step) + for _, precedingStep := range precedingSteps { + if err := jd.stepsDag.Connect(precedingStep, step); err != nil { + if errors.Is(err, graph.ErrConnectNotExistingNode) { + return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName())) + } + + return err + } + } + + return nil +} + +// Visualize the job definition in graphviz dot format +func (jd *JobDefinition[T]) Visualize() (string, error) { + return jd.stepsDag.ToDotGraph() +} diff --git a/job.go b/job_instance.go similarity index 58% rename from job.go rename to job_instance.go index 05883a5..f920478 100644 --- a/job.go +++ b/job_instance.go @@ -7,111 +7,18 @@ import ( "github.com/Azure/go-asyncjob/graph" "github.com/Azure/go-asynctask" + "github.com/google/uuid" ) -// Interface for a job definition -type JobDefinitionMeta interface { - GetName() string - GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error - Seal() - Sealed() bool - - // not exposing for now. - addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) - getRootStep() StepDefinitionMeta -} - -// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG). -type JobDefinition[T any] struct { - name string - - sealed bool - steps map[string]StepDefinitionMeta - stepsDag *graph.Graph[StepDefinitionMeta] - rootStep *StepDefinition[T] -} - -// Create new JobDefinition -// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance. -func NewJobDefinition[T any](name string) *JobDefinition[T] { - j := &JobDefinition[T]{ - name: name, - steps: make(map[string]StepDefinitionMeta), - stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition), - } - - rootStep := newStepDefinition[T](name, stepTypeRoot) - j.rootStep = rootStep - - j.steps[j.rootStep.GetName()] = j.rootStep - j.stepsDag.AddNode(j.rootStep) - - return j -} - -// Start execution of the job definition. -// this will create and return new instance of the job -// caller will then be able to wait for the job instance -func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] { - if !jd.Sealed() { - jd.Seal() - } - - ji := newJobInstance(jd, input, jobOptions...) - ji.start(ctx) - - return ji -} - -func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta { - return jd.rootStep -} - -func (jd *JobDefinition[T]) GetName() string { - return jd.name -} - -func (jd *JobDefinition[T]) Seal() { - if jd.sealed { - return - } - jd.sealed = true -} - -func (jd *JobDefinition[T]) Sealed() bool { - return jd.sealed -} - -// GetStep returns the stepDefinition by name -func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) { - stepMeta, ok := jd.steps[stepName] - return stepMeta, ok -} - -// AddStep adds a step to the job definition, with optional preceding steps -func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) { - jd.steps[step.GetName()] = step - jd.stepsDag.AddNode(step) - for _, precedingStep := range precedingSteps { - jd.stepsDag.Connect(precedingStep, step) - } -} - -// Visualize the job definition in graphviz dot format -func (jd *JobDefinition[T]) Visualize() (string, error) { - return jd.stepsDag.ToDotGraph() -} - type JobInstanceMeta interface { + GetJobInstanceId() string GetJobDefinition() JobDefinitionMeta GetStepInstance(stepName string) (StepInstanceMeta, bool) Wait(context.Context) error + Visualize() (string, error) // not exposing for now addStepInstance(step StepInstanceMeta, precedingSteps ...StepInstanceMeta) - - // future considering: - // - return result of given step } type JobExecutionOptions struct { @@ -159,6 +66,10 @@ func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions .. ji.jobOptions = decorator(ji.jobOptions) } + if ji.jobOptions.Id == "" { + ji.jobOptions.Id = uuid.New().String() + } + return ji } diff --git a/job_result.go b/job_result.go index fca0cec..c74f792 100644 --- a/job_result.go +++ b/job_result.go @@ -26,8 +26,8 @@ type JobInstanceWithResult[Tin, Tout any] struct { resultStep *StepInstance[Tout] } -func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin) *JobInstanceWithResult[Tin, Tout] { - ji := jd.JobDefinition.Start(ctx, input) +func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] { + ji := jd.JobDefinition.Start(ctx, input, jobOptions...) return &JobInstanceWithResult[Tin, Tout]{ JobInstance: ji, diff --git a/job_result_test.go b/job_result_test.go deleted file mode 100644 index a79f558..0000000 --- a/job_result_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package asyncjob_test - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSimpleJobWithResult(t *testing.T) { - t.Parallel() - - jobInstance := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{ - Params: &SqlSummaryJobParameters{ - ServerName: "server2", - Table1: "table3", - Query1: "query3", - Table2: "table4", - Query2: "query4", - }, - }) - jobErr := jobInstance.Wait(context.Background()) - assert.NoError(t, jobErr) - renderGraph(t, jobInstance) - - jobResult, jobErr := jobInstance.Result(context.Background()) - assert.NoError(t, jobErr) - assert.NotNil(t, jobResult) -} diff --git a/job_test.go b/job_test.go index 0d6fbd5..3988d0a 100644 --- a/job_test.go +++ b/job_test.go @@ -14,7 +14,7 @@ import ( func TestSimpleJob(t *testing.T) { t.Parallel() - jobInstance := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{ + jobInstance1 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{ Params: &SqlSummaryJobParameters{ ServerName: "server1", Table1: "table1", @@ -22,10 +22,7 @@ func TestSimpleJob(t *testing.T) { Table2: "table2", Query2: "query2", }, - }) - jobErr := jobInstance.Wait(context.Background()) - assert.NoError(t, jobErr) - renderGraph(t, jobInstance) + }, asyncjob.WithJobId("jobInstance1")) jobInstance2 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{ Params: &SqlSummaryJobParameters{ @@ -35,10 +32,47 @@ func TestSimpleJob(t *testing.T) { Table2: "table4", Query2: "query4", }, - }) + }, asyncjob.WithJobId("jobInstance2")) + + jobInstance3 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{ + Params: &SqlSummaryJobParameters{ + ServerName: "server3", + Table1: "table5", + Query1: "query5", + Table2: "table6", + Query2: "query6", + }, + }, asyncjob.WithSequentialExecution()) + + jobErr := jobInstance1.Wait(context.Background()) + assert.NoError(t, jobErr) + renderGraph(t, jobInstance1) + jobErr = jobInstance2.Wait(context.Background()) assert.NoError(t, jobErr) renderGraph(t, jobInstance2) + + jobErr = jobInstance3.Wait(context.Background()) + assert.NoError(t, jobErr) + renderGraph(t, jobInstance3) + + jobResult, jobErr := jobInstance1.Result(context.Background()) + assert.NoError(t, jobErr) + assert.Equal(t, jobResult.QueryResult1["serverName"], "server1") + assert.Equal(t, jobResult.QueryResult1["tableName"], "table1") + assert.Equal(t, jobResult.QueryResult1["queryName"], "query1") + assert.Equal(t, jobResult.QueryResult2["serverName"], "server1") + assert.Equal(t, jobResult.QueryResult2["tableName"], "table2") + assert.Equal(t, jobResult.QueryResult2["queryName"], "query2") + + jobResult3, jobErr := jobInstance3.Result(context.Background()) + assert.NoError(t, jobErr) + assert.Equal(t, jobResult3.QueryResult1["serverName"], "server3") + assert.Equal(t, jobResult3.QueryResult1["tableName"], "table5") + assert.Equal(t, jobResult3.QueryResult1["queryName"], "query5") + assert.Equal(t, jobResult3.QueryResult2["serverName"], "server3") + assert.Equal(t, jobResult3.QueryResult2["tableName"], "table6") + assert.Equal(t, jobResult3.QueryResult2["queryName"], "query6") } func TestJobError(t *testing.T) { @@ -95,11 +129,23 @@ func TestJobPanic(t *testing.T) { func TestJobStepRetry(t *testing.T) { t.Parallel() - jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3)}) + jd, err := BuildJob(map[string]asyncjob.RetryPolicy{ + "GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3), + "QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3), + "Summarize": newLinearRetryPolicy(time.Millisecond*3, 3), + }) assert.NoError(t, err) + invalidStep := &asyncjob.StepDefinition[string]{} + _, err = asyncjob.JobWithResult(jd, invalidStep) + assert.Error(t, err) + + // newly created job definition should not be sealed + assert.False(t, jd.Sealed()) + ctx := context.WithValue(context.Background(), testLoggingContextKey, t) - ctx = context.WithValue(ctx, "error-injection.server1.table1.query1", fmt.Errorf("query exeeded memory limit")) + + // gain code coverage on retry policy in StepAfter jobInstance := jd.Start(ctx, &SqlSummaryJobLib{ Params: &SqlSummaryJobParameters{ ServerName: "server1", @@ -113,6 +159,9 @@ func TestJobStepRetry(t *testing.T) { }, }) + // once Start() is triggered, job definition should be sealed + assert.True(t, jd.Sealed()) + err = jobInstance.Wait(context.Background()) assert.Error(t, err) @@ -120,11 +169,48 @@ func TestJobStepRetry(t *testing.T) { errors.As(err, &jobErr) assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName()) - exeData := jobErr.StepInstance.ExecutionData() assert.Equal(t, exeData.Retried.Count, 3) - renderGraph(t, jobInstance) + // gain code coverage on retry policy in AddStep + jobInstance1 := jd.Start(ctx, &SqlSummaryJobLib{ + Params: &SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") }, + }, + }, + }) + err = jobInstance1.Wait(context.Background()) + assert.Error(t, err) + jobErr = &asyncjob.JobError{} + errors.As(err, &jobErr) + assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) + assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName()) + + // gain code coverage on retry policy in AfterBoth + jobInstance2 := jd.Start(ctx, &SqlSummaryJobLib{ + Params: &SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") }, + }, + }, + }) + err = jobInstance2.Wait(context.Background()) + assert.Error(t, err) + jobErr = &asyncjob.JobError{} + errors.As(err, &jobErr) + assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) + assert.Equal(t, "Summarize", jobErr.StepInstance.GetName()) } func renderGraph(t *testing.T, jb GraphRender) { diff --git a/step_builder.go b/step_builder.go index 1d53d9d..223b216 100644 --- a/step_builder.go +++ b/step_builder.go @@ -10,13 +10,13 @@ import ( ) // AddStep adds a step to the job definition. -func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob +func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } stepD := newStepDefinition[ST](stepName, stepTypeTask, optionDecorators...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } @@ -51,23 +51,20 @@ func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName st return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // StepAfter add a step after a preceding step, also take input from that preceding step -func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob - } - - // check parentStepT is in this job - if get, ok := j.GetStep(parentStep.GetName()); !ok || get != parentStep { - return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName()) +func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } stepD := newStepDefinition[ST](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep))...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } @@ -92,41 +89,35 @@ func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepN parentStepInstance := getStrongTypedStepInstance(parentStep, ji) stepInstance := newStepInstance[ST](stepD, ji) + // here ContinueWith may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error. stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepFuncWithPanicHandling)) ji.addStepInstance(stepInstance, precedingInstances...) return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // StepAfterBoth add a step after both preceding steps, also take input from both preceding steps -func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - if j.Sealed() { - return nil, ErrAddStepInSealedJob +func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + if err := addStepPreCheck(j, stepName); err != nil { + return nil, err } - // check parentStepT is in this job - if get, ok := j.GetStep(parentStep1.GetName()); !ok || get != parentStep1 { - return nil, fmt.Errorf("step [%s] not found in job", parentStep1.GetName()) - } - if get, ok := j.GetStep(parentStep2.GetName()); !ok || get != parentStep2 { - return nil, fmt.Errorf("step [%s] not found in job", parentStep2.GetName()) + // compiler not allow me to compare parentStep1 and parentStep2 directly with different genericType + if parentStep1.GetName() == parentStep2.GetName() { + return nil, ErrDuplicateInputParentStep.WithMessage(MsgDuplicateInputParentStep) } stepD := newStepDefinition[ST](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep1), ExecuteAfter(parentStep2))...) - precedingDefSteps, err := getDependsOnSteps(stepD, j) + precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn()) if err != nil { return nil, err } - // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. - if len(precedingDefSteps) == 0 { - precedingDefSteps = append(precedingDefSteps, j.getRootStep()) - stepD.executionOptions.DependOn = append(stepD.executionOptions.DependOn, j.getRootStep().GetName()) - } - stepD.instanceCreator = func(ctx context.Context, ji JobInstanceMeta) StepInstanceMeta { // TODO: error is ignored here precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji) @@ -147,39 +138,40 @@ func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[ parentStepInstance1 := getStrongTypedStepInstance(parentStep1, ji) parentStepInstance2 := getStrongTypedStepInstance(parentStep2, ji) stepInstance := newStepInstance[ST](stepD, ji) + // here AfterBoth may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error. stepInstance.task = asynctask.AfterBoth(ctx, parentStepInstance1.task, parentStepInstance2.task, instrumentedStepAfterBoth(stepInstance, precedingTasks, stepFuncWithPanicHandling)) ji.addStepInstance(stepInstance, precedingInstances...) return stepInstance } - j.addStep(stepD, precedingDefSteps...) + if err := j.addStep(stepD, precedingDefSteps...); err != nil { + return nil, err + } return stepD, nil } // AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func AddStepWithStaticFunc[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return AddStep(bCtx, j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) +func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return AddStep(j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) } // StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func StepAfterWithStaticFunc[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfter(bCtx, j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) +func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return StepAfter(j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) } // StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) -func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfterBoth(bCtx, j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) +func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { + return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) } func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (*T, error)) func(ctx context.Context) (*T, error) { return func(ctx context.Context) (*T, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -210,12 +202,10 @@ func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks [] func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T) (*S, error)) func(ctx context.Context, t *T) (*S, error) { return func(ctx context.Context, t *T) (*S, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -247,12 +237,10 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece return func(ctx context.Context, t *T, s *S) (*R, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { - /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. - we need to be consistent on how to set state of dependent step. - step.executionData.StartTime = time.Now() - step.state = StepStateFailed - step.executionData.Duration = 0 */ - return nil, newStepError(ErrPrecedentStepFailure, stepInstance, err) + /* this only work on ExecuteAfter (have precedent step, but not taking input from it) + asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on before we do any state change or error handling. */ + return nil, err } stepInstance.executionData.StartTime = time.Now() @@ -280,13 +268,25 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece } } -func getDependsOnSteps(step StepDefinitionMeta, j JobDefinitionMeta) ([]StepDefinitionMeta, error) { +func addStepPreCheck(j JobDefinitionMeta, stepName string) error { + if j.Sealed() { + return ErrAddStepInSealedJob.WithMessage(fmt.Sprintf(MsgAddStepInSealedJob, stepName)) + } + + if _, ok := j.GetStep(stepName); ok { + return ErrAddExistingStep.WithMessage(fmt.Sprintf(MsgAddExistingStep, stepName)) + } + + return nil +} + +func getDependsOnSteps(j JobDefinitionMeta, dependsOnSteps []string) ([]StepDefinitionMeta, error) { var precedingDefSteps []StepDefinitionMeta - for _, depStepName := range step.DependsOn() { + for _, depStepName := range dependsOnSteps { if depStep, ok := j.GetStep(depStepName); ok { precedingDefSteps = append(precedingDefSteps, depStep) } else { - return nil, fmt.Errorf("step [%s] not found", depStepName) + return nil, ErrRefStepNotInJob.WithMessage(fmt.Sprintf(MsgRefStepNotInJob, depStepName)) } } @@ -301,7 +301,7 @@ func getDependsOnStepInstances(stepD StepDefinitionMeta, ji JobInstanceMeta) ([] precedingInstances = append(precedingInstances, depStep) precedingTasks = append(precedingTasks, depStep.Waitable()) } else { - return nil, nil, fmt.Errorf("runtime step [%s] not found", depStepName) + return nil, nil, ErrRuntimeStepNotFound.WithMessage(fmt.Sprintf(MsgRuntimeStepNotFound, depStepName)) } } diff --git a/step_builder_test.go b/step_builder_test.go new file mode 100644 index 0000000..34ab269 --- /dev/null +++ b/step_builder_test.go @@ -0,0 +1,75 @@ +package asyncjob_test + +import ( + "testing" + + "github.com/Azure/go-asyncjob" + "github.com/stretchr/testify/assert" +) + +func TestDefinitionRendering(t *testing.T) { + t.Parallel() + + renderGraph(t, SqlSummaryAsyncJobDefinition) +} + +func TestDefinitionBuilder(t *testing.T) { + t.Parallel() + + job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") + notExistingTask := &asyncjob.StepDefinition[any]{} + + _, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"GetConnection\" to job definition, but it already exists") + + table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"GetTableClient1\" to job definition, but it already exists") + + table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + query1Task, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + query2Task, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfterBoth(job, "Summarize", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.NoError(t, err) + + _, err = asyncjob.StepAfterBoth(job, "Summarize", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddExistingStep: trying to add step \"Summarize\" to job definition, but it already exists") + + _, err = asyncjob.StepAfterBoth(job, "Summarize1", query1Task, query1Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "DuplicateInputParentStep: at least 2 input parentSteps are same") + + query3Task := &asyncjob.StepDefinition[SqlQueryResult]{} + _, err = asyncjob.StepAfterBoth(job, "Summarize2", query1Task, query3Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") + + assert.False(t, job.Sealed()) + job.Seal() + assert.True(t, job.Sealed()) + job.Seal() + assert.True(t, job.Sealed()) + + _, err = asyncjob.AddStep(job, "GetConnectionAgain", connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"GetConnectionAgain\" to a sealed job definition") + + _, err = asyncjob.StepAfter(job, "QueryTable1Again", table1ClientTsk, queryTable1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"QueryTable1Again\" to a sealed job definition") + + _, err = asyncjob.StepAfterBoth(job, "SummarizeAgain", query1Task, query2Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + assert.EqualError(t, err, "AddStepInSealedJob: trying to add step \"SummarizeAgain\" to a sealed job definition") +} diff --git a/step_definition.go b/step_definition.go new file mode 100644 index 0000000..ca31c91 --- /dev/null +++ b/step_definition.go @@ -0,0 +1,84 @@ +package asyncjob + +import ( + "context" + + "github.com/Azure/go-asyncjob/graph" +) + +type stepType string + +const stepTypeTask stepType = "task" +const stepTypeRoot stepType = "root" + +// StepDefinitionMeta is the interface for a step definition +type StepDefinitionMeta interface { + + // GetName return name of the step + GetName() string + + // DependsOn return the list of step names that this step depends on + DependsOn() []string + + // DotSpec used for generating graphviz graph + DotSpec() *graph.DotNodeSpec + + // Instantiate a new step instance + createStepInstance(context.Context, JobInstanceMeta) StepInstanceMeta +} + +// StepDefinition defines a step and it's dependencies in a job definition. +type StepDefinition[T any] struct { + name string + stepType stepType + executionOptions *StepExecutionOptions + instanceCreator func(context.Context, JobInstanceMeta) StepInstanceMeta +} + +func newStepDefinition[T any](stepName string, stepType stepType, optionDecorators ...ExecutionOptionPreparer) *StepDefinition[T] { + step := &StepDefinition[T]{ + name: stepName, + executionOptions: &StepExecutionOptions{}, + stepType: stepType, + } + + for _, decorator := range optionDecorators { + step.executionOptions = decorator(step.executionOptions) + } + + return step +} + +func (sd *StepDefinition[T]) GetName() string { + return sd.name +} + +func (sd *StepDefinition[T]) DependsOn() []string { + return sd.executionOptions.DependOn +} + +func (sd *StepDefinition[T]) createStepInstance(ctx context.Context, jobInstance JobInstanceMeta) StepInstanceMeta { + return sd.instanceCreator(ctx, jobInstance) +} + +func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec { + return &graph.DotNodeSpec{ + Name: sd.GetName(), + DisplayName: sd.GetName(), + Shape: "box", + Style: "filled", + FillColor: "gray", + Tooltip: "", + } +} + +func connectStepDefinition(stepFrom, stepTo StepDefinitionMeta) *graph.DotEdgeSpec { + edgeSpec := &graph.DotEdgeSpec{ + FromNodeName: stepFrom.GetName(), + ToNodeName: stepTo.GetName(), + Color: "black", + Style: "bold", + } + + return edgeSpec +} diff --git a/step_exec_options.go b/step_exec_options.go index 3c57e3a..d54a031 100644 --- a/step_exec_options.go +++ b/step_exec_options.go @@ -6,7 +6,6 @@ import ( ) type StepExecutionOptions struct { - Timeout time.Duration ErrorPolicy StepErrorPolicy RetryPolicy RetryPolicy ContextPolicy StepContextPolicy @@ -44,14 +43,6 @@ func WithRetry(retryPolicy RetryPolicy) ExecutionOptionPreparer { } } -// Limit time spend on a step. -func WithTimeout(timeout time.Duration) ExecutionOptionPreparer { - return func(options *StepExecutionOptions) *StepExecutionOptions { - options.Timeout = timeout - return options - } -} - func WithContextEnrichment(contextPolicy StepContextPolicy) ExecutionOptionPreparer { return func(options *StepExecutionOptions) *StepExecutionOptions { options.ContextPolicy = contextPolicy diff --git a/step.go b/step_instance.go similarity index 61% rename from step.go rename to step_instance.go index c89f8c8..d84b8bb 100644 --- a/step.go +++ b/step_instance.go @@ -16,90 +16,6 @@ const StepStateRunning StepState = "running" const StepStateFailed StepState = "failed" const StepStateCompleted StepState = "completed" -type stepType string - -const stepTypeTask stepType = "task" -const stepTypeRoot stepType = "root" - -// StepDefinitionMeta is the interface for a step definition -type StepDefinitionMeta interface { - - // GetName return name of the step - GetName() string - - // DependsOn return the list of step names that this step depends on - DependsOn() []string - - // ExecutionPolicy return the execution policy of the step - ExecutionPolicy() *StepExecutionOptions - - // DotSpec used for generating graphviz graph - DotSpec() *graph.DotNodeSpec - - // Instantiate a new step instance - createStepInstance(context.Context, JobInstanceMeta) StepInstanceMeta -} - -// StepDefinition defines a step and it's dependencies in a job definition. -type StepDefinition[T any] struct { - name string - stepType stepType - executionOptions *StepExecutionOptions - instanceCreator func(context.Context, JobInstanceMeta) StepInstanceMeta -} - -func newStepDefinition[T any](stepName string, stepType stepType, optionDecorators ...ExecutionOptionPreparer) *StepDefinition[T] { - step := &StepDefinition[T]{ - name: stepName, - executionOptions: &StepExecutionOptions{}, - stepType: stepType, - } - - for _, decorator := range optionDecorators { - step.executionOptions = decorator(step.executionOptions) - } - - return step -} - -func (sd *StepDefinition[T]) GetName() string { - return sd.name -} - -func (sd *StepDefinition[T]) DependsOn() []string { - return sd.executionOptions.DependOn -} - -func (sd *StepDefinition[T]) ExecutionPolicy() *StepExecutionOptions { - return sd.executionOptions -} - -func (sd *StepDefinition[T]) createStepInstance(ctx context.Context, jobInstance JobInstanceMeta) StepInstanceMeta { - return sd.instanceCreator(ctx, jobInstance) -} - -func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec { - return &graph.DotNodeSpec{ - Name: sd.GetName(), - DisplayName: sd.GetName(), - Shape: "box", - Style: "filled", - FillColor: "gray", - Tooltip: "", - } -} - -func connectStepDefinition(stepFrom, stepTo StepDefinitionMeta) *graph.DotEdgeSpec { - edgeSpec := &graph.DotEdgeSpec{ - FromNodeName: stepFrom.GetName(), - ToNodeName: stepTo.GetName(), - Color: "black", - Style: "bold", - } - - return edgeSpec -} - // StepInstanceMeta is the interface for a step instance type StepInstanceMeta interface { GetName() string diff --git a/util_test.go b/util_test.go index 50ae090..d3dd993 100644 --- a/util_test.go +++ b/util_test.go @@ -18,7 +18,7 @@ var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJob func init() { var err error - SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(context.Background(), map[string]asyncjob.RetryPolicy{}) + SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(map[string]asyncjob.RetryPolicy{}) if err != nil { panic(err) } @@ -60,9 +60,9 @@ func query2ParamStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[string] { } } -func connectionStepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[string, SqlConnection] { - return func(ctx context.Context, serverName *string) (*SqlConnection, error) { - return sql.GetConnection(ctx, serverName) +func connectionStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[SqlConnection] { + return func(ctx context.Context) (*SqlConnection, error) { + return sql.GetConnection(ctx, &sql.Params.ServerName) } } @@ -72,15 +72,27 @@ func checkAuthStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[interface{}] { }) } -func tableClientStepFunc(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[SqlConnection, string, SqlTableClient] { - return func(ctx context.Context, conn *SqlConnection, tableName *string) (*SqlTableClient, error) { - return sql.GetTableClient(ctx, conn, tableName) +func tableClient1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlConnection, SqlTableClient] { + return func(ctx context.Context, conn *SqlConnection) (*SqlTableClient, error) { + return sql.GetTableClient(ctx, conn, &sql.Params.Table1) } } -func queryTableStepFunc(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[SqlTableClient, string, SqlQueryResult] { - return func(ctx context.Context, tableClient *SqlTableClient, query *string) (*SqlQueryResult, error) { - return sql.ExecuteQuery(ctx, tableClient, query) +func tableClient2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlConnection, SqlTableClient] { + return func(ctx context.Context, conn *SqlConnection) (*SqlTableClient, error) { + return sql.GetTableClient(ctx, conn, &sql.Params.Table2) + } +} + +func queryTable1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlTableClient, SqlQueryResult] { + return func(ctx context.Context, tableClient *SqlTableClient) (*SqlQueryResult, error) { + return sql.ExecuteQuery(ctx, tableClient, &sql.Params.Query1) + } +} + +func queryTable2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlTableClient, SqlQueryResult] { + return func(ctx context.Context, tableClient *SqlTableClient) (*SqlQueryResult, error) { + return sql.ExecuteQuery(ctx, tableClient, &sql.Params.Query2) } } @@ -96,77 +108,53 @@ func emailNotificationStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[interf }) } -func BuildJob(bCtx context.Context, retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) { +func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) { job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") - serverNameParamTask, err := asyncjob.AddStep(bCtx, job, "ServerNameParam", serverNameStepFunc) - if err != nil { - return nil, fmt.Errorf("error adding step ServerNameParam: %w", err) - } - connTsk, err := asyncjob.StepAfter(bCtx, job, "GetConnection", serverNameParamTask, connectionStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetConnection: %w", err) } - checkAuthTask, err := asyncjob.AddStep(bCtx, job, "CheckAuth", checkAuthStepFunc) + checkAuthTask, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step CheckAuth: %w", err) } - table1ParamTsk, err := asyncjob.AddStep(bCtx, job, "Table1Param", table1NameStepFunc) - if err != nil { - return nil, fmt.Errorf("error adding step Table1Param: %w", err) - } - - table1ClientTsk, err := asyncjob.StepAfterBoth(bCtx, job, "GetTableClient1", connTsk, table1ParamTsk, tableClientStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetTableClient1: %w", err) } - query1ParamTsk, err := asyncjob.AddStep(bCtx, job, "Query1Param", query1ParamStepFunc) - if err != nil { - return nil, fmt.Errorf("error adding step Query1Param: %w", err) - } - - qery1ResultTsk, err := asyncjob.StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, queryTableStepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) + qery1ResultTsk, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step QueryTable1: %w", err) } - table2ParamTsk, err := asyncjob.AddStep(bCtx, job, "Table2NameParam", table2NameStepFunc) - if err != nil { - return nil, fmt.Errorf("error adding step Table2NameParam: %w", err) - } - - table2ClientTsk, err := asyncjob.StepAfterBoth(bCtx, job, "GetTableClient2", connTsk, table2ParamTsk, tableClientStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step GetTableClient2: %w", err) } - query2ParamTsk, err := asyncjob.AddStep(bCtx, job, "Query2Param", query2ParamStepFunc) - if err != nil { - return nil, fmt.Errorf("error adding step Query2Param: %w", err) - } - - qery2ResultTsk, err := asyncjob.StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, queryTableStepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) + qery2ResultTsk, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step QueryTable2: %w", err) } - summaryTsk, err := asyncjob.StepAfterBoth(bCtx, job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) + summaryTsk, err := asyncjob.StepAfterBoth(job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithRetry(retryPolicies["Summarize"]), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step Summarize: %w", err) } - _, err = asyncjob.AddStep(bCtx, job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext)) + _, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { return nil, fmt.Errorf("error adding step EmailNotification: %w", err) } return job, nil } -func BuildJobWithResult(bCtx context.Context, retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult], error) { - job, err := BuildJob(bCtx, retryPolicies) +func BuildJobWithResult(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult], error) { + job, err := BuildJob(retryPolicies) if err != nil { return nil, err } @@ -295,9 +283,10 @@ func (sql *SqlSummaryJobLib) Logging(ctx context.Context, msg string) { t := tI.(*testing.T) jobName := ctx.Value("asyncjob.jobName") + jobId := ctx.Value("asyncjob.jobId") stepName := ctx.Value("asyncjob.stepName") - t.Logf("[Job: %s, Step: %s] %s", jobName, stepName, msg) + t.Logf("[Job: %s-%s, Step: %s] %s", jobName, jobId, stepName, msg) } else { fmt.Println(msg) @@ -306,6 +295,7 @@ func (sql *SqlSummaryJobLib) Logging(ctx context.Context, msg string) { func EnrichContext(ctx context.Context, instanceMeta asyncjob.StepInstanceMeta) context.Context { ctx = context.WithValue(ctx, "asyncjob.jobName", instanceMeta.GetJobInstance().GetJobDefinition().GetName()) + ctx = context.WithValue(ctx, "asyncjob.jobId", instanceMeta.GetJobInstance().GetJobInstanceId()) ctx = context.WithValue(ctx, "asyncjob.stepName", instanceMeta.GetStepDefinition().GetName()) return ctx }