From 8fc43ff18f70ee3daa982285f1bb5a3b3b25ff99 Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Fri, 28 Oct 2022 01:45:54 -0700 Subject: [PATCH 1/8] resource for ref --- graph/template.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/graph/template.go b/graph/template.go index 561bbeb..9ba769e 100644 --- a/graph/template.go +++ b/graph/template.go @@ -4,6 +4,9 @@ import ( "text/template" ) +// https://www.graphviz.org/docs/ +// http://magjac.com/graphviz-visual-editor/ + var digraphTemplate = template.Must(template.New("digraph").Parse(digraphTemplateText)) const digraphTemplateText = `digraph { @@ -13,3 +16,11 @@ const digraphTemplateText = `digraph { {{ range $from, $toList := $}}{{ range $_, $to := $toList}} "{{$from}}" -> "{{$to}}" {{ end }}{{ end }} } }` + +const digraphTemplateTextV2 = `digraph { + compound = "true" + newrank = "true" + subgraph "root" { +{{ range $node := $.Nodes}} "{{$node.Name}}" [label="{{$node.Name}}", shape="{{node.Shape}}"] +{{ end }}{{ end }} } +}` From 701c5b6278399784676771c7dd81b97ccce2da0a Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Sun, 30 Oct 2022 11:38:05 -0700 Subject: [PATCH 2/8] digraph with rich info --- graph/graph.go | 14 +++++++++++--- graph/template.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/graph/graph.go b/graph/graph.go index 70e57aa..eb93df9 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -17,7 +17,7 @@ func (ge GraphErrorCode) Error() string { type NodeConstrain interface { // Name of the node, used as key in the graph, so should be unique. - GetName() string + DotSpec() DotNodeSpec } type Edge[NT NodeConstrain] struct { @@ -25,6 +25,14 @@ type Edge[NT NodeConstrain] struct { To NT } +type DotNodeSpec struct { + ID string + Name string + Tooltip string + Shape string + Style string +} + type Graph[NT NodeConstrain] struct { nodes map[string]NT nodeEdges map[string][]*Edge[NT] @@ -38,7 +46,7 @@ func NewGraph[NT NodeConstrain]() *Graph[NT] { } func (g *Graph[NT]) AddNode(n NT) error { - nodeKey := n.GetName() + nodeKey := n.DotSpec().ID if _, ok := g.nodes[nodeKey]; ok { return ErrDuplicateNode } @@ -67,7 +75,7 @@ func (g *Graph[NT]) ToDotGraph() (string, error) { edges := make(map[string][]string) for _, nodeEdges := range g.nodeEdges { for _, edge := range nodeEdges { - edges[edge.From.GetName()] = append(edges[edge.From.GetName()], edge.To.GetName()) + edges[edge.From.DotSpec().ID] = append(edges[edge.From.DotSpec().ID], edge.To.DotSpec().ID) } } diff --git a/graph/template.go b/graph/template.go index 9ba769e..60ff44b 100644 --- a/graph/template.go +++ b/graph/template.go @@ -24,3 +24,34 @@ const digraphTemplateTextV2 = `digraph { {{ range $node := $.Nodes}} "{{$node.Name}}" [label="{{$node.Name}}", shape="{{node.Shape}}"] {{ end }}{{ end }} } }` + +/* ideal output +digraph G { + jobroot [shape=triangle style=filled fillcolor=gold tooltip="State: Failed\nDuration:5s"] + param_servername [label="servername" shape=doublecircle style=filled fillcolor=green tooltip="Value: dummy.server.io"] + param_table1 [label="table1" shape=doublecircle style=filled fillcolor=green tooltip="Value: table1"] + param_query1 [label="query1" shape=doublecircle style=filled fillcolor=green tooltip="Value: select * from table1"] + param_table2 [label="table2" shape=doublecircle style=filled fillcolor=green tooltip="Value: table2"] + param_query2 [label="query2" shape=doublecircle style=filled fillcolor=green tooltip="Value: select * from table2"] + jobroot -> param_servername [tooltip="time:2022-10-28T21:16:07Z"] + param_servername -> func_getConnection + func_getConnection [label="getConnection" shape=ellipse style=filled fillcolor=green tooltip="State: Finished\nDuration:1s"] + func_query1 [label="query1" shape=ellipse style=filled fillcolor=green tooltip="State: Finished\nDuration:2s"] + func_query2 [label="query2" shape=ellipse style=filled fillcolor=red tooltip="State: Failed\nDuration:2s"] + jobroot -> param_table1 [style=bold color=green tooltip="time:2022-10-28T21:16:07Z"] + param_table1 -> func_query1 [tooltip="time:2022-10-28T21:16:07Z"] + jobroot -> param_query1 [tooltip="time:2022-10-28T21:16:07Z"] + param_query1 -> func_query1 + jobroot -> param_table2 [tooltip="time:2022-10-28T21:16:07Z"] + param_table2 -> func_query2 + jobroot -> param_query2 [tooltip="time:2022-10-28T21:16:07Z"] + param_query2 -> func_query2 + func_getConnection -> func_query1 + func_query1 -> func_summarize + func_getConnection -> func_query2 + func_query2 -> func_summarize [color=red] + func_summarize [label="summarize" shape=ellipse style=filled fillcolor=red tooltip="State: Blocked"] + func_email [label="email" shape=ellipse style=filled tooltip="State: Pending"] + func_summarize -> func_email [style=dotted] +} +*/ From 80cea5665240bbe11eba92c257debdfca4e80d1e Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Mon, 31 Oct 2022 01:45:05 -0700 Subject: [PATCH 3/8] graph pkg working --- graph/graph.go | 54 +++++++++++++++++++++++++++++++------------ graph/graph_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++ graph/template.go | 15 ++++-------- 3 files changed, 100 insertions(+), 25 deletions(-) create mode 100644 graph/graph_test.go diff --git a/graph/graph.go b/graph/graph.go index eb93df9..0088052 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -15,33 +15,47 @@ func (ge GraphErrorCode) Error() string { return string(ge) } +// NodeConstrain is a constraint for a node in a graph type NodeConstrain interface { - // Name of the node, used as key in the graph, so should be unique. - DotSpec() DotNodeSpec + DotSpec() *DotNodeSpec } +// EdgeSpecFunc is a function that returns the DOT specification for an edge. +type EdgeSpecFunc[T NodeConstrain] func(from, to T) *DotEdgeSpec + type Edge[NT NodeConstrain] struct { From NT To NT } type DotNodeSpec struct { - ID string - Name string - Tooltip string - Shape string - Style string + ID string + Name string + Tooltip string + Shape string + Style string + FillColor string +} + +type DotEdgeSpec struct { + FromNodeID string + ToNodeID string + Tooltip string + Style string + Color string } type Graph[NT NodeConstrain] struct { - nodes map[string]NT - nodeEdges map[string][]*Edge[NT] + nodes map[string]NT + nodeEdges map[string][]*Edge[NT] + edgeSpecFunc EdgeSpecFunc[NT] } -func NewGraph[NT NodeConstrain]() *Graph[NT] { +func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] { return &Graph[NT]{ - nodes: make(map[string]NT), - nodeEdges: make(map[string][]*Edge[NT]), + nodes: make(map[string]NT), + nodeEdges: make(map[string][]*Edge[NT]), + edgeSpecFunc: edgeSpecFunc, } } @@ -72,17 +86,27 @@ func (g *Graph[NT]) Connect(from, to string) error { // https://en.wikipedia.org/wiki/DOT_(graph_description_language) func (g *Graph[NT]) ToDotGraph() (string, error) { - edges := make(map[string][]string) + nodes := make([]*DotNodeSpec, 0) + for _, node := range g.nodes { + nodes = append(nodes, node.DotSpec()) + } + + edges := make([]*DotEdgeSpec, 0) for _, nodeEdges := range g.nodeEdges { for _, edge := range nodeEdges { - edges[edge.From.DotSpec().ID] = append(edges[edge.From.DotSpec().ID], edge.To.DotSpec().ID) + edges = append(edges, g.edgeSpecFunc(edge.From, edge.To)) } } buf := new(bytes.Buffer) - err := digraphTemplate.Execute(buf, edges) + err := digraphTemplate.Execute(buf, templateRef{Nodes: nodes, Edges: edges}) if err != nil { return "", err } return buf.String(), nil } + +type templateRef struct { + Nodes []*DotNodeSpec + Edges []*DotEdgeSpec +} diff --git a/graph/graph_test.go b/graph/graph_test.go new file mode 100644 index 0000000..03700b2 --- /dev/null +++ b/graph/graph_test.go @@ -0,0 +1,56 @@ +package graph_test + +import ( + "fmt" + "testing" + + "github.com/Azure/go-asyncjob/graph" +) + +func TestSimpleJob(t *testing.T) { + g := graph.NewGraph[*testNode](edgeSpecFromConnection) + root := &testNode{Name: "root"} + g.AddNode(root) + calc1 := &testNode{Name: "calc1"} + g.AddNode(calc1) + calc2 := &testNode{Name: "calc2"} + g.AddNode(calc2) + summary := &testNode{Name: "summary"} + g.AddNode(summary) + + g.Connect(root.DotSpec().ID, calc1.DotSpec().ID) + g.Connect(root.DotSpec().ID, calc2.DotSpec().ID) + g.Connect(calc1.DotSpec().ID, summary.DotSpec().ID) + g.Connect(calc2.DotSpec().ID, summary.DotSpec().ID) + + graph, err := g.ToDotGraph() + if err != nil { + t.Fatal(err) + } + fmt.Println(graph) +} + +type testNode struct { + Name string +} + +func (tn *testNode) DotSpec() *graph.DotNodeSpec { + return &graph.DotNodeSpec{ + ID: tn.Name, + Name: tn.Name, + Tooltip: tn.Name, + Shape: "box", + Style: "filled", + FillColor: "green", + } +} + +func edgeSpecFromConnection(from, to *testNode) *graph.DotEdgeSpec { + return &graph.DotEdgeSpec{ + FromNodeID: from.DotSpec().ID, + ToNodeID: to.DotSpec().ID, + Tooltip: fmt.Sprintf("%s -> %s", from.DotSpec().Name, to.DotSpec().Name), + Style: "solid", + Color: "black", + } +} diff --git a/graph/template.go b/graph/template.go index 60ff44b..6d0f672 100644 --- a/graph/template.go +++ b/graph/template.go @@ -13,16 +13,11 @@ const digraphTemplateText = `digraph { compound = "true" newrank = "true" subgraph "root" { -{{ range $from, $toList := $}}{{ range $_, $to := $toList}} "{{$from}}" -> "{{$to}}" -{{ end }}{{ end }} } -}` - -const digraphTemplateTextV2 = `digraph { - compound = "true" - newrank = "true" - subgraph "root" { -{{ range $node := $.Nodes}} "{{$node.Name}}" [label="{{$node.Name}}", shape="{{node.Shape}}"] -{{ end }}{{ end }} } +{{ range $node := $.Nodes}} {{$node.ID}} [label="{{$node.Name}}" shape={{$node.Shape}} style={{$node.Style}} tooltip={{$node.Tooltip}} fillcolor={{$node.FillColor}}] +{{ end }} +{{ range $edge := $.Edges}} {{$edge.FromNodeID}} -> {{$edge.ToNodeID}} [style={{$edge.Style}} tooltip="{{$edge.Tooltip}}" color={{$edge.Color}}] +{{ end }} + } }` /* ideal output From 53279e69cb2a76bbfbf420f295c239d41b4d8876 Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Mon, 31 Oct 2022 15:37:19 -0700 Subject: [PATCH 4/8] some working state --- graph/error.go | 25 +++++++++ graph/graph.go | 16 ++---- graph/template.go | 2 +- graph_node.go | 90 ++++++++++++++++++++++++++++++ job.go | 134 ++++++++++++++++++++++++++++----------------- job_builder.go | 9 --- job_test.go | 27 +++++++-- sqljob_lib_test.go | 6 ++ step.go | 28 +++++++++- step_exec_data.go | 10 ++++ 10 files changed, 268 insertions(+), 79 deletions(-) create mode 100644 graph/error.go create mode 100644 graph_node.go delete mode 100644 job_builder.go create mode 100644 step_exec_data.go diff --git a/graph/error.go b/graph/error.go new file mode 100644 index 0000000..3aae51a --- /dev/null +++ b/graph/error.go @@ -0,0 +1,25 @@ +package graph + +type GraphErrorCode string + +const ( + ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph" + ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph" +) + +func (ge GraphErrorCode) Error() string { + return string(ge) +} + +type GraphError struct { + Code GraphErrorCode + Message string +} + +func (ge *GraphError) Error() string { + return ge.Code.Error() + ": " + ge.Message +} + +func (ge *GraphError) Unwrap() error { + return ge.Code +} diff --git a/graph/graph.go b/graph/graph.go index 0088052..ac4a70c 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -4,17 +4,6 @@ import ( "bytes" ) -type GraphErrorCode string - -const ( - ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph" - ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph" -) - -func (ge GraphErrorCode) Error() string { - return string(ge) -} - // NodeConstrain is a constraint for a node in a graph type NodeConstrain interface { DotSpec() *DotNodeSpec @@ -28,6 +17,7 @@ type Edge[NT NodeConstrain] struct { To NT } +// DotNodeSpec is the specification for a node in a DOT graph type DotNodeSpec struct { ID string Name string @@ -37,6 +27,7 @@ type DotNodeSpec struct { FillColor string } +// DotEdgeSpec is the specification for an edge in DOT graph type DotEdgeSpec struct { FromNodeID string ToNodeID string @@ -45,12 +36,14 @@ type DotEdgeSpec struct { Color string } +// Graph hold the nodes and edges of a graph type Graph[NT NodeConstrain] struct { nodes map[string]NT nodeEdges map[string][]*Edge[NT] edgeSpecFunc EdgeSpecFunc[NT] } +// NewGraph creates a new graph func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] { return &Graph[NT]{ nodes: make(map[string]NT), @@ -59,6 +52,7 @@ func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] { } } +// AddNode adds a node to the graph func (g *Graph[NT]) AddNode(n NT) error { nodeKey := n.DotSpec().ID if _, ok := g.nodes[nodeKey]; ok { diff --git a/graph/template.go b/graph/template.go index 6d0f672..4fab88c 100644 --- a/graph/template.go +++ b/graph/template.go @@ -13,7 +13,7 @@ const digraphTemplateText = `digraph { compound = "true" newrank = "true" subgraph "root" { -{{ range $node := $.Nodes}} {{$node.ID}} [label="{{$node.Name}}" shape={{$node.Shape}} style={{$node.Style}} tooltip={{$node.Tooltip}} fillcolor={{$node.FillColor}}] +{{ range $node := $.Nodes}} {{$node.ID}} [label="{{$node.Name}}" shape={{$node.Shape}} style={{$node.Style}} tooltip="{{$node.Tooltip}}" fillcolor={{$node.FillColor}}] {{ end }} {{ range $edge := $.Edges}} {{$edge.FromNodeID}} -> {{$edge.ToNodeID}} [style={{$edge.Style}} tooltip="{{$edge.Tooltip}}" color={{$edge.Color}}] {{ end }} diff --git a/graph_node.go b/graph_node.go new file mode 100644 index 0000000..086793b --- /dev/null +++ b/graph_node.go @@ -0,0 +1,90 @@ +package asyncjob + +import ( + "fmt" + + "github.com/Azure/go-asyncjob/graph" +) + +type stepNode struct { + StepMeta +} + +func newStepNode(sm StepMeta) *stepNode { + return &stepNode{ + StepMeta: sm, + } +} + +func (sn *stepNode) DotSpec() *graph.DotNodeSpec { + return &graph.DotNodeSpec{ + ID: sn.getID(), + Name: sn.GetName(), + Shape: sn.getShape(), + Style: "filled", + FillColor: sn.getFillColor(), + Tooltip: sn.getTooltip(), + } +} + +func (sn *stepNode) getShape() string { + switch sn.getType() { + case stepTypeRoot: + return "triangle" + case stepTypeParam: + return "doublecircle" + case stepTypeTask: + return "ellipse" + default: + return "box" + } +} + +func (sn *stepNode) getFillColor() string { + switch sn.GetState() { + case StepStatePending: + return "gray" + case StepStateRunning: + return "yellow" + case StepStateCompleted: + return "green" + case StepStateFailed: + return "red" + default: + return "white" + } +} + +func (sn *stepNode) getTooltip() string { + state := sn.GetState() + executionData := sn.ExecutionData() + + if state != StepStatePending && executionData != nil { + return fmt.Sprintf("Type: %s\\nName: %s\\nState: %s\\nStartAt: %s\\nDuration: %s", string(sn.getType()), sn.GetName(), state, executionData.StartTime, executionData.Duration) + } + + return fmt.Sprintf("Type: %s\\nName: %s", sn.getType(), sn.GetName()) +} + +func stepConn(snFrom, snTo *stepNode) *graph.DotEdgeSpec { + edgeSpec := &graph.DotEdgeSpec{ + FromNodeID: snFrom.getID(), + ToNodeID: snTo.getID(), + Color: "black", + Style: "bold", + } + + // update edge color, tooltip if NodeTo is started already. + if snTo.GetState() != StepStatePending { + executionData := snTo.ExecutionData() + edgeSpec.Tooltip = fmt.Sprintf("Time: %s", executionData.StartTime) + fromNodeState := snFrom.GetState() + if fromNodeState == StepStateCompleted { + edgeSpec.Color = "green" + } else if fromNodeState == StepStateFailed { + edgeSpec.Color = "red" + } + } + + return edgeSpec +} diff --git a/job.go b/job.go index 07160fe..bd34a91 100644 --- a/job.go +++ b/job.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/Azure/go-asyncjob/graph" "github.com/Azure/go-asynctask" @@ -15,14 +16,16 @@ const JobStatePending JobState = "pending" const JobStateRunning JobState = "running" const JobStateCompleted JobState = "completed" +const rootStepName = "job" + type Job struct { Name string Steps map[string]StepMeta state JobState - rootJob *StepInfo[interface{}] + rootStep *StepInfo[interface{}] jobStart *sync.WaitGroup - stepsDag *graph.Graph[StepMeta] + stepsDag *graph.Graph[*stepNode] // runtimeCtx is captured to separate build context and runtime context. // golang not recommending to store context in the struct. I don't have better idea. @@ -38,59 +41,66 @@ func NewJob(name string) *Job { jobStart: &jobStart, state: JobStatePending, - stepsDag: graph.NewGraph[StepMeta](), + stepsDag: graph.NewGraph[*stepNode](stepConn), } - j.rootJob = &StepInfo[interface{}]{ - name: "[Start]", - task: asynctask.Start(context.Background(), func(fctx context.Context) (*interface{}, error) { - fmt.Println("RootJob Added") - // this will pause all steps from starting, until Start() method is called. - jobStart.Wait() - j.state = JobStateRunning - return nil, nil - }), + rootStep := newStepInfo[interface{}](rootStepName, stepTypeRoot) + rootJobFunc := func(ctx context.Context) (*interface{}, error) { + // this will pause all steps from starting, until Start() method is called. + jobStart.Wait() + j.rootStep.executionData.StartTime = time.Now() + j.state = JobStateRunning + j.rootStep.state = StepStateCompleted + j.rootStep.executionData.Duration = time.Since(j.rootStep.executionData.StartTime) + return nil, nil } - j.Steps[j.rootJob.GetName()] = j.rootJob - j.stepsDag.AddNode(j.rootJob) + rootStep.task = asynctask.Start(context.Background(), rootJobFunc) + j.rootStep = rootStep + + j.Steps[j.rootStep.GetName()] = j.rootStep + j.stepsDag.AddNode(newStepNode(j.rootStep)) return j } func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] { - step := newStepInfo[T](stepName) + step := newStepInfo[T](stepName, stepTypeParam) instrumentedFunc := func(ctx context.Context) (*T, error) { - j.rootJob.Wait(ctx) + j.rootStep.Wait(ctx) + step.executionData.StartTime = time.Now() + step.state = StepStateCompleted + step.executionData.Duration = time.Since(j.rootStep.executionData.StartTime) return value, nil } step.task = asynctask.Start(bCtx, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(step, j.rootJob.GetName()) + j.registerStepInGraph(step, j.rootStep) return step } func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asynctask.AsyncFunc[T], optionDecorators ...ExecutionOptionPreparer) (*StepInfo[T], error) { - step := newStepInfo[T](stepName, optionDecorators...) + step := newStepInfo[T](stepName, stepTypeTask, optionDecorators...) // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. - var precedingStepNames = step.DependsOn() var precedingTasks []asynctask.Waitable - for _, stepName := range precedingStepNames { - if step, ok := j.Steps[stepName]; ok { - precedingTasks = append(precedingTasks, step.Waitable()) + var precedingSteps []StepMeta + for _, depStepName := range step.DependsOn() { + if depStep, ok := j.Steps[depStepName]; ok { + precedingTasks = append(precedingTasks, depStep.Waitable()) + precedingSteps = append(precedingSteps, depStep) } else { - return nil, fmt.Errorf("step [%s] not found", stepName) + return nil, fmt.Errorf("step [%s] not found", depStepName) } } // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. if len(precedingTasks) == 0 { - precedingStepNames = append(precedingStepNames, j.rootJob.GetName()) - precedingTasks = append(precedingTasks, j.rootJob.Waitable()) + precedingSteps = append(precedingSteps, j.rootStep) + precedingTasks = append(precedingTasks, j.rootStep.Waitable()) } // instrument to : @@ -101,8 +111,12 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn // timeoutHandling (TODO) instrumentedFunc := func(ctx context.Context) (*T, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + step.executionData.StartTime = time.Now() + step.state = StepStateFailed + step.executionData.Duration = 0 return nil, err } + step.executionData.StartTime = time.Now() step.state = StepStateRunning result, err := stepFunc(j.runtimeCtx) if err != nil { @@ -110,13 +124,14 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn } else { step.state = StepStateCompleted } + step.executionData.Duration = time.Since(step.executionData.StartTime) return result, err } step.task = asynctask.Start(bCtx, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(step, precedingStepNames...) + j.registerStepInGraph(step, precedingSteps...) return step, nil } @@ -127,22 +142,24 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt return nil, fmt.Errorf("step [%s] not found in job", parentStep.GetName()) } - step := newStepInfo[S](stepName, append(optionDecorators, ExecuteAfter(parentStep))...) + step := newStepInfo[S](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStep))...) // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. - var precedingStepNames = step.DependsOn() + var precedingSteps []StepMeta var precedingTasks []asynctask.Waitable - for _, stepName := range precedingStepNames { - if step, ok := j.Steps[stepName]; ok { - precedingTasks = append(precedingTasks, step.Waitable()) + for _, depStepName := range step.DependsOn() { + if depStep, ok := j.Steps[depStepName]; ok { + precedingSteps = append(precedingSteps, depStep) + precedingTasks = append(precedingTasks, depStep.Waitable()) } else { - return nil, fmt.Errorf("step [%s] not found", stepName) + return nil, fmt.Errorf("step [%s] not found", depStepName) } } // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. if len(precedingTasks) == 0 { - precedingTasks = append(precedingTasks, j.rootJob.Waitable()) + precedingSteps = append(precedingSteps, j.rootStep) + precedingTasks = append(precedingTasks, j.rootStep.Waitable()) } // instrument to : @@ -153,8 +170,12 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt // timeoutHandling (TODO) instrumentedFunc := func(ctx context.Context, t *T) (*S, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + step.executionData.StartTime = time.Now() + step.state = StepStateFailed + step.executionData.Duration = 0 return nil, err } + step.executionData.StartTime = time.Now() step.state = StepStateRunning result, err := stepFunc(j.runtimeCtx, t) if err != nil { @@ -162,13 +183,14 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt } else { step.state = StepStateCompleted } + step.executionData.Duration = time.Since(step.executionData.StartTime) return result, err } step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(step, precedingStepNames...) + j.registerStepInGraph(step, precedingSteps...) return step, nil } @@ -181,22 +203,24 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p return nil, fmt.Errorf("step [%s] not found in job", parentStepS.GetName()) } - step := newStepInfo[R](stepName, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...) + step := newStepInfo[R](stepName, stepTypeTask, append(optionDecorators, ExecuteAfter(parentStepT), ExecuteAfter(parentStepS))...) // also consider specified the dependencies from ExecutionOptionPreparer, without consume the result. - var precedingStepNames = step.DependsOn() + var precedingSteps []StepMeta var precedingTasks []asynctask.Waitable - for _, stepName := range precedingStepNames { - if step, ok := j.Steps[stepName]; ok { - precedingTasks = append(precedingTasks, step.Waitable()) + for _, depStepName := range step.DependsOn() { + if depStep, ok := j.Steps[depStepName]; ok { + precedingSteps = append(precedingSteps, depStep) + precedingTasks = append(precedingTasks, depStep.Waitable()) } else { - return nil, fmt.Errorf("step [%s] not found", stepName) + return nil, fmt.Errorf("step [%s] not found", depStepName) } } // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. if len(precedingTasks) == 0 { - precedingTasks = append(precedingTasks, j.rootJob.Waitable()) + precedingSteps = append(precedingSteps, j.rootStep) + precedingTasks = append(precedingTasks, j.rootStep.Waitable()) } // instrument to : // replaceRuntimeContext @@ -204,7 +228,15 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p // retryHandling (TODO) // errorHandling (TODO) // timeoutHandling (TODO) - instrumentedFunc := func(_ context.Context, t *T, s *S) (*R, error) { + instrumentedFunc := func(ctx context.Context, t *T, s *S) (*R, error) { + if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + step.executionData.StartTime = time.Now() + step.state = StepStateFailed + step.executionData.Duration = 0 + return nil, err + } + + step.executionData.StartTime = time.Now() step.state = StepStateRunning result, err := stepFunc(j.runtimeCtx, t, s) if err != nil { @@ -212,13 +244,14 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p } else { step.state = StepStateCompleted } + step.executionData.Duration = time.Since(step.executionData.StartTime) return result, err } step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc) j.Steps[stepName] = step - j.registerStepInGraph(step, precedingStepNames...) + j.registerStepInGraph(step, precedingSteps...) return step, nil } @@ -227,11 +260,9 @@ func (j *Job) Start(ctx context.Context) error { // TODO: lock Steps, no modification to job execution graph j.jobStart.Done() j.runtimeCtx = ctx - if err := j.rootJob.Wait(ctx); err != nil { - return fmt.Errorf("job [Start] failed: %w", err) + if err := j.rootStep.Wait(ctx); err != nil { + return fmt.Errorf("root job %s failed: %w", j.rootStep.name, err) } - - j.rootJob.state = StepStateCompleted return nil } @@ -243,10 +274,11 @@ func (j *Job) Wait(ctx context.Context) error { return asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, tasks...) } -func (j *Job) registerStepInGraph(step StepMeta, precedingStep ...string) error { - j.stepsDag.AddNode(step) - for _, precedingStepName := range precedingStep { - j.stepsDag.Connect(precedingStepName, step.GetName()) +func (j *Job) registerStepInGraph(step StepMeta, precedingSteps ...StepMeta) error { + stepNode := newStepNode(step) + j.stepsDag.AddNode(stepNode) + for _, precedingStep := range precedingSteps { + j.stepsDag.Connect(precedingStep.getID(), step.getID()) } return nil diff --git a/job_builder.go b/job_builder.go deleted file mode 100644 index 890bd51..0000000 --- a/job_builder.go +++ /dev/null @@ -1,9 +0,0 @@ -package asyncjob - -import ( - "context" -) - -type JobBuilder interface { - BuildJob(context.Context) *Job -} diff --git a/job_test.go b/job_test.go index 4a3367c..5c2c17f 100644 --- a/job_test.go +++ b/job_test.go @@ -13,7 +13,7 @@ func TestSimpleJob(t *testing.T) { Table2: "table2", Query2: "query2", } - jb := sb.BuildJob(context.Background()) + jb := sb.BuildJob(context.Background(), nil) jb.Start(context.Background()) jb.Wait(context.Background()) @@ -25,7 +25,24 @@ func TestSimpleJob(t *testing.T) { fmt.Println(dotGraph) } -var _ JobBuilder = &SqlJobBuilder{} +func TestSimpleJobError(t *testing.T) { + sb := &SqlJobBuilder{ + Table1: "table1", + Query1: "query1", + Table2: "table2", + Query2: "query2", + } + jb := sb.BuildJob(context.Background(), map[string]error{"table2": fmt.Errorf("table2 schema error")}) + + jb.Start(context.Background()) + jb.Wait(context.Background()) + + dotGraph, err := jb.Visualize() + if err != nil { + t.FailNow() + } + fmt.Println(dotGraph) +} type SqlJobBuilder struct { ServerName string @@ -35,15 +52,13 @@ type SqlJobBuilder struct { Query2 string } -func (sjb *SqlJobBuilder) BuildJob(bCtx context.Context) *Job { +func (sjb *SqlJobBuilder) BuildJob(bCtx context.Context, errorInjections map[string]error) *Job { job := NewJob("sqlSummaryJob") - jobLib := &SqlSummaryJobLib{} + jobLib := &SqlSummaryJobLib{ErrorInjection: errorInjections} serverNameParamTask := InputParam(bCtx, job, "param_serverName", &sjb.ServerName) connTsk, _ := StepAfter(bCtx, job, "getConnection", serverNameParamTask, jobLib.GetConnection) - // TODO: handle error during BuildJob - table1ParamTsk := InputParam(bCtx, job, "param_table1", &sjb.Table1) table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, jobLib.GetTableClient) query1ParamTsk := InputParam(bCtx, job, "param_query1", &sjb.Query1) diff --git a/sqljob_lib_test.go b/sqljob_lib_test.go index e3f9061..16a461b 100644 --- a/sqljob_lib_test.go +++ b/sqljob_lib_test.go @@ -6,6 +6,7 @@ import ( ) type SqlSummaryJobLib struct { + ErrorInjection map[string]error } type SqlConnection struct { @@ -38,6 +39,11 @@ func (sql *SqlSummaryJobLib) GetTableClient(ctx context.Context, conn *SqlConnec func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlTableClient, queryString *string) (*SqlQueryResult, error) { fmt.Println("ExecuteQuery:", *queryString) + if sql.ErrorInjection != nil { + if err, ok := sql.ErrorInjection[tableClient.TableName]; ok { + return nil, err + } + } return &SqlQueryResult{Data: map[string]interface{}{"serverName": tableClient.ServerName, "tableName": tableClient.TableName, "queryName": *queryString}}, nil } diff --git a/step.go b/step.go index 038ca5a..03fb64a 100644 --- a/step.go +++ b/step.go @@ -2,6 +2,7 @@ package asyncjob import ( "context" + "fmt" "time" "github.com/Azure/go-asynctask" @@ -14,6 +15,12 @@ const StepStateRunning StepState = "running" const StepStateFailed StepState = "failed" const StepStateCompleted StepState = "completed" +type stepType string + +const stepTypeTask stepType = "task" +const stepTypeRoot stepType = "root" +const stepTypeParam stepType = "param" + type StepExecutionOptions struct { Timeout time.Duration ErrorPolicy StepErrorPolicy @@ -43,6 +50,9 @@ type StepMeta interface { Wait(context.Context) error Waitable() asynctask.Waitable ExecutionPolicy() *StepExecutionOptions + ExecutionData() *StepExecutionData + getType() stepType + getID() string } type StepInfo[T any] struct { @@ -51,13 +61,17 @@ type StepInfo[T any] struct { state StepState executionOptions *StepExecutionOptions job *Job + executionData *StepExecutionData + stepType stepType } -func newStepInfo[T any](stepName string, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] { +func newStepInfo[T any](stepName string, stepType stepType, optionDecorators ...ExecutionOptionPreparer) *StepInfo[T] { step := &StepInfo[T]{ name: stepName, state: StepStatePending, executionOptions: &StepExecutionOptions{}, + executionData: &StepExecutionData{}, + stepType: stepType, } for _, decorator := range optionDecorators { @@ -93,3 +107,15 @@ func (si *StepInfo[T]) Waitable() asynctask.Waitable { func (si *StepInfo[T]) ExecutionPolicy() *StepExecutionOptions { return si.executionOptions } + +func (si *StepInfo[T]) ExecutionData() *StepExecutionData { + return si.executionData +} + +func (si *StepInfo[T]) getType() stepType { + return si.stepType +} + +func (sn *StepInfo[T]) getID() string { + return fmt.Sprintf("%s_%s", sn.getType(), sn.GetName()) +} diff --git a/step_exec_data.go b/step_exec_data.go new file mode 100644 index 0000000..9d86088 --- /dev/null +++ b/step_exec_data.go @@ -0,0 +1,10 @@ +package asyncjob + +import ( + "time" +) + +type StepExecutionData struct { + StartTime time.Time + Duration time.Duration +} From b0746e4d6278ca928694a0ad79426afb245f709c Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Mon, 31 Oct 2022 15:47:37 -0700 Subject: [PATCH 5/8] keep steps in pending state when preceeding task failed. --- graph_node.go | 4 ++-- job.go | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/graph_node.go b/graph_node.go index 086793b..b521c8e 100644 --- a/graph_node.go +++ b/graph_node.go @@ -34,9 +34,9 @@ func (sn *stepNode) getShape() string { case stepTypeParam: return "doublecircle" case stepTypeTask: - return "ellipse" - default: return "box" + default: + return "egg" } } diff --git a/job.go b/job.go index bd34a91..a9744d8 100644 --- a/job.go +++ b/job.go @@ -111,9 +111,11 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn // timeoutHandling (TODO) instrumentedFunc := func(ctx context.Context) (*T, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on how to set state of dependent step. step.executionData.StartTime = time.Now() step.state = StepStateFailed - step.executionData.Duration = 0 + step.executionData.Duration = 0 */ return nil, err } step.executionData.StartTime = time.Now() @@ -170,9 +172,11 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt // timeoutHandling (TODO) instrumentedFunc := func(ctx context.Context, t *T) (*S, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on how to set state of dependent step. step.executionData.StartTime = time.Now() step.state = StepStateFailed - step.executionData.Duration = 0 + step.executionData.Duration = 0 */ return nil, err } step.executionData.StartTime = time.Now() @@ -230,9 +234,11 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p // timeoutHandling (TODO) instrumentedFunc := func(ctx context.Context, t *T, s *S) (*R, error) { if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil { + /* this only work on ExecuteAfter from input, asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed. + we need to be consistent on how to set state of dependent step. step.executionData.StartTime = time.Now() step.state = StepStateFailed - step.executionData.Duration = 0 + step.executionData.Duration = 0 */ return nil, err } From fcae944295fe2e7b30382c5e3455afc11ed717cd Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Mon, 31 Oct 2022 15:54:33 -0700 Subject: [PATCH 6/8] tweaks --- graph/template.go | 2 +- graph_node.go | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/graph/template.go b/graph/template.go index 4fab88c..e86242e 100644 --- a/graph/template.go +++ b/graph/template.go @@ -17,7 +17,7 @@ const digraphTemplateText = `digraph { {{ end }} {{ range $edge := $.Edges}} {{$edge.FromNodeID}} -> {{$edge.ToNodeID}} [style={{$edge.Style}} tooltip="{{$edge.Tooltip}}" color={{$edge.Color}}] {{ end }} - } + } }` /* ideal output diff --git a/graph_node.go b/graph_node.go index b521c8e..a2f7a97 100644 --- a/graph_node.go +++ b/graph_node.go @@ -2,6 +2,7 @@ package asyncjob import ( "fmt" + "time" "github.com/Azure/go-asyncjob/graph" ) @@ -60,7 +61,7 @@ func (sn *stepNode) getTooltip() string { executionData := sn.ExecutionData() if state != StepStatePending && executionData != nil { - return fmt.Sprintf("Type: %s\\nName: %s\\nState: %s\\nStartAt: %s\\nDuration: %s", string(sn.getType()), sn.GetName(), state, executionData.StartTime, executionData.Duration) + return fmt.Sprintf("Type: %s\\nName: %s\\nState: %s\\nStartAt: %s\\nDuration: %s", string(sn.getType()), sn.GetName(), state, executionData.StartTime.Format(time.RFC3339Nano), executionData.Duration) } return fmt.Sprintf("Type: %s\\nName: %s", sn.getType(), sn.GetName()) @@ -77,13 +78,14 @@ func stepConn(snFrom, snTo *stepNode) *graph.DotEdgeSpec { // update edge color, tooltip if NodeTo is started already. if snTo.GetState() != StepStatePending { executionData := snTo.ExecutionData() - edgeSpec.Tooltip = fmt.Sprintf("Time: %s", executionData.StartTime) - fromNodeState := snFrom.GetState() - if fromNodeState == StepStateCompleted { - edgeSpec.Color = "green" - } else if fromNodeState == StepStateFailed { - edgeSpec.Color = "red" - } + edgeSpec.Tooltip = fmt.Sprintf("Time: %s", executionData.StartTime.Format(time.RFC3339Nano)) + } + + fromNodeState := snFrom.GetState() + if fromNodeState == StepStateCompleted { + edgeSpec.Color = "green" + } else if fromNodeState == StepStateFailed { + edgeSpec.Color = "red" } return edgeSpec From d44cf0d8711f41426fe103690fe998bab2ec8a2c Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Mon, 31 Oct 2022 23:59:07 -0700 Subject: [PATCH 7/8] adding codecov --- .github/workflows/go.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index e09d683..7765f93 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -22,4 +22,7 @@ jobs: run: go build -v ./... - name: Test - run: go test -race -v ./... + run: go test -race -coverprofile=coverage.txt -covermode=atomic ./... + + - name: Codecov + uses: codecov/codecov-action@v1.0.6 From 74f8203b6d2689f5a8b12d332945ee482e58ed5b Mon Sep 17 00:00:00 2001 From: Haitao Chen Date: Tue, 1 Nov 2022 00:53:16 -0700 Subject: [PATCH 8/8] tweaks for coverage --- job.go | 11 ----------- job_test.go | 14 ++++++++------ sqljob_lib_test.go | 5 +++++ 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/job.go b/job.go index a9744d8..9ea74be 100644 --- a/job.go +++ b/job.go @@ -158,12 +158,6 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt } } - // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. - if len(precedingTasks) == 0 { - precedingSteps = append(precedingSteps, j.rootStep) - precedingTasks = append(precedingTasks, j.rootStep.Waitable()) - } - // instrument to : // replaceRuntimeContext // trackStepState @@ -221,11 +215,6 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p } } - // if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet. - if len(precedingTasks) == 0 { - precedingSteps = append(precedingSteps, j.rootStep) - precedingTasks = append(precedingTasks, j.rootStep.Waitable()) - } // instrument to : // replaceRuntimeContext // trackStepState diff --git a/job_test.go b/job_test.go index 5c2c17f..13982ba 100644 --- a/job_test.go +++ b/job_test.go @@ -3,6 +3,7 @@ package asyncjob import ( "context" "fmt" + "github.com/Azure/go-asynctask" "testing" ) @@ -56,19 +57,20 @@ func (sjb *SqlJobBuilder) BuildJob(bCtx context.Context, errorInjections map[str job := NewJob("sqlSummaryJob") jobLib := &SqlSummaryJobLib{ErrorInjection: errorInjections} - serverNameParamTask := InputParam(bCtx, job, "param_serverName", &sjb.ServerName) + serverNameParamTask := InputParam(bCtx, job, "serverName", &sjb.ServerName) connTsk, _ := StepAfter(bCtx, job, "getConnection", serverNameParamTask, jobLib.GetConnection) - table1ParamTsk := InputParam(bCtx, job, "param_table1", &sjb.Table1) + table1ParamTsk := InputParam(bCtx, job, "table1", &sjb.Table1) table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, jobLib.GetTableClient) - query1ParamTsk := InputParam(bCtx, job, "param_query1", &sjb.Query1) + query1ParamTsk := InputParam(bCtx, job, "query1", &sjb.Query1) qery1ResultTsk, _ := StepAfterBoth(bCtx, job, "queryTable1", table1ClientTsk, query1ParamTsk, jobLib.ExecuteQuery) - table2ParamTsk := InputParam(bCtx, job, "param_table2", &sjb.Table2) + table2ParamTsk := InputParam(bCtx, job, "table2", &sjb.Table2) table2ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, jobLib.GetTableClient) - query2ParamTsk := InputParam(bCtx, job, "param_query2", &sjb.Query2) + query2ParamTsk := InputParam(bCtx, job, "query2", &sjb.Query2) qery2ResultTsk, _ := StepAfterBoth(bCtx, job, "queryTable2", table2ClientTsk, query2ParamTsk, jobLib.ExecuteQuery) - StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, jobLib.SummarizeQueryResult) + summaryTsk, _ := StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, jobLib.SummarizeQueryResult) + AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(jobLib.EmailNotification), ExecuteAfter(summaryTsk)) return job } diff --git a/sqljob_lib_test.go b/sqljob_lib_test.go index 16a461b..e94eb7b 100644 --- a/sqljob_lib_test.go +++ b/sqljob_lib_test.go @@ -51,3 +51,8 @@ func (sql *SqlSummaryJobLib) SummarizeQueryResult(ctx context.Context, result1 * fmt.Println("SummarizeQueryResult") return &SummarizedResult{Data1: result1.Data, Data2: result2.Data}, nil } + +func (sql *SqlSummaryJobLib) EmailNotification(ctx context.Context) error { + fmt.Println("EmailNotification") + return nil +}