Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Haitao/fix_err_case #8

Merged
merged 3 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ module github.com/Azure/go-asyncjob
go 1.18

require github.com/Azure/go-asynctask v1.3.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
github.com/Azure/go-asynctask v1.3.0 h1:QBx9mGbGi4Urz4YeZ3o1c7cLGL4iUch+mGgNGupTLMI=
github.com/Azure/go-asynctask v1.3.0/go.mod h1:S1Ee5SVnt6ZUJ84brodPiHvoNfN2wgDyVO7UYTI5WeM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
17 changes: 12 additions & 5 deletions graph/error.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package graph

type GraphErrorCode string
type GraphCodeError string

const (
ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph"
ErrDuplicateNode GraphCodeError = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphCodeError = "node to connect does not exist in this graph"
)

func (ge GraphErrorCode) Error() string {
func (ge GraphCodeError) Error() string {
return string(ge)
}

type GraphError struct {
Code GraphErrorCode
Code GraphCodeError
Message string
}

func NewGraphError(code GraphCodeError, message string) *GraphError {
return &GraphError{
Code: code,
Message: message,
}
}

func (ge *GraphError) Error() string {
return ge.Code.Error() + ": " + ge.Message
}
Expand Down
18 changes: 10 additions & 8 deletions graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graph

import (
"bytes"
"fmt"
)

// NodeConstrain is a constraint for a node in a graph
Expand Down Expand Up @@ -56,25 +57,26 @@ func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] {
func (g *Graph[NT]) AddNode(n NT) error {
nodeKey := n.DotSpec().ID
if _, ok := g.nodes[nodeKey]; ok {
return ErrDuplicateNode
return NewGraphError(ErrDuplicateNode, fmt.Sprintf("node with key %s already exists in this graph", nodeKey))
}
g.nodes[nodeKey] = n

return nil
}

func (g *Graph[NT]) Connect(from, to string) error {
var nodeFrom, nodeTo NT
func (g *Graph[NT]) Connect(from, to NT) error {
fromNodeKey := from.DotSpec().ID
toNodeKey := to.DotSpec().ID
var ok bool
if nodeFrom, ok = g.nodes[from]; !ok {
return ErrConnectNotExistingNode
if from, ok = g.nodes[fromNodeKey]; !ok {
return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", fromNodeKey))
}

if nodeTo, ok = g.nodes[to]; !ok {
return ErrConnectNotExistingNode
if to, ok = g.nodes[toNodeKey]; !ok {
return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", toNodeKey))
}

g.nodeEdges[from] = append(g.nodeEdges[from], &Edge[NT]{From: nodeFrom, To: nodeTo})
g.nodeEdges[fromNodeKey] = append(g.nodeEdges[fromNodeKey], &Edge[NT]{From: from, To: to})
return nil
}

Expand Down
29 changes: 20 additions & 9 deletions graph/graph_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package graph_test

import (
"errors"
"fmt"
"testing"

"github.com/Azure/go-asyncjob/graph"
"github.com/stretchr/testify/assert"
)

func TestSimpleJob(t *testing.T) {
g := graph.NewGraph[*testNode](edgeSpecFromConnection)
func TestSimpleGraph(t *testing.T) {
g := graph.NewGraph(edgeSpecFromConnection)
root := &testNode{Name: "root"}
g.AddNode(root)
calc1 := &testNode{Name: "calc1"}
Expand All @@ -18,16 +20,25 @@ func TestSimpleJob(t *testing.T) {
summary := &testNode{Name: "summary"}
g.AddNode(summary)

g.Connect(root.DotSpec().ID, calc1.DotSpec().ID)
g.Connect(root.DotSpec().ID, calc2.DotSpec().ID)
g.Connect(calc1.DotSpec().ID, summary.DotSpec().ID)
g.Connect(calc2.DotSpec().ID, summary.DotSpec().ID)
g.Connect(root, calc1)
g.Connect(root, calc2)
g.Connect(calc1, summary)
g.Connect(calc2, summary)

graph, err := g.ToDotGraph()
graphStr, err := g.ToDotGraph()
if err != nil {
t.Fatal(err)
assert.NoError(t, err)
}
fmt.Println(graph)
t.Log(graphStr)

err = g.AddNode(calc1)
assert.Error(t, err)
assert.True(t, errors.Is(err, graph.ErrDuplicateNode))

calc3 := &testNode{Name: "calc3"}
err = g.Connect(root, calc3)
assert.Error(t, err)
assert.True(t, errors.Is(err, graph.ErrConnectNotExistingNode))
}

type testNode struct {
Expand Down
2 changes: 1 addition & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (j *Job) AddStep(step StepMeta, precedingSteps ...StepMeta) {
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
j.stepsDag.Connect(newStepNode(precedingStep), stepNode)
}
}

Expand Down
30 changes: 21 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package asyncjob
package asyncjob_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/Azure/go-asyncjob"
"github.com/stretchr/testify/assert"
)

func TestSimpleJob(t *testing.T) {
Expand All @@ -14,15 +17,18 @@ func TestSimpleJob(t *testing.T) {
Query1: "query1",
Table2: "table2",
Query2: "query2",
RetryPolicies: map[string]RetryPolicy{},
RetryPolicies: map[string]asyncjob.RetryPolicy{},
}
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
jb.Wait(context.Background())
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.NoError(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
dotGraph, vizErr := jb.Visualize()
if vizErr != nil {
t.FailNow()
}
fmt.Println(dotGraph)
Expand All @@ -36,12 +42,16 @@ func TestSimpleJobError(t *testing.T) {
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{"ExecuteQuery.query2": getErrorFunc(fmt.Errorf("table2 schema error"), 1)},
RetryPolicies: map[string]RetryPolicy{},
RetryPolicies: map[string]asyncjob.RetryPolicy{},
}
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
jb.Wait(context.Background())
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.Error(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
Expand All @@ -63,7 +73,7 @@ func TestSimpleJobPanic(t *testing.T) {
"GetConnection": getErrorFunc(fmt.Errorf("InternalServerError"), 1),
"ExecuteQuery.panicQuery1": getPanicFunc(4),
},
RetryPolicies: map[string]RetryPolicy{
RetryPolicies: map[string]asyncjob.RetryPolicy{
"CheckAuth": linearRetry, // coverage for AddStep
"GetConnection": linearRetry, // coverage for StepAfter
"QueryTable1": linearRetry, // coverage for StepAfterBoth
Expand All @@ -72,8 +82,10 @@ func TestSimpleJobPanic(t *testing.T) {
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
err := jb.Wait(context.Background())
fmt.Print(err)
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.Error(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
Expand Down
21 changes: 12 additions & 9 deletions job_builder.go → step_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ func AddStep[T any](bCtx context.Context, j JobInterface, stepName string, stepF
result, err = stepFunc(j.RuntimeContext())
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.Start(bCtx, instrumentedFunc)
Expand Down Expand Up @@ -136,14 +137,15 @@ func StepAfter[T, S any](bCtx context.Context, j JobInterface, stepName string,
result, err = stepFunc(j.RuntimeContext(), t)
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)
Expand Down Expand Up @@ -203,14 +205,15 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j JobInterface, stepName s
result, err = stepFunc(j.RuntimeContext(), t, s)
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)
Expand Down
37 changes: 19 additions & 18 deletions util_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package asyncjob
package asyncjob_test

import (
"context"
"fmt"
"time"

"github.com/Azure/go-asyncjob"
"github.com/Azure/go-asynctask"
)

Expand All @@ -15,7 +16,7 @@ type SqlSummaryJobLib struct {
Table2 string
Query2 string
ErrorInjection map[string]func() error
RetryPolicies map[string]RetryPolicy
RetryPolicies map[string]asyncjob.RetryPolicy
}

type SqlConnection struct {
Expand Down Expand Up @@ -102,26 +103,26 @@ func (sql *SqlSummaryJobLib) EmailNotification(ctx context.Context) error {
return nil
}

func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *Job {
job := NewJob("sqlSummaryJob")
func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *asyncjob.Job {
job := asyncjob.NewJob("sqlSummaryJob")

serverNameParamTask := InputParam(bCtx, job, "serverName", &sql.ServerName)
connTsk, _ := StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, WithRetry(sql.RetryPolicies["GetConnection"]))
serverNameParamTask := asyncjob.InputParam(bCtx, job, "serverName", &sql.ServerName)
connTsk, _ := asyncjob.StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, asyncjob.WithRetry(sql.RetryPolicies["GetConnection"]))

checkAuthTask, _ := AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), WithRetry(sql.RetryPolicies["CheckAuth"]))
checkAuthTask, _ := asyncjob.AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), asyncjob.WithRetry(sql.RetryPolicies["CheckAuth"]))

table1ParamTsk := InputParam(bCtx, job, "table1", &sql.Table1)
table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient)
query1ParamTsk := InputParam(bCtx, job, "query1", &sql.Query1)
qery1ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable1"]), ExecuteAfter(checkAuthTask))
table1ParamTsk := asyncjob.InputParam(bCtx, job, "table1", &sql.Table1)
table1ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient)
query1ParamTsk := asyncjob.InputParam(bCtx, job, "query1", &sql.Query1)
qery1ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask))

table2ParamTsk := InputParam(bCtx, job, "table2", &sql.Table2)
table2ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient)
query2ParamTsk := InputParam(bCtx, job, "query2", &sql.Query2)
qery2ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable2"]), ExecuteAfter(checkAuthTask))
table2ParamTsk := asyncjob.InputParam(bCtx, job, "table2", &sql.Table2)
table2ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient)
query2ParamTsk := asyncjob.InputParam(bCtx, job, "query2", &sql.Query2)
qery2ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask))

summaryTsk, _ := StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult)
AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), ExecuteAfter(summaryTsk))
summaryTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult)
asyncjob.AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), asyncjob.ExecuteAfter(summaryTsk))
return job
}

Expand All @@ -131,7 +132,7 @@ type linearRetryPolicy struct {
tried int
}

func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) RetryPolicy {
func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy {
return &linearRetryPolicy{
sleepInterval: sleepInterval,
maxRetryCount: maxRetryCount,
Expand Down