Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix retry interface with tried count #99

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestJobPanic(t *testing.T) {
assert.Equal(t, jobErr.StepInstance.GetName(), "GetTableClient2")
}

func TestJobStepRetry(t *testing.T) {
func TestJobStepRetryStepAfter(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
Expand Down Expand Up @@ -152,16 +152,55 @@ func TestJobStepRetry(t *testing.T) {

err = jobInstance.Wait(context.Background())
assert.Error(t, err)

jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, 3)
assert.Equal(t, exeData.Retried.Count, uint(3))

// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"ExecuteQuery.server1.table1.query1": func() error {
errorInjectCount++
if errorInjectCount == 3 { // no error on 3rd retry
return nil
}
return fmt.Errorf("query exeeded memory limit")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func TestJobStepRetryAddStep(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3),
"Summarize": newLinearRetryPolicy(time.Millisecond*3, 3),
})
assert.NoError(t, err)

invalidStep := &asyncjob.StepDefinition[string]{}
_, err = asyncjob.JobWithResult(jd, invalidStep)
assert.Error(t, err)

// newly created job definition should not be sealed
assert.False(t, jd.Sealed())

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)

// gain code coverage on retry policy in AddStep
jobInstance1 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Expand All @@ -171,30 +210,95 @@ func TestJobStepRetry(t *testing.T) {
"GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") },
},
}))
err = jobInstance1.Wait(context.Background())
err = jobInstance.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, uint(3))

// gain code coverage on retry policy in AfterBoth
// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
"GetConnection": func() error {
errorInjectCount++
if errorInjectCount == 3 { // no error on 3rd retry
return nil
}
return fmt.Errorf("dial 1.2.3.4 timedout")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func TestJobStepRetryAfterBoth(t *testing.T) {
t.Parallel()
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3),
"Summarize": newLinearRetryPolicy(time.Millisecond*3, 3),
})
assert.NoError(t, err)

invalidStep := &asyncjob.StepDefinition[string]{}
_, err = asyncjob.JobWithResult(jd, invalidStep)
assert.Error(t, err)
jobErr = &asyncjob.JobError{}

// newly created job definition should not be sealed
assert.False(t, jd.Sealed())

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)

// gain code coverage on retry policy in AfterBoth
jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
},
}))
err = jobInstance.Wait(context.Background())
assert.Error(t, err)
jobErr := &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "Summarize", jobErr.StepInstance.GetName())
exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, uint(3))

// recoverable error
errorInjectCount := 0
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error {
errorInjectCount++
if errorInjectCount == 3 { // no error on 3rd retry
return nil
}
return fmt.Errorf("result1 and result2 having different schema version, cannot merge.")
},
},
}))
err = jobInstance2.Wait(context.Background())
assert.NoError(t, err)
}

func renderGraph(t *testing.T, jb GraphRender) {
Expand Down
2 changes: 1 addition & 1 deletion retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() (
func (r retryer[T]) Run() (T, error) {
t, err := r.function()
for err != nil {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry {
if shouldRetry, duration := r.retryPolicy.ShouldRetry(err, r.retryReport.Count); shouldRetry {
r.retryReport.Count++
time.Sleep(duration)
t, err = r.function()
Expand Down
2 changes: 1 addition & 1 deletion step_exec_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type StepExecutionData struct {

// RetryReport would record the retry count (could extend to include each retry duration, ...)
type RetryReport struct {
Count int
Count uint
}
10 changes: 7 additions & 3 deletions step_exec_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ type StepExecutionOptions struct {
type StepErrorPolicy struct{}

type RetryPolicy interface {
ShouldRetry(error) (bool, time.Duration)
// ShouldRetry returns true if the error should be retried, and the duration to wait before retrying.
// The int parameter is the retry count, first execution fail will invoke this with 0.
ShouldRetry(error, uint) (bool, time.Duration)
}

// StepContextPolicy allows context enrichment before passing to step.
// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
//
// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

// Add precedence to a step.
// without taking input from it(use StepAfter/StepAfterBoth otherwise)
//
// without taking input from it(use StepAfter/StepAfterBoth otherwise)
func ExecuteAfter(step StepDefinitionMeta) ExecutionOptionPreparer {
return func(options *StepExecutionOptions) *StepExecutionOptions {
options.DependOn = append(options.DependOn, step.GetName())
Expand Down
30 changes: 19 additions & 11 deletions test_joblib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ func (sql *SqlSummaryJobLib) GetConnection(ctx context.Context, serverName *stri
sql.Logging(ctx, "GetConnection")
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection["GetConnection"]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SqlConnection{ServerName: *serverName}, nil
Expand All @@ -207,7 +209,9 @@ func (sql *SqlSummaryJobLib) GetTableClient(ctx context.Context, conn *SqlConnec
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SqlTableClient{ServerName: conn.ServerName, TableName: *tableName}, nil
Expand All @@ -223,7 +227,9 @@ func (sql *SqlSummaryJobLib) CheckAuth(ctx context.Context) error {
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return errFunc()
if err := errFunc(); err != nil {
return err
}
}
}
return nil
Expand All @@ -239,7 +245,9 @@ func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlT
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}

Expand All @@ -264,7 +272,9 @@ func (sql *SqlSummaryJobLib) SummarizeQueryResult(ctx context.Context, result1 *
}
if sql.Params.ErrorInjection != nil {
if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok {
return nil, errFunc()
if err := errFunc(); err != nil {
return nil, err
}
}
}
return &SummarizedResult{QueryResult1: result1.Data, QueryResult2: result2.Data}, nil
Expand Down Expand Up @@ -299,20 +309,18 @@ func EnrichContext(ctx context.Context, instanceMeta asyncjob.StepInstanceMeta)

type linearRetryPolicy struct {
sleepInterval time.Duration
maxRetryCount int
tried int
maxRetryCount uint
}

func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy {
func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount uint) asyncjob.RetryPolicy {
return &linearRetryPolicy{
sleepInterval: sleepInterval,
maxRetryCount: maxRetryCount,
}
}

func (lrp *linearRetryPolicy) ShouldRetry(error) (bool, time.Duration) {
if lrp.tried < lrp.maxRetryCount {
lrp.tried++
func (lrp *linearRetryPolicy) ShouldRetry(_ error, tried uint) (bool, time.Duration) {
if tried < lrp.maxRetryCount {
return true, lrp.sleepInterval
}

Expand Down
Loading