Skip to content

Commit

Permalink
Merge pull request #15 from Azure/haitao/panic_handling
Browse files Browse the repository at this point in the history
panic handling, seal a jobDefinition
  • Loading branch information
haitch committed Dec 11, 2022
2 parents f530737 + 42bf785 commit 870e93a
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 106 deletions.
3 changes: 2 additions & 1 deletion error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ type JobErrorCode string
const (
ErrPrecedentStepFailure JobErrorCode = "precedent step failed"
ErrStepFailed JobErrorCode = "step failed"
ErrStepNotInJob JobErrorCode = "trying to reference to a step not registered in job"
ErrRefStepNotInJob JobErrorCode = "trying to reference to a step not registered in job"
ErrAddStepInSealedJob JobErrorCode = "trying to add step to a sealed job definition"
)

func (code JobErrorCode) Error() string {
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
module github.com/Azure/go-asyncjob/v2
module github.com/Azure/go-asyncjob

go 1.18

require (
github.com/Azure/go-asyncjob/graph v0.2.0 // indirect
github.com/Azure/go-asynctask v1.3.1 // indirect
github.com/Azure/go-asyncjob/graph v0.2.0
github.com/Azure/go-asynctask v1.3.1
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 1 addition & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/Azure/go-asyncjob/graph v0.1.0 h1:qisFc4PtgaE2FDE41GRcbk2eASsR12OecJeD8qk6fkc=
github.com/Azure/go-asyncjob/graph v0.1.0/go.mod h1:3Z7w9aUBIrDriypH8O+hK0aeqKWKYuKSNxwrDxFy34s=
github.com/Azure/go-asyncjob/graph v0.2.0 h1:0GFnQit3+ZUxpc67ogusooa38GSFRPH2e1+h+L/33hc=
github.com/Azure/go-asyncjob/graph v0.2.0/go.mod h1:3Z7w9aUBIrDriypH8O+hK0aeqKWKYuKSNxwrDxFy34s=
github.com/Azure/go-asynctask v1.3.1 h1:zE/7Zwbdg7/+V2kRKb3IV4RTqmn8DUKriVzXcNq7ubg=
Expand All @@ -11,12 +9,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
26 changes: 22 additions & 4 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type JobDefinitionMeta interface {
GetName() string
GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
Seal()
Sealed() bool

// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta)
Expand All @@ -21,7 +23,9 @@ type JobDefinitionMeta interface {

// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
type JobDefinition[T any] struct {
Name string
name string

sealed bool
steps map[string]StepDefinitionMeta
stepsDag *graph.Graph[StepDefinitionMeta]
rootStep *StepDefinition[T]
Expand All @@ -31,7 +35,7 @@ type JobDefinition[T any] struct {
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
Name: name,
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition),
}
Expand All @@ -49,6 +53,9 @@ func NewJobDefinition[T any](name string) *JobDefinition[T] {
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}

ji := newJobInstance(jd, input, jobOptions...)
ji.start(ctx)
Expand All @@ -61,7 +68,18 @@ func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta {
}

func (jd *JobDefinition[T]) GetName() string {
return jd.Name
return jd.name
}

func (jd *JobDefinition[T]) Seal() {
if jd.sealed {
return
}
jd.sealed = true
}

func (jd *JobDefinition[T]) Sealed() bool {
return jd.sealed
}

// GetStep returns the stepDefinition by name
Expand Down Expand Up @@ -155,7 +173,7 @@ func (ji *JobInstance[T]) start(ctx context.Context) {
// construct job instance graph, with TopologySort ordering
orderedSteps := ji.Definition.stepsDag.TopologicalSort()
for _, stepDef := range orderedSteps {
if stepDef.GetName() == ji.Definition.Name {
if stepDef.GetName() == ji.Definition.GetName() {
continue
}
ji.steps[stepDef.GetName()] = stepDef.createStepInstance(ctx, ji)
Expand Down
2 changes: 1 addition & 1 deletion job_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type JobDefinitionWithResult[Tin, Tout any] struct {
func JobWithResult[Tin, Tout any](jd *JobDefinition[Tin], resultStep *StepDefinition[Tout]) (*JobDefinitionWithResult[Tin, Tout], error) {
sdGet, ok := jd.GetStep(resultStep.GetName())
if !ok || sdGet != resultStep {
return nil, ErrStepNotInJob
return nil, ErrRefStepNotInJob
}

return &JobDefinitionWithResult[Tin, Tout]{
Expand Down
8 changes: 1 addition & 7 deletions job_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,20 @@ import (
"context"
"testing"

"github.com/Azure/go-asyncjob/v2"
"github.com/stretchr/testify/assert"
)

func TestSimpleJobWithResult(t *testing.T) {
t.Parallel()

jd, err := BuildJobWithResult(context.Background(), map[string]asyncjob.RetryPolicy{})
assert.NoError(t, err)
renderGraph(t, jd)

jobInstance := jd.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLibAdvanced{
jobInstance := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server2",
Table1: "table3",
Query1: "query3",
Table2: "table4",
Query2: "query4",
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})
jobErr := jobInstance.Wait(context.Background())
assert.NoError(t, jobErr)
Expand Down
44 changes: 18 additions & 26 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,34 @@ import (
"testing"
"time"

"github.com/Azure/go-asyncjob/v2"
"github.com/Azure/go-asyncjob"
"github.com/stretchr/testify/assert"
)

func TestSimpleJob(t *testing.T) {
t.Parallel()

jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
assert.NoError(t, err)
renderGraph(t, jd)

jobInstance := jd.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLibAdvanced{
jobInstance := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})
jobErr := jobInstance.Wait(context.Background())
assert.NoError(t, jobErr)
renderGraph(t, jobInstance)

jobInstance2 := jd.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLibAdvanced{
jobInstance2 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server2",
Table1: "table3",
Query1: "query3",
Table2: "table4",
Query2: "query4",
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})
jobErr = jobInstance2.Wait(context.Background())
assert.NoError(t, jobErr)
Expand All @@ -50,23 +44,21 @@ func TestSimpleJob(t *testing.T) {
func TestJobError(t *testing.T) {
t.Parallel()

jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
assert.NoError(t, err)

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
ctx = context.WithValue(ctx, "error-injection.server1.table1", fmt.Errorf("table1 not exists"))
jobInstance := jd.Start(ctx, &SqlSummaryJobLibAdvanced{
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetTableClient.server1.table1": func() error { return fmt.Errorf("table1 not exists") },
},
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})

err = jobInstance.Wait(context.Background())
err := jobInstance.Wait(context.Background())
assert.Error(t, err)

jobErr := &asyncjob.JobError{}
Expand All @@ -77,30 +69,28 @@ func TestJobError(t *testing.T) {

func TestJobPanic(t *testing.T) {
t.Parallel()
jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
assert.NoError(t, err)

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
ctx = context.WithValue(ctx, "panic-injection.server1.table2", true)
jobInstance := jd.Start(ctx, &SqlSummaryJobLibAdvanced{
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
PanicInjection: map[string]bool{
"GetTableClient.server1.table2": true,
},
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})

err = jobInstance.Wait(context.Background())
err := jobInstance.Wait(context.Background())
assert.Error(t, err)

/* panic is out of reach of jobError, but planning to catch panic in the future
jobErr := &asyncjob.JobError{}
assert.True(t, errors.As(err, &jobErr))
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, jobErr.StepName, "getTableClient1")*/
assert.Equal(t, jobErr.StepInstance.GetName(), "GetTableClient2")
}

func TestJobStepRetry(t *testing.T) {
Expand All @@ -110,15 +100,17 @@ func TestJobStepRetry(t *testing.T) {

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
ctx = context.WithValue(ctx, "error-injection.server1.table1.query1", fmt.Errorf("query exeeded memory limit"))
jobInstance := jd.Start(ctx, &SqlSummaryJobLibAdvanced{
jobInstance := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"ExecuteQuery.server1.table1.query1": func() error { return fmt.Errorf("query exeeded memory limit") },
},
},
SqlSummaryJobLib: SqlSummaryJobLib{},
})

err = jobInstance.Wait(context.Background())
Expand Down
17 changes: 2 additions & 15 deletions retryer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package asyncjob

import (
"fmt"
"runtime/debug"
"time"
)

Expand All @@ -17,24 +15,13 @@ func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (
return &retryer[T]{retryPolicy: policy, retryReport: report, function: toRetry}
}

func (r *retryer[T]) funcWithPanicHandled() (result *T, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()
result, err = r.function()

return result, err
}

func (r retryer[T]) Run() (*T, error) {
t, err := r.funcWithPanicHandled()
t, err := r.function()
for err != nil {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry {
r.retryReport.Count++
time.Sleep(duration)
t, err = r.funcWithPanicHandled()
t, err = r.function()
} else {
break
}
Expand Down
2 changes: 1 addition & 1 deletion step.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (si *StepInstance[T]) GetState() StepState {
func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context) {
result = ctx
if si.Definition.executionOptions.ContextPolicy != nil {
// handle panic from user code
// TODO: bubble up the error somehow
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in EnrichContext", r)
Expand Down
Loading

0 comments on commit 870e93a

Please sign in to comment.