From be7378e35942360f6738c60e63afbc7a387e83f0 Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Sun, 23 Jun 2024 23:19:22 -0700 Subject: [PATCH] fix retry interface with tried count (#99) * fix retry interface with tried count * update comments * fix unittest --- job_test.go | 122 +++++++++++++++++++++++++++++++++++++++---- retryer.go | 2 +- step_exec_data.go | 2 +- step_exec_options.go | 10 ++-- test_joblib_test.go | 30 +++++++---- 5 files changed, 141 insertions(+), 25 deletions(-) diff --git a/job_test.go b/job_test.go index 252e728..d703b46 100644 --- a/job_test.go +++ b/job_test.go @@ -117,7 +117,7 @@ func TestJobPanic(t *testing.T) { assert.Equal(t, jobErr.StepInstance.GetName(), "GetTableClient2") } -func TestJobStepRetry(t *testing.T) { +func TestJobStepRetryStepAfter(t *testing.T) { t.Parallel() jd, err := BuildJob(map[string]asyncjob.RetryPolicy{ "GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3), @@ -152,16 +152,55 @@ func TestJobStepRetry(t *testing.T) { err = jobInstance.Wait(context.Background()) assert.Error(t, err) - jobErr := &asyncjob.JobError{} errors.As(err, &jobErr) assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName()) exeData := jobErr.StepInstance.ExecutionData() - assert.Equal(t, exeData.Retried.Count, 3) + assert.Equal(t, exeData.Retried.Count, uint(3)) + + // recoverable error + errorInjectCount := 0 + jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "ExecuteQuery.server1.table1.query1": func() error { + errorInjectCount++ + if errorInjectCount == 3 { // no error on 3rd retry + return nil + } + return fmt.Errorf("query exeeded memory limit") + }, + }, + })) + err = jobInstance2.Wait(context.Background()) + assert.NoError(t, err) +} + +func TestJobStepRetryAddStep(t *testing.T) { + t.Parallel() + jd, err 
:= BuildJob(map[string]asyncjob.RetryPolicy{ + "GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3), + "QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3), + "Summarize": newLinearRetryPolicy(time.Millisecond*3, 3), + }) + assert.NoError(t, err) + + invalidStep := &asyncjob.StepDefinition[string]{} + _, err = asyncjob.JobWithResult(jd, invalidStep) + assert.Error(t, err) + + // newly created job definition should not be sealed + assert.False(t, jd.Sealed()) + + ctx := context.WithValue(context.Background(), testLoggingContextKey, t) // gain code coverage on retry policy in AddStep - jobInstance1 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ + jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ ServerName: "server1", Table1: "table1", Query1: "query1", @@ -171,14 +210,17 @@ func TestJobStepRetry(t *testing.T) { "GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") }, }, })) - err = jobInstance1.Wait(context.Background()) + err = jobInstance.Wait(context.Background()) assert.Error(t, err) - jobErr = &asyncjob.JobError{} + jobErr := &asyncjob.JobError{} errors.As(err, &jobErr) assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName()) + exeData := jobErr.StepInstance.ExecutionData() + assert.Equal(t, exeData.Retried.Count, uint(3)) - // gain code coverage on retry policy in AfterBoth + // recoverable error + errorInjectCount := 0 jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ ServerName: "server1", Table1: "table1", @@ -186,15 +228,77 @@ func TestJobStepRetry(t *testing.T) { Table2: "table2", Query2: "query2", ErrorInjection: map[string]func() error{ - "SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") }, + "GetConnection": func() error { + errorInjectCount++ + if errorInjectCount == 3 { // no error on 3rd retry + return nil + } + return fmt.Errorf("dial 
1.2.3.4 timedout") + }, }, })) err = jobInstance2.Wait(context.Background()) + assert.NoError(t, err) +} + +func TestJobStepRetryAfterBoth(t *testing.T) { + t.Parallel() + jd, err := BuildJob(map[string]asyncjob.RetryPolicy{ + "GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3), + "QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3), + "Summarize": newLinearRetryPolicy(time.Millisecond*3, 3), + }) + assert.NoError(t, err) + + invalidStep := &asyncjob.StepDefinition[string]{} + _, err = asyncjob.JobWithResult(jd, invalidStep) assert.Error(t, err) - jobErr = &asyncjob.JobError{} + + // newly created job definition should not be sealed + assert.False(t, jd.Sealed()) + + ctx := context.WithValue(context.Background(), testLoggingContextKey, t) + + // gain code coverage on retry policy in AfterBoth + jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") }, + }, + })) + err = jobInstance.Wait(context.Background()) + assert.Error(t, err) + jobErr := &asyncjob.JobError{} errors.As(err, &jobErr) assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed) assert.Equal(t, "Summarize", jobErr.StepInstance.GetName()) + exeData := jobErr.StepInstance.ExecutionData() + assert.Equal(t, exeData.Retried.Count, uint(3)) + + // recoverable error + errorInjectCount := 0 + jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{ + ServerName: "server1", + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + ErrorInjection: map[string]func() error{ + "SummarizeQueryResult": func() error { + errorInjectCount++ + if errorInjectCount == 3 { // no error on 3rd retry + return nil + } + return fmt.Errorf("result1 and result2 having different schema version, 
cannot merge.") + }, + }, + })) + err = jobInstance2.Wait(context.Background()) + assert.NoError(t, err) } func renderGraph(t *testing.T, jb GraphRender) { diff --git a/retryer.go b/retryer.go index 981b008..84a35dc 100644 --- a/retryer.go +++ b/retryer.go @@ -18,7 +18,7 @@ func newRetryer[T any](policy RetryPolicy, report *RetryReport, toRetry func() ( func (r retryer[T]) Run() (T, error) { t, err := r.function() for err != nil { - if shouldRetry, duration := r.retryPolicy.ShouldRetry(err); shouldRetry { + if shouldRetry, duration := r.retryPolicy.ShouldRetry(err, r.retryReport.Count); shouldRetry { r.retryReport.Count++ time.Sleep(duration) t, err = r.function() diff --git a/step_exec_data.go b/step_exec_data.go index 5074942..68f5358 100644 --- a/step_exec_data.go +++ b/step_exec_data.go @@ -13,5 +13,5 @@ type StepExecutionData struct { // RetryReport would record the retry count (could extend to include each retry duration, ...) type RetryReport struct { - Count int + Count uint } diff --git a/step_exec_options.go b/step_exec_options.go index d54a031..ddd7a94 100644 --- a/step_exec_options.go +++ b/step_exec_options.go @@ -17,17 +17,21 @@ type StepExecutionOptions struct { type StepErrorPolicy struct{} type RetryPolicy interface { - ShouldRetry(error) (bool, time.Duration) + // ShouldRetry returns true if the error should be retried, and the duration to wait before retrying. + // The uint parameter is the number of retries already attempted; the first failed execution invokes this with 0. + ShouldRetry(error, uint) (bool, time.Duration) } // StepContextPolicy allows context enrichment before passing to step. -// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition. +// +// With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions // Add precedence to a step. -// without taking input from it(use StepAfter/StepAfterBoth otherwise) +// +// without taking input from it(use StepAfter/StepAfterBoth otherwise) func ExecuteAfter(step StepDefinitionMeta) ExecutionOptionPreparer { return func(options *StepExecutionOptions) *StepExecutionOptions { options.DependOn = append(options.DependOn, step.GetName()) diff --git a/test_joblib_test.go b/test_joblib_test.go index 57ba566..15cbac9 100644 --- a/test_joblib_test.go +++ b/test_joblib_test.go @@ -191,7 +191,9 @@ func (sql *SqlSummaryJobLib) GetConnection(ctx context.Context, serverName *stri sql.Logging(ctx, "GetConnection") if sql.Params.ErrorInjection != nil { if errFunc, ok := sql.Params.ErrorInjection["GetConnection"]; ok { - return nil, errFunc() + if err := errFunc(); err != nil { + return nil, err + } } } return &SqlConnection{ServerName: *serverName}, nil @@ -207,7 +209,9 @@ func (sql *SqlSummaryJobLib) GetTableClient(ctx context.Context, conn *SqlConnec } if sql.Params.ErrorInjection != nil { if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok { - return nil, errFunc() + if err := errFunc(); err != nil { + return nil, err + } } } return &SqlTableClient{ServerName: conn.ServerName, TableName: *tableName}, nil @@ -223,7 +227,9 @@ func (sql *SqlSummaryJobLib) CheckAuth(ctx context.Context) error { } if sql.Params.ErrorInjection != nil { if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok { - return errFunc() + if err := errFunc(); err != nil { + return err + } } } return nil @@ -239,7 +245,9 @@ func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlT } if sql.Params.ErrorInjection != nil { if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok { - return nil, errFunc() + if err := errFunc(); err != nil { + return nil, err + } } } @@ -264,7 
+272,9 @@ func (sql *SqlSummaryJobLib) SummarizeQueryResult(ctx context.Context, result1 * } if sql.Params.ErrorInjection != nil { if errFunc, ok := sql.Params.ErrorInjection[injectionKey]; ok { - return nil, errFunc() + if err := errFunc(); err != nil { + return nil, err + } } } return &SummarizedResult{QueryResult1: result1.Data, QueryResult2: result2.Data}, nil @@ -299,20 +309,18 @@ func EnrichContext(ctx context.Context, instanceMeta asyncjob.StepInstanceMeta) type linearRetryPolicy struct { sleepInterval time.Duration - maxRetryCount int - tried int + maxRetryCount uint } -func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy { +func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount uint) asyncjob.RetryPolicy { return &linearRetryPolicy{ sleepInterval: sleepInterval, maxRetryCount: maxRetryCount, } } -func (lrp *linearRetryPolicy) ShouldRetry(error) (bool, time.Duration) { - if lrp.tried < lrp.maxRetryCount { - lrp.tried++ +func (lrp *linearRetryPolicy) ShouldRetry(_ error, tried uint) (bool, time.Duration) { + if tried < lrp.maxRetryCount { return true, lrp.sleepInterval }