diff --git a/go.mod b/go.mod index b98611a..8b30fbf 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/Azure/go-asyncjob -go 1.19 +go 1.20 require ( github.com/Azure/go-asyncjob/graph v0.2.0 - github.com/Azure/go-asynctask v1.4.0 + github.com/Azure/go-asynctask v1.6.0 github.com/google/uuid v1.4.0 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index 5d9cf70..750cfc7 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/Azure/go-asyncjob/graph v0.2.0 h1:0GFnQit3+ZUxpc67ogusooa38GSFRPH2e1+h+L/33hc= github.com/Azure/go-asyncjob/graph v0.2.0/go.mod h1:3Z7w9aUBIrDriypH8O+hK0aeqKWKYuKSNxwrDxFy34s= -github.com/Azure/go-asynctask v1.4.0 h1:dJx6RXLqWGXI9jvFkwc30eEQfcvO9wCAyjI08H9kf1A= -github.com/Azure/go-asynctask v1.4.0/go.mod h1:xmdyX2MRd9vCpnglRFMz7D8pMjuz1lNhC5yVmsMHn48= +github.com/Azure/go-asynctask v1.6.0 h1:Njc/K4Q7LmG3Z5UVESiKcnS8Sn9LAZRF8OlQhFjMvq0= +github.com/Azure/go-asynctask v1.6.0/go.mod h1:RLw9j8Ln+K0PBJGo4qOsRsFuGxq4DAZ03nghoBcIqNA= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= diff --git a/job_definition.go b/job_definition.go index ed7adb7..e6d80ad 100644 --- a/job_definition.go +++ b/job_definition.go @@ -32,7 +32,8 @@ type JobDefinition[T any] struct { } // Create new JobDefinition -// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance. +// +// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance. func NewJobDefinition[T any](name string) *JobDefinition[T] { j := &JobDefinition[T]{ name: name, @@ -50,9 +51,10 @@ func NewJobDefinition[T any](name string) *JobDefinition[T] { } // Start execution of the job definition. -// this will create and return new instance of the job -// caller will then be able to wait for the job instance -func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] { +// +// this will create and return new instance of the job +// caller will then be able to wait for the job instance +func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T] { if !jd.Sealed() { jd.Seal() } diff --git a/job_instance.go b/job_instance.go index 49dbb26..e9aeb0a 100644 --- a/job_instance.go +++ b/job_instance.go @@ -44,14 +44,14 @@ func WithSequentialExecution() JobOptionPreparer { // JobInstance is the instance of a jobDefinition type JobInstance[T any] struct { jobOptions *JobExecutionOptions - input *T + input T Definition *JobDefinition[T] rootStep *StepInstance[T] steps map[string]StepInstanceMeta stepsDag *graph.Graph[StepInstanceMeta] } -func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions ...JobOptionPreparer) *JobInstance[T] { +func newJobInstance[T any](jd *JobDefinition[T], input T, jobInstanceOptions ...JobOptionPreparer) *JobInstance[T] { ji := &JobInstance[T]{ Definition: jd, input: input, diff --git a/job_result.go b/job_result.go index c74f792..f55efe3 100644 --- a/job_result.go +++ b/job_result.go @@ -26,7 +26,7 @@ type JobInstanceWithResult[Tin, Tout any] struct { resultStep *StepInstance[Tout] } -func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] { +func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] { ji := jd.JobDefinition.Start(ctx, input, jobOptions...) return &JobInstanceWithResult[Tin, Tout]{ @@ -36,7 +36,8 @@ func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input * } // Result returns the result of the job from result step. -// it doesn't wait for all steps to finish, you can use Result() after Wait() if desired. -func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (*Tout, error) { +// +// it doesn't wait for all steps to finish, you can use Result() after Wait() if desired. +func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (Tout, error) { return ji.resultStep.task.Result(ctx) } diff --git a/retryer.go b/retryer.go index 74e647b..981b008 100644 --- a/retryer.go +++ b/retryer.go @@ -8,14 +8,14 @@ import ( type retryer[T any] struct { retryPolicy RetryPolicy retryReport *RetryReport - function func() (*T, error) + function func() (T, error) } -func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (*T, error)) *retryer[T] { +func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (T, error)) *retryer[T] { return &retryer[T]{retryPolicy: policy, retryReport: report, function: toRetry} } -func (r retryer[T]) Run() (*T, error) { +func (r retryer[T]) Run() (T, error) { t, err := r.function() for err != nil { if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry { diff --git a/step_builder.go b/step_builder.go index 07df0b5..83f4aff 100644 --- a/step_builder.go +++ b/step_builder.go @@ -10,7 +10,7 @@ import ( ) // AddStep adds a step to the job definition. -func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { +func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { if err := addStepPreCheck(j, stepName); err != nil { return nil, err } @@ -33,7 +33,7 @@ func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator jiStrongTyped := ji.(*JobInstance[JT]) stepFunc := stepFuncCreator(jiStrongTyped.input) - stepFuncWithPanicHandling := func(ctx context.Context) (result *ST, err error) { + stepFuncWithPanicHandling := func(ctx context.Context) (result ST, err error) { // handle panic from user code defer func() { if r := recover(); r != nil { @@ -58,7 +58,7 @@ func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator } // StepAfter add a step after a preceding step, also take input from that preceding step -func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { +func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { if err := addStepPreCheck(j, stepName); err != nil { return nil, err } @@ -75,7 +75,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep jiStrongTyped := ji.(*JobInstance[JT]) stepFunc := stepAfterFuncCreator(jiStrongTyped.input) - stepFuncWithPanicHandling := func(ctx context.Context, pt *PT) (result *ST, err error) { + stepFuncWithPanicHandling := func(ctx context.Context, pt PT) (result ST, err error) { // handle panic from user code defer func() { if r := recover(); r != nil { @@ -102,7 +102,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep } // StepAfterBoth add a step after both preceding steps, also take input from both preceding steps -func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { +func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { if err := addStepPreCheck(j, stepName); err != nil { return nil, err } @@ -124,7 +124,7 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, jiStrongTyped := ji.(*JobInstance[JT]) stepFunc := stepAfterBothFuncCreator(jiStrongTyped.input) - stepFuncWithPanicHandling := func(ctx context.Context, pt1 *PT1, pt2 *PT2) (result *ST, err error) { + stepFuncWithPanicHandling := func(ctx context.Context, pt1 PT1, pt2 PT2) (result ST, err error) { // handle panic from user code defer func() { if r := recover(); r != nil { @@ -152,37 +152,37 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, // AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return AddStep(j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) + return AddStep(j, stepName, func(j JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...) } // StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfter(j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) + return StepAfter(j, stepName, parentStep, func(j JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...) } // StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances) func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) { - return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) + return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...) } -func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (*T, error)) func(ctx context.Context) (*T, error) { - return func(ctx context.Context) (*T, error) { +func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (T, error)) func(ctx context.Context) (T, error) { + return func(ctx context.Context) (T, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { /* this only work on ExecuteAfter (have precedent step, but not taking input from it) asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. we need to be consistent on before we do any state change or error handling. */ - return nil, err + return *new(T), err } stepInstance.executionData.StartTime = time.Now() stepInstance.state = StepStateRunning ctx = stepInstance.EnrichContext(ctx) - var result *T + var result T var err error if stepInstance.Definition.executionOptions.RetryPolicy != nil { stepInstance.executionData.Retried = &RetryReport{} - result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*T, error) { return stepFunc(ctx) }).Run() + result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (T, error) { return stepFunc(ctx) }).Run() } else { result, err = stepFunc(ctx) } @@ -191,7 +191,7 @@ func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks [] if err != nil { stepInstance.state = StepStateFailed - return nil, newStepError(ErrStepFailed, stepInstance, err) + return *new(T), newStepError(ErrStepFailed, stepInstance, err) } else { stepInstance.state = StepStateCompleted return result, nil @@ -199,24 +199,24 @@ func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks [] } } -func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T) (*S, error)) func(ctx context.Context, t *T) (*S, error) { - return func(ctx context.Context, t *T) (*S, error) { +func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t T) (S, error)) func(ctx context.Context, t T) (S, error) { + return func(ctx context.Context, t T) (S, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { /* this only work on ExecuteAfter (have precedent step, but not taking input from it) asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. we need to be consistent on before we do any state change or error handling. */ - return nil, err + return *new(S), err } stepInstance.executionData.StartTime = time.Now() stepInstance.state = StepStateRunning ctx = stepInstance.EnrichContext(ctx) - var result *S + var result S var err error if stepInstance.Definition.executionOptions.RetryPolicy != nil { stepInstance.executionData.Retried = &RetryReport{} - result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*S, error) { return stepFunc(ctx, t) }).Run() + result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (S, error) { return stepFunc(ctx, t) }).Run() } else { result, err = stepFunc(ctx, t) } @@ -225,7 +225,7 @@ func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTas if err != nil { stepInstance.state = StepStateFailed - return nil, newStepError(ErrStepFailed, stepInstance, err) + return *new(S), newStepError(ErrStepFailed, stepInstance, err) } else { stepInstance.state = StepStateCompleted return result, nil @@ -233,25 +233,25 @@ func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTas } } -func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T, s *S) (*R, error)) func(ctx context.Context, t *T, s *S) (*R, error) { - return func(ctx context.Context, t *T, s *S) (*R, error) { +func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t T, s S) (R, error)) func(ctx context.Context, t T, s S) (R, error) { + return func(ctx context.Context, t T, s S) (R, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { /* this only work on ExecuteAfter (have precedent step, but not taking input from it) asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. we need to be consistent on before we do any state change or error handling. */ - return nil, err + return *new(R), err } stepInstance.executionData.StartTime = time.Now() stepInstance.state = StepStateRunning ctx = stepInstance.EnrichContext(ctx) - var result *R + var result R var err error if stepInstance.Definition.executionOptions.RetryPolicy != nil { stepInstance.executionData.Retried = &RetryReport{} - result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*R, error) { return stepFunc(ctx, t, s) }).Run() + result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (R, error) { return stepFunc(ctx, t, s) }).Run() } else { result, err = stepFunc(ctx, t, s) } @@ -260,7 +260,7 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece if err != nil { stepInstance.state = StepStateFailed - return nil, newStepError(ErrStepFailed, stepInstance, err) + return *new(R), newStepError(ErrStepFailed, stepInstance, err) } else { stepInstance.state = StepStateCompleted return result, nil @@ -309,11 +309,12 @@ func getDependsOnStepInstances(stepD StepDefinitionMeta, ji JobInstanceMeta) ([] } // this is most vulunerable point of this library -// we have strongTyped steps -// we can create stronglyTyped stepInstance from stronglyTyped stepDefinition -// We cannot store strongTyped stepInstance and passing it to next step -// now we need this typeAssertion, to beable to link steps -// in theory, we have all the info, we construct the instance, if it panics, we should fix it. +// +// we have strongTyped steps +// we can create stronglyTyped stepInstance from stronglyTyped stepDefinition +// We cannot store strongTyped stepInstance and passing it to next step +// now we need this typeAssertion, to beable to link steps +// in theory, we have all the info, we construct the instance, if it panics, we should fix it. func getStrongTypedStepInstance[T any](stepD *StepDefinition[T], ji JobInstanceMeta) *StepInstance[T] { stepInstanceMeta, ok := ji.GetStepInstance(stepD.GetName()) if !ok { diff --git a/step_builder_test.go b/step_builder_test.go index 34ab269..4c095ec 100644 --- a/step_builder_test.go +++ b/step_builder_test.go @@ -16,7 +16,7 @@ func TestDefinitionRendering(t *testing.T) { func TestDefinitionBuilder(t *testing.T) { t.Parallel() - job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") + job := asyncjob.NewJobDefinition[*SqlSummaryJobLib]("sqlSummaryJob") notExistingTask := &asyncjob.StepDefinition[any]{} _, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext)) @@ -54,7 +54,7 @@ func TestDefinitionBuilder(t *testing.T) { _, err = asyncjob.StepAfterBoth(job, "Summarize1", query1Task, query1Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) assert.EqualError(t, err, "DuplicateInputParentStep: at least 2 input parentSteps are same") - query3Task := &asyncjob.StepDefinition[SqlQueryResult]{} + query3Task := &asyncjob.StepDefinition[*SqlQueryResult]{} _, err = asyncjob.StepAfterBoth(job, "Summarize2", query1Task, query3Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext)) assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job") diff --git a/test_joblib_test.go b/test_joblib_test.go index 31fd142..57ba566 100644 --- a/test_joblib_test.go +++ b/test_joblib_test.go @@ -16,8 +16,9 @@ type testingLoggerKey string const testLoggingContextKey testingLoggerKey = "test-logging" // SqlSummaryAsyncJobDefinition is the job definition for the SqlSummaryJobLib -// JobDefinition fit perfectly in init() function -var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult] +// +// JobDefinition fit perfectly in init() function +var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[*SqlSummaryJobLib, *SummarizedResult] func init() { var err error @@ -48,7 +49,7 @@ func NewSqlJobLib(params *SqlSummaryJobParameters) *SqlSummaryJobLib { } } -func connectionStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[SqlConnection] { +func connectionStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[*SqlConnection] { return func(ctx context.Context) (*SqlConnection, error) { return sql.GetConnection(ctx, &sql.Params.ServerName) } @@ -60,31 +61,31 @@ func checkAuthStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[interface{}] { }) } -func tableClient1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlConnection, SqlTableClient] { +func tableClient1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[*SqlConnection, *SqlTableClient] { return func(ctx context.Context, conn *SqlConnection) (*SqlTableClient, error) { return sql.GetTableClient(ctx, conn, &sql.Params.Table1) } } -func tableClient2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlConnection, SqlTableClient] { +func tableClient2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[*SqlConnection, *SqlTableClient] { return func(ctx context.Context, conn *SqlConnection) (*SqlTableClient, error) { return sql.GetTableClient(ctx, conn, &sql.Params.Table2) } } -func queryTable1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlTableClient, SqlQueryResult] { +func queryTable1StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[*SqlTableClient, *SqlQueryResult] { return func(ctx context.Context, tableClient *SqlTableClient) (*SqlQueryResult, error) { return sql.ExecuteQuery(ctx, tableClient, &sql.Params.Query1) } } -func queryTable2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlTableClient, SqlQueryResult] { +func queryTable2StepFunc(sql *SqlSummaryJobLib) asynctask.ContinueFunc[*SqlTableClient, *SqlQueryResult] { return func(ctx context.Context, tableClient *SqlTableClient) (*SqlQueryResult, error) { return sql.ExecuteQuery(ctx, tableClient, &sql.Params.Query2) } } -func summarizeQueryResultStepFunc(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[SqlQueryResult, SqlQueryResult, SummarizedResult] { +func summarizeQueryResultStepFunc(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[*SqlQueryResult, *SqlQueryResult, *SummarizedResult] { return func(ctx context.Context, query1Result *SqlQueryResult, query2Result *SqlQueryResult) (*SummarizedResult, error) { return sql.SummarizeQueryResult(ctx, query1Result, query2Result) } @@ -96,8 +97,8 @@ func emailNotificationStepFunc(sql *SqlSummaryJobLib) asynctask.AsyncFunc[interf }) } -func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) { - job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob") +func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[*SqlSummaryJobLib], error) { + job := asyncjob.NewJobDefinition[*SqlSummaryJobLib]("sqlSummaryJob") connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext)) if err != nil { @@ -141,7 +142,7 @@ func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefin return job, nil } -func BuildJobWithResult(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult], error) { +func BuildJobWithResult(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinitionWithResult[*SqlSummaryJobLib, *SummarizedResult], error) { job, err := BuildJob(retryPolicies) if err != nil { return nil, err @@ -151,7 +152,7 @@ func BuildJobWithResult(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjo if !ok { return nil, fmt.Errorf("step Summarize not found") } - summaryStep, ok := summaryStepMeta.(*asyncjob.StepDefinition[SummarizedResult]) + summaryStep, ok := summaryStepMeta.(*asyncjob.StepDefinition[*SummarizedResult]) if !ok { return nil, fmt.Errorf("step Summarize have different generic type parameter: %T", summaryStepMeta) }