Skip to content

Commit

Permalink
drop context in builder, improve codeCoverage
Browse files Browse the repository at this point in the history
  • Loading branch information
haitch committed Dec 12, 2022
1 parent 3dd6cb1 commit bb40d6c
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 121 deletions.
40 changes: 35 additions & 5 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,46 @@ import (
type JobErrorCode string

const (
ErrPrecedentStepFailure JobErrorCode = "precedent step failed"
ErrStepFailed JobErrorCode = "step failed"
ErrRefStepNotInJob JobErrorCode = "trying to reference to a step not registered in job"
ErrAddStepInSealedJob JobErrorCode = "trying to add step to a sealed job definition"
ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed"
ErrStepFailed JobErrorCode = "StepFailed"

ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob"
MsgRefStepNotInJob string = "trying to reference to step %q, but it is not registered in job"

ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob"
MsgAddStepInSealedJob string = "trying to add step %q to a sealed job definition"

ErrAddExistingStep JobErrorCode = "AddExistingStep"
MsgAddExistingStep string = "trying to add step %q to job definition, but it already exists"

ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep"
MsgDuplicateInputParentStep string = "at least 2 input parentSteps are same"

ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound"
MsgRuntimeStepNotFound string = "runtime step %q not found, must be a bug in asyncjob"
)

func (code JobErrorCode) Error() string {
return string(code)
}

func (code JobErrorCode) WithMessage(msg string) *MessageError {
return &MessageError{Code: code, Message: msg}
}

type MessageError struct {
Code JobErrorCode
Message string
}

func (me *MessageError) Error() string {
return me.Code.Error() + ": " + me.Message
}

func (me *MessageError) Unwrap() error {
return me.Code
}

type JobError struct {
Code JobErrorCode
StepError error
Expand Down Expand Up @@ -48,7 +78,7 @@ func (je *JobError) RootCause() error {
}

// precendent step failure, track to the root
if je.Code == ErrPrecedentStepFailure {
if je.Code == ErrPrecedentStepFailed {
precedentStepErr := &JobError{}
if !errors.As(je.StepError, &precedentStepErr) {
return je.StepError
Expand Down
16 changes: 13 additions & 3 deletions job_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package asyncjob

import (
"context"
"errors"
"fmt"

"github.com/Azure/go-asyncjob/graph"
)
Expand All @@ -15,7 +17,7 @@ type JobDefinitionMeta interface {
Visualize() (string, error)

// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta)
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error
getRootStep() StepDefinitionMeta
}

Expand Down Expand Up @@ -87,12 +89,20 @@ func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)
}

// AddStep adds a step to the job definition, with optional preceding steps
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) {
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error {
jd.steps[step.GetName()] = step
jd.stepsDag.AddNode(step)
for _, precedingStep := range precedingSteps {
jd.stepsDag.Connect(precedingStep, step)
if err := jd.stepsDag.Connect(precedingStep, step); err != nil {
if errors.Is(err, graph.ErrConnectNotExistingNode) {
return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName()))
}

return err
}
}

return nil
}

// Visualize the job definition in graphviz dot format
Expand Down
84 changes: 43 additions & 41 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/Azure/go-asyncjob"
"github.com/Azure/go-asynctask"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -130,7 +129,11 @@ func TestJobPanic(t *testing.T) {

func TestJobStepRetry(t *testing.T) {
t.Parallel()
jd, err := BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3)})
jd, err := BuildJob(map[string]asyncjob.RetryPolicy{
"GetConnection": newLinearRetryPolicy(time.Millisecond*3, 3),
"QueryTable1": newLinearRetryPolicy(time.Millisecond*3, 3),
"Summarize": newLinearRetryPolicy(time.Millisecond*3, 3),
})
assert.NoError(t, err)

invalidStep := &asyncjob.StepDefinition[string]{}
Expand All @@ -141,7 +144,8 @@ func TestJobStepRetry(t *testing.T) {
assert.False(t, jd.Sealed())

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
ctx = context.WithValue(ctx, "error-injection.server1.table1.query1", fmt.Errorf("query exeeded memory limit"))

// gain code coverage on retry policy in StepAfter
jobInstance := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Expand All @@ -165,50 +169,48 @@ func TestJobStepRetry(t *testing.T) {
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "QueryTable1", jobErr.StepInstance.GetName())

exeData := jobErr.StepInstance.ExecutionData()
assert.Equal(t, exeData.Retried.Count, 3)

renderGraph(t, jobInstance)
}

func TestDefinitionBuilder(t *testing.T) {
t.Parallel()

renderGraph(t, SqlSummaryAsyncJobDefinition)

SqlSummaryAsyncJobDefinition.Seal()

_, err := asyncjob.AddStep(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "EmailNotification2", emailNotificationStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
assert.Error(t, err)

qery2ResultTskMeta, ok := SqlSummaryAsyncJobDefinition.GetStep("QueryTable2")
assert.True(t, ok)
query2Task, ok := qery2ResultTskMeta.(*asyncjob.StepDefinition[SqlQueryResult])
assert.True(t, ok)

dummyStepFunc := func(sql *SqlSummaryJobLib) asynctask.ContinueFunc[SqlQueryResult, any] {
return func(ctx context.Context, result *SqlQueryResult) (*any, error) {
return nil, nil
}
}

_, err = asyncjob.StepAfter(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "dummyStep", query2Task, dummyStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
// gain code coverage on retry policy in AddStep
jobInstance1 := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") },
},
},
})
err = jobInstance1.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName())

qery1ResultTskMeta, ok := SqlSummaryAsyncJobDefinition.GetStep("QueryTable1")
assert.True(t, ok)
query1Task, ok := qery1ResultTskMeta.(*asyncjob.StepDefinition[SqlQueryResult])
assert.True(t, ok)

advancedSummaryStepFunc := func(sql *SqlSummaryJobLib) asynctask.AfterBothFunc[SqlQueryResult, SqlQueryResult, any] {
return func(ctx context.Context, result1 *SqlQueryResult, result2 *SqlQueryResult) (*any, error) {
return nil, nil
}
}

_, err = asyncjob.StepAfterBoth(context.Background(), SqlSummaryAsyncJobDefinition.JobDefinition, "dummyStep", query1Task, query2Task, advancedSummaryStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
// gain code coverage on retry policy in AfterBoth
jobInstance2 := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
},
},
})
err = jobInstance2.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "Summarize", jobErr.StepInstance.GetName())
}

func renderGraph(t *testing.T, jb GraphRender) {
Expand Down
Loading

0 comments on commit bb40d6c

Please sign in to comment.