Skip to content

Commit

Permalink
better panic handling
Browse files Browse the repository at this point in the history
  • Loading branch information
haitch committed Dec 11, 2022
1 parent 9d967f9 commit 42bf785
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 21 deletions.
3 changes: 1 addition & 2 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ func TestJobPanic(t *testing.T) {
err := jobInstance.Wait(context.Background())
assert.Error(t, err)

/* panic is out of reach of jobError, but planning to catch panic in the future
jobErr := &asyncjob.JobError{}
assert.True(t, errors.As(err, &jobErr))
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, jobErr.StepName, "getTableClient1")*/
assert.Equal(t, jobErr.StepInstance.GetName(), "GetTableClient2")
}

func TestJobStepRetry(t *testing.T) {
Expand Down
17 changes: 2 additions & 15 deletions retryer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package asyncjob

import (
"fmt"
"runtime/debug"
"time"
)

Expand All @@ -17,24 +15,13 @@ func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (
return &retryer[T]{retryPolicy: policy, retryReport: report, function: toRetry}
}

func (r *retryer[T]) funcWithPanicHandled() (result *T, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()
result, err = r.function()

return result, err
}

func (r retryer[T]) Run() (*T, error) {
t, err := r.funcWithPanicHandled()
t, err := r.function()
for err != nil {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry {
r.retryReport.Count++
time.Sleep(duration)
t, err = r.funcWithPanicHandled()
t, err = r.function()
} else {
break
}
Expand Down
2 changes: 1 addition & 1 deletion step.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (si *StepInstance[T]) GetState() StepState {
func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context) {
result = ctx
if si.Definition.executionOptions.ContextPolicy != nil {
// handle panic from user code
// TODO: bubble up the error somehow
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in EnrichContext", r)
Expand Down
45 changes: 42 additions & 3 deletions step_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package asyncjob
import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/Azure/go-asynctask"
Expand Down Expand Up @@ -31,8 +32,21 @@ func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName st
precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji)

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context) (result *ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

result, err = stepFunc(ctx)
return result, err
}

stepInstance := newStepInstance[ST](stepD, ji)
stepInstance.task = asynctask.Start(ctx, instrumentedAddStep(stepInstance, precedingTasks, stepFuncCreator(jiStrongTyped.input)))
stepInstance.task = asynctask.Start(ctx, instrumentedAddStep(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
return stepInstance
}
Expand Down Expand Up @@ -63,9 +77,22 @@ func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepN
precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji)

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepAfterFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context, pt *PT) (result *ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

result, err = stepFunc(ctx, pt)
return result, err
}

parentStepInstance := getStrongTypedStepInstance(parentStep, ji)
stepInstance := newStepInstance[ST](stepD, ji)
stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepAfterFuncCreator(jiStrongTyped.input)))
stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
return stepInstance
}
Expand Down Expand Up @@ -105,10 +132,22 @@ func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[
precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji)

jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepAfterBothFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context, pt1 *PT1, pt2 *PT2) (result *ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

result, err = stepFunc(ctx, pt1, pt2)
return result, err
}
parentStepInstance1 := getStrongTypedStepInstance(parentStep1, ji)
parentStepInstance2 := getStrongTypedStepInstance(parentStep2, ji)
stepInstance := newStepInstance[ST](stepD, ji)
stepInstance.task = asynctask.AfterBoth(ctx, parentStepInstance1.task, parentStepInstance2.task, instrumentedStepAfterBoth(stepInstance, precedingTasks, stepAfterBothFuncCreator(jiStrongTyped.input)))
stepInstance.task = asynctask.AfterBoth(ctx, parentStepInstance1.task, parentStepInstance2.task, instrumentedStepAfterBoth(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
return stepInstance
}
Expand Down

0 comments on commit 42bf785

Please sign in to comment.