Skip to content

Commit

Permalink
Merge pull request #24 from Azure/haitao/no-pointer-bind
Browse files Browse the repository at this point in the history
remove restriction on return pointer of typeParameter
  • Loading branch information
haitch committed Nov 14, 2023
2 parents e13fa9b + a4fd191 commit 625ae9c
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 62 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module github.com/Azure/go-asyncjob

go 1.19
go 1.20

require (
github.com/Azure/go-asyncjob/graph v0.2.0
github.com/Azure/go-asynctask v1.4.0
github.com/Azure/go-asynctask v1.6.0
github.com/google/uuid v1.4.0
github.com/stretchr/testify v1.8.4
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/Azure/go-asyncjob/graph v0.2.0 h1:0GFnQit3+ZUxpc67ogusooa38GSFRPH2e1+h+L/33hc=
github.com/Azure/go-asyncjob/graph v0.2.0/go.mod h1:3Z7w9aUBIrDriypH8O+hK0aeqKWKYuKSNxwrDxFy34s=
github.com/Azure/go-asynctask v1.4.0 h1:dJx6RXLqWGXI9jvFkwc30eEQfcvO9wCAyjI08H9kf1A=
github.com/Azure/go-asynctask v1.4.0/go.mod h1:xmdyX2MRd9vCpnglRFMz7D8pMjuz1lNhC5yVmsMHn48=
github.com/Azure/go-asynctask v1.6.0 h1:Njc/K4Q7LmG3Z5UVESiKcnS8Sn9LAZRF8OlQhFjMvq0=
github.com/Azure/go-asynctask v1.6.0/go.mod h1:RLw9j8Ln+K0PBJGo4qOsRsFuGxq4DAZ03nghoBcIqNA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
Expand Down
10 changes: 6 additions & 4 deletions job_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type JobDefinition[T any] struct {
}

// Create new JobDefinition
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
//
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
Expand All @@ -50,9 +51,10 @@ func NewJobDefinition[T any](name string) *JobDefinition[T] {
}

// Start execution of the job definition.
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
//
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}
Expand Down
4 changes: 2 additions & 2 deletions job_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func WithSequentialExecution() JobOptionPreparer {
// JobInstance is the instance of a jobDefinition
type JobInstance[T any] struct {
jobOptions *JobExecutionOptions
input *T
input T
Definition *JobDefinition[T]
rootStep *StepInstance[T]
steps map[string]StepInstanceMeta
stepsDag *graph.Graph[StepInstanceMeta]
}

func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions ...JobOptionPreparer) *JobInstance[T] {
func newJobInstance[T any](jd *JobDefinition[T], input T, jobInstanceOptions ...JobOptionPreparer) *JobInstance[T] {
ji := &JobInstance[T]{
Definition: jd,
input: input,
Expand Down
7 changes: 4 additions & 3 deletions job_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type JobInstanceWithResult[Tin, Tout any] struct {
resultStep *StepInstance[Tout]
}

func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] {
func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] {
ji := jd.JobDefinition.Start(ctx, input, jobOptions...)

return &JobInstanceWithResult[Tin, Tout]{
Expand All @@ -36,7 +36,8 @@ func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *
}

// Result returns the result of the job from result step.
// it doesn't wait for all steps to finish, you can use Result() after Wait() if desired.
func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (*Tout, error) {
//
// it doesn't wait for all steps to finish, you can use Result() after Wait() if desired.
func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (Tout, error) {
return ji.resultStep.task.Result(ctx)
}
6 changes: 3 additions & 3 deletions retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
type retryer[T any] struct {
retryPolicy RetryPolicy
retryReport *RetryReport
function func() (*T, error)
function func() (T, error)
}

func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (*T, error)) *retryer[T] {
func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (T, error)) *retryer[T] {
return &retryer[T]{retryPolicy: policy, retryReport: report, function: toRetry}
}

func (r retryer[T]) Run() (*T, error) {
func (r retryer[T]) Run() (T, error) {
t, err := r.function()
for err != nil {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry {
Expand Down
65 changes: 33 additions & 32 deletions step_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// AddStep adds a step to the job definition.
func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
if err := addStepPreCheck(j, stepName); err != nil {
return nil, err
}
Expand All @@ -33,7 +33,7 @@ func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context) (result *ST, err error) {
stepFuncWithPanicHandling := func(ctx context.Context) (result ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
Expand All @@ -58,7 +58,7 @@ func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator
}

// StepAfter add a step after a preceding step, also take input from that preceding step
func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
if err := addStepPreCheck(j, stepName); err != nil {
return nil, err
}
Expand All @@ -75,7 +75,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepAfterFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context, pt *PT) (result *ST, err error) {
stepFuncWithPanicHandling := func(ctx context.Context, pt PT) (result ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
Expand All @@ -102,7 +102,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep
}

// StepAfterBoth add a step after both preceding steps, also take input from both preceding steps
func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
if err := addStepPreCheck(j, stepName); err != nil {
return nil, err
}
Expand All @@ -124,7 +124,7 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string,

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepAfterBothFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context, pt1 *PT1, pt2 *PT2) (result *ST, err error) {
stepFuncWithPanicHandling := func(ctx context.Context, pt1 PT1, pt2 PT2) (result ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -152,37 +152,37 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string,

// AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
return AddStep(j, stepName, func(j *JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...)
return AddStep(j, stepName, func(j JT) asynctask.AsyncFunc[ST] { return stepFunc }, optionDecorators...)
}

// StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
return StepAfter(j, stepName, parentStep, func(j *JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...)
return StepAfter(j, stepName, parentStep, func(j JT) asynctask.ContinueFunc[PT, ST] { return stepFunc }, optionDecorators...)
}

// StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)
func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j *JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...)
return StepAfterBoth(j, stepName, parentStep1, parentStep2, func(j JT) asynctask.AfterBothFunc[PT1, PT2, ST] { return stepFunc }, optionDecorators...)
}

func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (*T, error)) func(ctx context.Context) (*T, error) {
return func(ctx context.Context) (*T, error) {
func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context) (T, error)) func(ctx context.Context) (T, error) {
return func(ctx context.Context) (T, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter (have precedent step, but not taking input from it)
asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on before we do any state change or error handling. */
return nil, err
return *new(T), err
}

stepInstance.executionData.StartTime = time.Now()
stepInstance.state = StepStateRunning
ctx = stepInstance.EnrichContext(ctx)

var result *T
var result T
var err error
if stepInstance.Definition.executionOptions.RetryPolicy != nil {
stepInstance.executionData.Retried = &RetryReport{}
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*T, error) { return stepFunc(ctx) }).Run()
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (T, error) { return stepFunc(ctx) }).Run()
} else {
result, err = stepFunc(ctx)
}
Expand All @@ -191,32 +191,32 @@ func instrumentedAddStep[T any](stepInstance *StepInstance[T], precedingTasks []

if err != nil {
stepInstance.state = StepStateFailed
return nil, newStepError(ErrStepFailed, stepInstance, err)
return *new(T), newStepError(ErrStepFailed, stepInstance, err)
} else {
stepInstance.state = StepStateCompleted
return result, nil
}
}
}

func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T) (*S, error)) func(ctx context.Context, t *T) (*S, error) {
return func(ctx context.Context, t *T) (*S, error) {
func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t T) (S, error)) func(ctx context.Context, t T) (S, error) {
return func(ctx context.Context, t T) (S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter (have precedent step, but not taking input from it)
asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on before we do any state change or error handling. */
return nil, err
return *new(S), err
}

stepInstance.executionData.StartTime = time.Now()
stepInstance.state = StepStateRunning
ctx = stepInstance.EnrichContext(ctx)

var result *S
var result S
var err error
if stepInstance.Definition.executionOptions.RetryPolicy != nil {
stepInstance.executionData.Retried = &RetryReport{}
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*S, error) { return stepFunc(ctx, t) }).Run()
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (S, error) { return stepFunc(ctx, t) }).Run()
} else {
result, err = stepFunc(ctx, t)
}
Expand All @@ -225,33 +225,33 @@ func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTas

