Skip to content

Commit

Permalink
Merge pull request #4 from Azure/haitao/new-dag-lib
Browse files Browse the repository at this point in the history
drop terraform dag lib
  • Loading branch information
haitch committed Oct 28, 2022
2 parents 2ed229d + 7f3c57c commit b6c5803
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 806 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
```

### visualize of a job
this visualize depend on [terraform/dag](github.com/hashicorp/terraform/dag), with some limitation, may need some upstream tweaks:
- able to customize node name
- able to distinguash type of node (param, executionBlock)
- able to show state of node (pending, running, completed, failed)
tried https://github.com/hashicorp/terraform/tree/main/internal/dag, which doesn't have own go module, but terraform go module have too much dependencies.
baking a inhouse one.

```
digraph {
Expand Down
16 changes: 1 addition & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,4 @@ module github.com/Azure/go-asyncjob

go 1.18

require (
github.com/Azure/go-asynctask v1.2.1-after-all
github.com/hashicorp/terraform v0.15.3
)

require (
github.com/agext/levenshtein v1.2.2 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/zclconf/go-cty v1.8.3 // indirect
golang.org/x/text v0.3.5 // indirect
)
require github.com/Azure/go-asynctask v1.3.0
766 changes: 3 additions & 763 deletions go.sum

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions graph/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package graph

import (
"bytes"
)

type GraphErrorCode string

const (
ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph"
)

func (ge GraphErrorCode) Error() string {
return string(ge)
}

type NodeConstrain interface {
// Name of the node, used as key in the graph, so should be unique.
GetName() string
}

type Edge[NT NodeConstrain] struct {
From NT
To NT
}

type Graph[NT NodeConstrain] struct {
nodes map[string]NT
nodeEdges map[string][]*Edge[NT]
}

func NewGraph[NT NodeConstrain]() *Graph[NT] {
return &Graph[NT]{
nodes: make(map[string]NT),
nodeEdges: make(map[string][]*Edge[NT]),
}
}

func (g *Graph[NT]) AddNode(n NT) error {
nodeKey := n.GetName()
if _, ok := g.nodes[nodeKey]; ok {
return ErrDuplicateNode
}
g.nodes[nodeKey] = n

return nil
}

func (g *Graph[NT]) Connect(from, to string) error {
var nodeFrom, nodeTo NT
var ok bool
if nodeFrom, ok = g.nodes[from]; !ok {
return ErrConnectNotExistingNode
}

if nodeTo, ok = g.nodes[to]; !ok {
return ErrConnectNotExistingNode
}

g.nodeEdges[from] = append(g.nodeEdges[from], &Edge[NT]{From: nodeFrom, To: nodeTo})
return nil
}

// https://en.wikipedia.org/wiki/DOT_(graph_description_language)
func (g *Graph[NT]) ToDotGraph() (string, error) {
edges := make(map[string][]string)
for _, nodeEdges := range g.nodeEdges {
for _, edge := range nodeEdges {
edges[edge.From.GetName()] = append(edges[edge.From.GetName()], edge.To.GetName())
}
}

buf := new(bytes.Buffer)
err := digraphTemplate.Execute(buf, edges)
if err != nil {
return "", err
}
return buf.String(), nil
}
15 changes: 15 additions & 0 deletions graph/template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package graph

import (
"text/template"
)

var digraphTemplate = template.Must(template.New("digraph").Parse(digraphTemplateText))

const digraphTemplateText = `digraph {
compound = "true"
newrank = "true"
subgraph "root" {
{{ range $from, $toList := $}}{{ range $_, $to := $toList}} "{{$from}}" -> "{{$to}}"
{{ end }}{{ end }} }
}`
34 changes: 13 additions & 21 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
"github.com/hashicorp/terraform/dag"
)

type JobState string
Expand All @@ -22,7 +22,7 @@ type Job struct {
state JobState
rootJob *StepInfo[interface{}]
jobStart *sync.WaitGroup
stepsDag *dag.AcyclicGraph
stepsDag *graph.Graph[StepMeta]

// runtimeCtx is captured to separate build context and runtime context.
// golang not recommending to store context in the struct. I don't have better idea.
Expand All @@ -38,7 +38,7 @@ func NewJob(name string) *Job {

jobStart: &jobStart,
state: JobStatePending,
stepsDag: &dag.AcyclicGraph{},
stepsDag: graph.NewGraph[StepMeta](),
}

j.rootJob = &StepInfo[interface{}]{
Expand All @@ -53,7 +53,7 @@ func NewJob(name string) *Job {
}

j.Steps[j.rootJob.GetName()] = j.rootJob
j.stepsDag.Add(j.rootJob.GetName())
j.stepsDag.AddNode(j.rootJob)

return j
}
Expand All @@ -68,7 +68,7 @@ func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T)
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, j.rootJob.GetName())
j.registerStepInGraph(step, j.rootJob.GetName())

return step
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
j.registerStepInGraph(step, precedingStepNames...)

return step, nil
}
Expand Down Expand Up @@ -168,10 +168,7 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt
step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
if err := j.stepsDag.Validate(); err != nil {
return nil, fmt.Errorf("cycle dependency detected: %s", err)
}
j.registerStepInGraph(step, precedingStepNames...)
return step, nil
}

Expand Down Expand Up @@ -221,7 +218,7 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p
step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
j.registerStepInGraph(step, precedingStepNames...)

return step, nil
}
Expand All @@ -246,21 +243,16 @@ func (j *Job) Wait(ctx context.Context) error {
return asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, tasks...)
}

func (j *Job) registerStepInGraph(stepName string, precedingStep ...string) error {
j.stepsDag.Add(stepName)
func (j *Job) registerStepInGraph(step StepMeta, precedingStep ...string) error {
j.stepsDag.AddNode(step)
for _, precedingStepName := range precedingStep {
j.stepsDag.Connect(dag.BasicEdge(precedingStepName, stepName))
if err := j.stepsDag.Validate(); err != nil {
return fmt.Errorf("failed to add step %q depend on %q, likely a cycle dependency. %w", stepName, precedingStepName, err)
}
j.stepsDag.Connect(precedingStepName, step.GetName())
}

return nil
}

// Visualize return a DAG of the job execution graph
func (j *Job) Visualize() string {
opts := &dag.DotOpts{MaxDepth: 42}
actual := j.stepsDag.Dot(opts)
return string(actual)
func (j *Job) Visualize() (string, error) {
return j.stepsDag.ToDotGraph()
}
9 changes: 6 additions & 3 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ func TestSimpleJob(t *testing.T) {
}
jb := sb.BuildJob(context.Background())

dotGraph := jb.Visualize()
fmt.Println(dotGraph)

jb.Start(context.Background())
jb.Wait(context.Background())

dotGraph, err := jb.Visualize()
if err != nil {
t.FailNow()
}
fmt.Println(dotGraph)
}

var _ JobBuilder = &SqlJobBuilder{}
Expand Down

0 comments on commit b6c5803

Please sign in to comment.