Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drop terraform dag lib #4

Merged
merged 1 commit into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
```

### visualize of a job
this visualize depend on [terraform/dag](github.com/hashicorp/terraform/dag), with some limitation, may need some upstream tweaks:
- able to customize node name
- able to distinguash type of node (param, executionBlock)
- able to show state of node (pending, running, completed, failed)
tried https://github.com/hashicorp/terraform/tree/main/internal/dag, which doesn't have own go module, but terraform go module have too much dependencies.
baking a inhouse one.

```
digraph {
Expand Down
16 changes: 1 addition & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,4 @@ module github.com/Azure/go-asyncjob

go 1.18

require (
github.com/Azure/go-asynctask v1.2.1-after-all
github.com/hashicorp/terraform v0.15.3
)

require (
github.com/agext/levenshtein v1.2.2 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
github.com/zclconf/go-cty v1.8.3 // indirect
golang.org/x/text v0.3.5 // indirect
)
require github.com/Azure/go-asynctask v1.3.0
766 changes: 3 additions & 763 deletions go.sum

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions graph/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package graph

import (
"bytes"
)

type GraphErrorCode string

const (
ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph"
)

func (ge GraphErrorCode) Error() string {
return string(ge)
}

type NodeConstrain interface {
// Name of the node, used as key in the graph, so should be unique.
GetName() string
}

type Edge[NT NodeConstrain] struct {
From NT
To NT
}

type Graph[NT NodeConstrain] struct {
nodes map[string]NT
nodeEdges map[string][]*Edge[NT]
}

func NewGraph[NT NodeConstrain]() *Graph[NT] {
return &Graph[NT]{
nodes: make(map[string]NT),
nodeEdges: make(map[string][]*Edge[NT]),
}
}

func (g *Graph[NT]) AddNode(n NT) error {
nodeKey := n.GetName()
if _, ok := g.nodes[nodeKey]; ok {
return ErrDuplicateNode
}
g.nodes[nodeKey] = n

return nil
}

func (g *Graph[NT]) Connect(from, to string) error {
var nodeFrom, nodeTo NT
var ok bool
if nodeFrom, ok = g.nodes[from]; !ok {
return ErrConnectNotExistingNode
}

if nodeTo, ok = g.nodes[to]; !ok {
return ErrConnectNotExistingNode
}

g.nodeEdges[from] = append(g.nodeEdges[from], &Edge[NT]{From: nodeFrom, To: nodeTo})
return nil
}

// https://en.wikipedia.org/wiki/DOT_(graph_description_language)
func (g *Graph[NT]) ToDotGraph() (string, error) {
edges := make(map[string][]string)
for _, nodeEdges := range g.nodeEdges {
for _, edge := range nodeEdges {
edges[edge.From.GetName()] = append(edges[edge.From.GetName()], edge.To.GetName())
}
}

buf := new(bytes.Buffer)
err := digraphTemplate.Execute(buf, edges)
if err != nil {
return "", err
}
return buf.String(), nil
}
15 changes: 15 additions & 0 deletions graph/template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package graph

import (
"text/template"
)

var digraphTemplate = template.Must(template.New("digraph").Parse(digraphTemplateText))

const digraphTemplateText = `digraph {
compound = "true"
newrank = "true"
subgraph "root" {
{{ range $from, $toList := $}}{{ range $_, $to := $toList}} "{{$from}}" -> "{{$to}}"
{{ end }}{{ end }} }
}`
34 changes: 13 additions & 21 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sync"

"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
"github.com/hashicorp/terraform/dag"
)

type JobState string
Expand All @@ -22,7 +22,7 @@ type Job struct {
state JobState
rootJob *StepInfo[interface{}]
jobStart *sync.WaitGroup
stepsDag *dag.AcyclicGraph
stepsDag *graph.Graph[StepMeta]

// runtimeCtx is captured to separate build context and runtime context.
// golang not recommending to store context in the struct. I don't have better idea.
Expand All @@ -38,7 +38,7 @@ func NewJob(name string) *Job {

jobStart: &jobStart,
state: JobStatePending,
stepsDag: &dag.AcyclicGraph{},
stepsDag: graph.NewGraph[StepMeta](),
}

j.rootJob = &StepInfo[interface{}]{
Expand All @@ -53,7 +53,7 @@ func NewJob(name string) *Job {
}

j.Steps[j.rootJob.GetName()] = j.rootJob
j.stepsDag.Add(j.rootJob.GetName())
j.stepsDag.AddNode(j.rootJob)

return j
}
Expand All @@ -68,7 +68,7 @@ func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T)
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, j.rootJob.GetName())
j.registerStepInGraph(step, j.rootJob.GetName())

return step
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func AddStep[T any](bCtx context.Context, j *Job, stepName string, stepFunc asyn
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
j.registerStepInGraph(step, precedingStepNames...)

return step, nil
}
Expand Down Expand Up @@ -168,10 +168,7 @@ func StepAfter[T, S any](bCtx context.Context, j *Job, stepName string, parentSt
step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
if err := j.stepsDag.Validate(); err != nil {
return nil, fmt.Errorf("cycle dependency detected: %s", err)
}
j.registerStepInGraph(step, precedingStepNames...)
return step, nil
}

Expand Down Expand Up @@ -221,7 +218,7 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j *Job, stepName string, p
step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, precedingStepNames...)
j.registerStepInGraph(step, precedingStepNames...)

return step, nil
}
Expand All @@ -246,21 +243,16 @@ func (j *Job) Wait(ctx context.Context) error {
return asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, tasks...)
}

func (j *Job) registerStepInGraph(stepName string, precedingStep ...string) error {
j.stepsDag.Add(stepName)
func (j *Job) registerStepInGraph(step StepMeta, precedingStep ...string) error {
j.stepsDag.AddNode(step)
for _, precedingStepName := range precedingStep {
j.stepsDag.Connect(dag.BasicEdge(precedingStepName, stepName))
if err := j.stepsDag.Validate(); err != nil {
return fmt.Errorf("failed to add step %q depend on %q, likely a cycle dependency. %w", stepName, precedingStepName, err)
}
j.stepsDag.Connect(precedingStepName, step.GetName())
}

return nil
}

// Visualize return a DAG of the job execution graph
func (j *Job) Visualize() string {
opts := &dag.DotOpts{MaxDepth: 42}
actual := j.stepsDag.Dot(opts)
return string(actual)
func (j *Job) Visualize() (string, error) {
return j.stepsDag.ToDotGraph()
}
9 changes: 6 additions & 3 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ func TestSimpleJob(t *testing.T) {
}
jb := sb.BuildJob(context.Background())

dotGraph := jb.Visualize()
fmt.Println(dotGraph)

jb.Start(context.Background())
jb.Wait(context.Background())

dotGraph, err := jb.Visualize()
if err != nil {
t.FailNow()
}
fmt.Println(dotGraph)
}

var _ JobBuilder = &SqlJobBuilder{}
Expand Down