diff --git a/go.mod b/go.mod index d9ae4e2..7de798b 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,11 @@ module github.com/Azure/go-asyncjob go 1.18 require github.com/Azure/go-asynctask v1.3.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 40dea24..a55b73b 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,19 @@ github.com/Azure/go-asynctask v1.3.0 h1:QBx9mGbGi4Urz4YeZ3o1c7cLGL4iUch+mGgNGupTLMI= github.com/Azure/go-asynctask v1.3.0/go.mod h1:S1Ee5SVnt6ZUJ84brodPiHvoNfN2wgDyVO7UYTI5WeM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/graph/error.go b/graph/error.go index 3aae51a..cae5ce3 100644 --- a/graph/error.go +++ b/graph/error.go @@ -1,21 +1,28 @@ package graph -type GraphErrorCode string +type GraphCodeError string const ( - ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph" - ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph" + ErrDuplicateNode GraphCodeError = "node with same key already exists in this graph" + ErrConnectNotExistingNode GraphCodeError = "node to connect does not exist in this graph" ) -func (ge GraphErrorCode) Error() string { +func (ge GraphCodeError) Error() string { return string(ge) } type GraphError struct { - Code GraphErrorCode + Code GraphCodeError Message string } +func NewGraphError(code GraphCodeError, message string) *GraphError { + return &GraphError{ + Code: code, + Message: message, + } +} + func (ge *GraphError) Error() string { return ge.Code.Error() + ": " + ge.Message } diff --git a/graph/graph.go b/graph/graph.go index ac4a70c..d3be92b 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -2,6 +2,7 @@ package graph import ( "bytes" + "fmt" ) // NodeConstrain is a constraint for a node in a graph @@ -56,25 +57,26 @@ func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] { func (g *Graph[NT]) AddNode(n NT) error { nodeKey := n.DotSpec().ID if _, ok := g.nodes[nodeKey]; ok { - return ErrDuplicateNode + return NewGraphError(ErrDuplicateNode, fmt.Sprintf("node with key %s already exists in this graph", nodeKey)) } g.nodes[nodeKey] = n return nil } -func (g *Graph[NT]) Connect(from, to string) error { - var nodeFrom, nodeTo NT +func (g *Graph[NT]) Connect(from, to NT) error { + fromNodeKey := from.DotSpec().ID + toNodeKey := to.DotSpec().ID var ok bool - if nodeFrom, ok = g.nodes[from]; !ok { - return ErrConnectNotExistingNode + if from, ok = g.nodes[fromNodeKey]; !ok { + return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", fromNodeKey)) } - if nodeTo, ok = g.nodes[to]; !ok { - return ErrConnectNotExistingNode + if to, ok = g.nodes[toNodeKey]; !ok { + return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", toNodeKey)) } - g.nodeEdges[from] = append(g.nodeEdges[from], &Edge[NT]{From: nodeFrom, To: nodeTo}) + g.nodeEdges[fromNodeKey] = append(g.nodeEdges[fromNodeKey], &Edge[NT]{From: from, To: to}) return nil } diff --git a/graph/graph_test.go b/graph/graph_test.go index 03700b2..938743b 100644 --- a/graph/graph_test.go +++ b/graph/graph_test.go @@ -1,14 +1,16 @@ package graph_test import ( + "errors" "fmt" "testing" "github.com/Azure/go-asyncjob/graph" + "github.com/stretchr/testify/assert" ) -func TestSimpleJob(t *testing.T) { - g := graph.NewGraph[*testNode](edgeSpecFromConnection) +func TestSimpleGraph(t *testing.T) { + g := graph.NewGraph(edgeSpecFromConnection) root := &testNode{Name: "root"} g.AddNode(root) calc1 := &testNode{Name: "calc1"} @@ -18,16 +20,25 @@ func TestSimpleJob(t *testing.T) { summary := &testNode{Name: "summary"} g.AddNode(summary) - g.Connect(root.DotSpec().ID, calc1.DotSpec().ID) - g.Connect(root.DotSpec().ID, calc2.DotSpec().ID) - g.Connect(calc1.DotSpec().ID, summary.DotSpec().ID) - g.Connect(calc2.DotSpec().ID, summary.DotSpec().ID) + g.Connect(root, calc1) + g.Connect(root, calc2) + g.Connect(calc1, summary) + g.Connect(calc2, summary) - graph, err := g.ToDotGraph() + graphStr, err := g.ToDotGraph() if err != nil { - t.Fatal(err) + assert.NoError(t, err) } - fmt.Println(graph) + t.Log(graphStr) + + err = g.AddNode(calc1) + assert.Error(t, err) + assert.True(t, errors.Is(err, graph.ErrDuplicateNode)) + + calc3 := &testNode{Name: "calc3"} + err = g.Connect(root, calc3) + assert.Error(t, err) + assert.True(t, errors.Is(err, graph.ErrConnectNotExistingNode)) } type testNode struct { diff --git a/job.go b/job.go index 1eae6ba..b5d31d5 100644 --- a/job.go +++ b/job.go @@ -91,7 +91,7 @@ func (j *Job) AddStep(step StepMeta, precedingSteps ...StepMeta) { stepNode := newStepNode(step) j.stepsDag.AddNode(stepNode) for _, precedingStep := range precedingSteps { - j.stepsDag.Connect(precedingStep.getID(), step.getID()) + j.stepsDag.Connect(newStepNode(precedingStep), stepNode) } } diff --git a/job_test.go b/job_test.go index a2ce3a0..923605d 100644 --- a/job_test.go +++ b/job_test.go @@ -1,10 +1,13 @@ -package asyncjob +package asyncjob_test import ( "context" "fmt" "testing" "time" + + "github.com/Azure/go-asyncjob" + "github.com/stretchr/testify/assert" ) func TestSimpleJob(t *testing.T) { @@ -14,15 +17,18 @@ func TestSimpleJob(t *testing.T) { Query1: "query1", Table2: "table2", Query2: "query2", - RetryPolicies: map[string]RetryPolicy{}, + RetryPolicies: map[string]asyncjob.RetryPolicy{}, } jb := sb.BuildJob(context.Background()) jb.Start(context.Background()) - jb.Wait(context.Background()) + jobErr := jb.Wait(context.Background()) + if jobErr != nil { + assert.NoError(t, jobErr) + } - dotGraph, err := jb.Visualize() - if err != nil { + dotGraph, vizErr := jb.Visualize() + if vizErr != nil { t.FailNow() } fmt.Println(dotGraph) @@ -36,12 +42,16 @@ func TestSimpleJobError(t *testing.T) { Table2: "table2", Query2: "query2", ErrorInjection: map[string]func() error{"ExecuteQuery.query2": getErrorFunc(fmt.Errorf("table2 schema error"), 1)}, - RetryPolicies: map[string]RetryPolicy{}, + RetryPolicies: map[string]asyncjob.RetryPolicy{}, } jb := sb.BuildJob(context.Background()) jb.Start(context.Background()) jb.Wait(context.Background()) + jobErr := jb.Wait(context.Background()) + if jobErr != nil { + assert.Error(t, jobErr) + } dotGraph, err := jb.Visualize() if err != nil { @@ -63,7 +73,7 @@ func TestSimpleJobPanic(t *testing.T) { "GetConnection": getErrorFunc(fmt.Errorf("InternalServerError"), 1), "ExecuteQuery.panicQuery1": getPanicFunc(4), }, - RetryPolicies: map[string]RetryPolicy{ + RetryPolicies: map[string]asyncjob.RetryPolicy{ "CheckAuth": linearRetry, // coverage for AddStep "GetConnection": linearRetry, // coverage for StepAfter "QueryTable1": linearRetry, // coverage for StepAfterBoth @@ -72,8 +82,10 @@ func TestSimpleJobPanic(t *testing.T) { jb := sb.BuildJob(context.Background()) jb.Start(context.Background()) - err := jb.Wait(context.Background()) - fmt.Print(err) + jobErr := jb.Wait(context.Background()) + if jobErr != nil { + assert.Error(t, jobErr) + } dotGraph, err := jb.Visualize() if err != nil { diff --git a/job_builder.go b/step_builder.go similarity index 97% rename from job_builder.go rename to step_builder.go index 6a51c1c..aeb240d 100644 --- a/job_builder.go +++ b/step_builder.go @@ -73,14 +73,15 @@ func AddStep[T any](bCtx context.Context, j JobInterface, stepName string, stepF result, err = stepFunc(j.RuntimeContext()) } + step.executionData.Duration = time.Since(step.executionData.StartTime) + if err != nil { step.state = StepStateFailed + return nil, newStepError(stepName, err) } else { step.state = StepStateCompleted + return result, nil } - - step.executionData.Duration = time.Since(step.executionData.StartTime) - return result, newStepError(stepName, err) } step.task = asynctask.Start(bCtx, instrumentedFunc) @@ -136,14 +137,15 @@ func StepAfter[T, S any](bCtx context.Context, j JobInterface, stepName string, result, err = stepFunc(j.RuntimeContext(), t) } + step.executionData.Duration = time.Since(step.executionData.StartTime) + if err != nil { step.state = StepStateFailed + return nil, newStepError(stepName, err) } else { step.state = StepStateCompleted + return result, nil } - - step.executionData.Duration = time.Since(step.executionData.StartTime) - return result, newStepError(stepName, err) } step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc) @@ -203,14 +205,15 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j JobInterface, stepName s result, err = stepFunc(j.RuntimeContext(), t, s) } + step.executionData.Duration = time.Since(step.executionData.StartTime) + if err != nil { step.state = StepStateFailed + return nil, newStepError(stepName, err) } else { step.state = StepStateCompleted + return result, nil } - - step.executionData.Duration = time.Since(step.executionData.StartTime) - return result, newStepError(stepName, err) } step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc) diff --git a/util_test.go b/util_test.go index 1331f39..a263040 100644 --- a/util_test.go +++ b/util_test.go @@ -1,10 +1,11 @@ -package asyncjob +package asyncjob_test import ( "context" "fmt" "time" + "github.com/Azure/go-asyncjob" "github.com/Azure/go-asynctask" ) @@ -15,7 +16,7 @@ type SqlSummaryJobLib struct { Table2 string Query2 string ErrorInjection map[string]func() error - RetryPolicies map[string]RetryPolicy + RetryPolicies map[string]asyncjob.RetryPolicy } type SqlConnection struct { @@ -102,26 +103,26 @@ func (sql *SqlSummaryJobLib) EmailNotification(ctx context.Context) error { return nil } -func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *Job { - job := NewJob("sqlSummaryJob") +func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *asyncjob.Job { + job := asyncjob.NewJob("sqlSummaryJob") - serverNameParamTask := InputParam(bCtx, job, "serverName", &sql.ServerName) - connTsk, _ := StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, WithRetry(sql.RetryPolicies["GetConnection"])) + serverNameParamTask := asyncjob.InputParam(bCtx, job, "serverName", &sql.ServerName) + connTsk, _ := asyncjob.StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, asyncjob.WithRetry(sql.RetryPolicies["GetConnection"])) - checkAuthTask, _ := AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), WithRetry(sql.RetryPolicies["CheckAuth"])) + checkAuthTask, _ := asyncjob.AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), asyncjob.WithRetry(sql.RetryPolicies["CheckAuth"])) - table1ParamTsk := InputParam(bCtx, job, "table1", &sql.Table1) - table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient) - query1ParamTsk := InputParam(bCtx, job, "query1", &sql.Query1) - qery1ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable1"]), ExecuteAfter(checkAuthTask)) + table1ParamTsk := asyncjob.InputParam(bCtx, job, "table1", &sql.Table1) + table1ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient) + query1ParamTsk := asyncjob.InputParam(bCtx, job, "query1", &sql.Query1) + qery1ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask)) - table2ParamTsk := InputParam(bCtx, job, "table2", &sql.Table2) - table2ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient) - query2ParamTsk := InputParam(bCtx, job, "query2", &sql.Query2) - qery2ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable2"]), ExecuteAfter(checkAuthTask)) + table2ParamTsk := asyncjob.InputParam(bCtx, job, "table2", &sql.Table2) + table2ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient) + query2ParamTsk := asyncjob.InputParam(bCtx, job, "query2", &sql.Query2) + qery2ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask)) - summaryTsk, _ := StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult) - AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), ExecuteAfter(summaryTsk)) + summaryTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult) + asyncjob.AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), asyncjob.ExecuteAfter(summaryTsk)) return job } @@ -131,7 +132,7 @@ type linearRetryPolicy struct { tried int } -func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) RetryPolicy { +func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy { return &linearRetryPolicy{ sleepInterval: sleepInterval, maxRetryCount: maxRetryCount,