Skip to content

Commit

Permalink
fix retry interface with tried count (#99)
Browse files Browse the repository at this point in the history
* fix retry interface with tried count

* update comments

* fix unittest
  • Loading branch information
haitch committed Jun 24, 2024
1 parent a8f14d5 commit be7378e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 25 deletions.
122 changes: 113 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestJobPanic(t *testing.T) {
assert.Equal(t, jobErr.StepInstance.GetName(), "GetTableClient2")
}

func TestJobStepRetry(t *testing.T) {
func TestJobStepRetryStepAfter(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
Expand Down Expand Up @@ -152,16 +152,55 @@ func TestJobStepRetry(t *testing.T) {

err = jobInstance.Wait(context.Background())
assert.Error(t, err)

jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, 3)
assert.Equal(t, exeData.Retried.Count, uint(3))

// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"ExecuteQuery.server1.table1.query1": func() error {
errorInjectCount++
if errorInjectCount == 3 { // succeed on the 3rd attempt (i.e. the 2nd retry)
return nil
}
return fmt.Errorf("query exeeded memory limit")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func TestJobStepRetryAddStep(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3),
"Summarize": newLinearRetryPolicy(time.Millisecond*3, 3),
})
assert.NoError(t, err)

invalidStep := &asyncjob.StepDefinition[string]{}
_, err = asyncjob.JobWithResult(jd, invalidStep)
assert.Error(t, err)

// newly created job definition should not be sealed
assert.False(t, jd.Sealed())

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)

// gain code coverage on retry policy in AddStep
jobInstance1 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Expand All @@ -171,30 +210,95 @@ func TestJobStepRetry(t *testing.T) {
"GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") },
},
}))
err = jobInstance1.Wait(context.Background())
err = jobInstance.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, uint(3))

// gain code coverage on retry policy in AfterBoth
// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
"GetConnection": func() error {
errorInjectCount++
if errorInjectCount == 3 { // succeed on the 3rd attempt (i.e. the 2nd retry)
return nil
}
return fmt.Errorf("dial 1.2.3.4 timedout")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func TestJobStepRetryAfterBoth(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3),
"Summarize": newLinearRetryPolicy(time.Millisecond*3, 3),
})
assert.NoError(t, err)

invalidStep := &asyncjob.StepDefinition[string]{}
_, err = asyncjob.JobWithResult(jd, invalidStep)
assert.Error(t, err)
jobErr = &asyncjob.JobError{}

// newly created job definition should not be sealed
assert.False(t, jd.Sealed())

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)

// gain code coverage on retry policy in AfterBoth
jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
},
}))
err = jobInstance.Wait(context.Background())
assert.Error(t, err)
jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "Summarize", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, uint(3))

// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error {
errorInjectCount++
if errorInjectCount == 3 { // succeed on the 3rd attempt (i.e. the 2nd retry)
return nil
}
return fmt.Errorf("result1 and result2 having different schema version, cannot merge.")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func renderGraph(t *testing.T, jb GraphRender) {
Expand Down
2 changes: 1 addition & 1 deletion retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (
func (r retryer[T]) Run() (T, error) {
t, err := r.function()
for err != nil {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err, r.retryReport.Count); shouldRetry {
r.retryReport.Count++
time.Sleep(duration)
t, err = r.function()
Expand Down
2 changes: 1 addition & 1 deletion step_exec_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type StepExecutionData struct {

// RetryReport records the retry count (could be extended to include each retry duration, ...)
type RetryReport struct {
Count int
Count uint
}
10 changes: 7 additions & 3 deletions step_exec_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ type StepExecutionOptions struct {
type StepErrorPolicy struct{}

type RetryPolicy interface {
ShouldRetry(error) (bool, time.Duration)
// ShouldRetry returns true if the error should be retried, and the duration to wait before retrying.
// The uint parameter is the number of retries already performed; the first failed execution invokes this with 0.
ShouldRetry(error, uint) (bool, time.Duration)
}

// StepContextPolicy allows context enrichment before passing to step.
// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
//
// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

// Add precedence to a step.
// without taking input from it(use StepAfter/StepAfterBoth otherwise)
//
// without taking input from it(use StepAfter/StepAfterBoth otherwise)
func ExecuteAfter(step StepDefinitionMeta) ExecutionOptionPreparer {
return func(options *StepExecutionOptions) *StepExecutionOptions {
options.DependOn = append(options.DependOn, step.GetName())
Expand Down
30 changes: 19 additions & 11 deletions test_joblib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ func (sql *SqlSummaryJobLib) GetConnection(ctx context.Context, serverName *stri
sql.Logging(ctx, "GetConnection")
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection["GetConnection"]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SqlConnection{ServerName: *serverName}, nil
Expand All @@ -207,7 +209,9 @@ func (sql *SqlSummaryJobLib) GetTableClient(ctx context.Context, conn *SqlConnec
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SqlTableClient{ServerName: conn.ServerName, TableName: *tableName}, nil
Expand All @@ -223,7 +227,9 @@ func (sql *SqlSummaryJobLib) CheckAuth(ctx context.Context) error {
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return errFunc()
if err := errFunc(); err != nil {
return err
}
}
}
return nil
Expand All @@ -239,7 +245,9 @@ func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlT
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}

Expand All @@ -264,7 +272,9 @@ func (sql *SqlSummaryJobLib) SummarizeQueryResult(ctx context.Context, result1 *
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SummarizedResult{QueryResult1: result1.Data, QueryResult2: result2.Data}, nil
Expand Down Expand Up @@ -299,20 +309,18 @@ func EnrichContext(ctx context.Context, instanceMeta asyncjob.StepInstanceMeta)

type linearRetryPolicy struct {
sleepInterval time.Duration
maxRetryCount int
tried int
maxRetryCount uint
}

func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy {
func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount uint) asyncjob.RetryPolicy {
return &linearRetryPolicy{
sleepInterval: sleepInterval,
maxRetryCount: maxRetryCount,
}
}

func (lrp *linearRetryPolicy) ShouldRetry(error) (bool, time.Duration) {
if lrp.tried < lrp.maxRetryCount {
lrp.tried++
func (lrp *linearRetryPolicy) ShouldRetry(_ error, tried uint) (bool, time.Duration) {
if tried < lrp.maxRetryCount {
return true, lrp.sleepInterval
}

Expand Down

0 comments on commit be7378e

Please sign in to comment.