if err != nil {
stepInstance.state = StepStateFailed
return nil, newStepError(ErrStepFailed, stepInstance, err)
return *new(S), newStepError(ErrStepFailed, stepInstance, err)
} else {
stepInstance.state = StepStateCompleted
return result, nil
}
}
}

func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t *T, s *S) (*R, error)) func(ctx context.Context, t *T, s *S) (*R, error) {
return func(ctx context.Context, t *T, s *S) (*R, error) {
func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t T, s S) (R, error)) func(ctx context.Context, t T, s S) (R, error) {
return func(ctx context.Context, t T, s S) (R, error) {

if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter (have precedent step, but not taking input from it)
asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on before we do any state change or error handling. */
return nil, err
return *new(R), err
}

stepInstance.executionData.StartTime = time.Now()
stepInstance.state = StepStateRunning
ctx = stepInstance.EnrichContext(ctx)

var result *R
var result R
var err error
if stepInstance.Definition.executionOptions.RetryPolicy != nil {
stepInstance.executionData.Retried = &RetryReport{}
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (*R, error) { return stepFunc(ctx, t, s) }).Run()
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (R, error) { return stepFunc(ctx, t, s) }).Run()
} else {
result, err = stepFunc(ctx, t, s)
}
Expand All @@ -260,7 +260,7 @@ func instrumentedStepAfterBoth[T, S, R any](stepInstance *StepInstance[R], prece

if err != nil {
stepInstance.state = StepStateFailed
return nil, newStepError(ErrStepFailed, stepInstance, err)
return *new(R), newStepError(ErrStepFailed, stepInstance, err)
} else {
stepInstance.state = StepStateCompleted
return result, nil
Expand Down Expand Up @@ -309,11 +309,12 @@ func getDependsOnStepInstances(stepD StepDefinitionMeta, ji JobInstanceMeta) ([]
}

// this is most vulunerable point of this library
// we have strongTyped steps
// we can create stronglyTyped stepInstance from stronglyTyped stepDefinition
// We cannot store strongTyped stepInstance and passing it to next step
// now we need this typeAssertion, to beable to link steps
// in theory, we have all the info, we construct the instance, if it panics, we should fix it.
//
// we have strongTyped steps
// we can create stronglyTyped stepInstance from stronglyTyped stepDefinition
// We cannot store strongTyped stepInstance and passing it to next step
// now we need this typeAssertion, to beable to link steps
// in theory, we have all the info, we construct the instance, if it panics, we should fix it.
func getStrongTypedStepInstance[T any](stepD *StepDefinition[T], ji JobInstanceMeta) *StepInstance[T] {
stepInstanceMeta, ok := ji.GetStepInstance(stepD.GetName())
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions step_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestDefinitionRendering(t *testing.T) {
func TestDefinitionBuilder(t *testing.T) {
t.Parallel()

job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob")
job := asyncjob.NewJobDefinition[*SqlSummaryJobLib]("sqlSummaryJob")
notExistingTask := &asyncjob.StepDefinition[any]{}

_, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.ExecuteAfter(notExistingTask), asyncjob.WithContextEnrichment(EnrichContext))
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestDefinitionBuilder(t *testing.T) {
_, err = asyncjob.StepAfterBoth(job, "Summarize1", query1Task, query1Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
assert.EqualError(t, err, "DuplicateInputParentStep: at least 2 input parentSteps are same")

query3Task := &asyncjob.StepDefinition[SqlQueryResult]{}
query3Task := &asyncjob.StepDefinition[*SqlQueryResult]{}
_, err = asyncjob.StepAfterBoth(job, "Summarize2", query1Task, query3Task, summarizeQueryResultStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
assert.EqualError(t, err, "RefStepNotInJob: trying to reference to step \"\", but it is not registered in job")

Expand Down
Loading

0 comments on commit 625ae9c

Please sign in to comment.