Skip to content

Commit

Permalink
graphviz works again for jobInstance
Browse files Browse the repository at this point in the history
  • Loading branch information
haitch committed Nov 21, 2022
1 parent 4a83bfe commit b3f1110
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 32 deletions.
4 changes: 3 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ func (jd *JobDefinition[T]) Start(ctx context.Context, input *T) *JobInstance[T]
// create root step instance
ji.rootStep = newStepInstance(jd.rootStep)
ji.rootStep.task = asynctask.NewCompletedTask[T](input)
ji.rootStep.state = StepStateCompleted
ji.steps[ji.rootStep.GetName()] = ji.rootStep
ji.stepsDag.AddNode(ji.rootStep)

// construct job instance graph, with TopologySort ordering
orderedSteps := jd.stepsDag.TopologicalSort()
for _, stepDef := range orderedSteps {
if stepDef.GetName() == jd.Name {
continue
}
ji.steps[stepDef.GetName()] = stepDef.CreateStepInstance(ctx, ji)
ji.steps[stepDef.GetName()] = stepDef.createStepInstance(ctx, ji)

}

Expand Down
48 changes: 28 additions & 20 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ func TestSimpleJob(t *testing.T) {
t.Parallel()
sb := &SqlSummaryJobLib{}

jb := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(jb)
jobInstance := jb.Start(context.Background(), &SqlSummaryJobParameters{
jd := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(t, jd)

jobInstance := jd.Start(context.Background(), &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Expand All @@ -25,17 +26,19 @@ func TestSimpleJob(t *testing.T) {
})
jobErr := jobInstance.Wait(context.Background())
assert.NoError(t, jobErr)

renderGraph(t, jobInstance)
}

func TestJobError(t *testing.T) {
t.Parallel()
sb := &SqlSummaryJobLib{}

jb := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(jb)
jd := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(t, jd)

ctx := context.WithValue(context.Background(), "error-injection.server1.table1", fmt.Errorf("table1 not exists"))
jobInstance := jb.Start(ctx, &SqlSummaryJobParameters{
jobInstance := jd.Start(ctx, &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Expand All @@ -50,17 +53,19 @@ func TestJobError(t *testing.T) {
errors.As(err, &jobErr)
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, "getTableClient1", jobErr.StepName)

renderGraph(t, jobInstance)
}

func TestJobPanic(t *testing.T) {
t.Parallel()
sb := &SqlSummaryJobLib{}

jb := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(jb)
jd := sb.BuildJob(context.Background(), map[string]asyncjob.RetryPolicy{})
renderGraph(t, jd)

ctx := context.WithValue(context.Background(), "panic-injection.server1.table2", true)
jobInstance := jb.Start(ctx, &SqlSummaryJobParameters{
jobInstance := jd.Start(ctx, &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Expand All @@ -71,19 +76,22 @@ func TestJobPanic(t *testing.T) {
err := jobInstance.Wait(context.Background())
assert.Error(t, err)

/*
jobErr := &asyncjob.JobError{}
assert.True(t, errors.As(err, &jobErr))
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, jobErr.StepName, "getTableClient1")*/
/* panic is out of reach of jobError, but planning to catch panic in the future
jobErr := &asyncjob.JobError{}
assert.True(t, errors.As(err, &jobErr))
assert.Equal(t, jobErr.Code, asyncjob.ErrStepFailed)
assert.Equal(t, jobErr.StepName, "getTableClient1")*/

renderGraph(t, jobInstance)
}

func renderGraph[T any](jb *asyncjob.JobDefinition[T]) error {
func renderGraph(t *testing.T, jb GraphRender) {
graphStr, err := jb.Visualize()
if err != nil {
return err
}
assert.NoError(t, err)

t.Log(graphStr)
}

fmt.Println(graphStr)
return nil
type GraphRender interface {
Visualize() (string, error)
}
77 changes: 68 additions & 9 deletions step.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package asyncjob

import (
"context"
"fmt"
"time"

"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
Expand All @@ -18,16 +20,26 @@ type stepType string

const stepTypeTask stepType = "task"
const stepTypeRoot stepType = "root"
const stepTypeParam stepType = "param"

// StepDefinitionMeta is the interface for a step definition
type StepDefinitionMeta interface {

// GetName return name of the step
GetName() string

// DependsOn return the list of step names that this step depends on
DependsOn() []string

// ExecutionPolicy return the execution policy of the step
ExecutionPolicy() *StepExecutionOptions

// DotSpec used for generating graphviz graph
DotSpec() *graph.DotNodeSpec
CreateStepInstance(context.Context, JobInstanceMeta) StepInstanceMeta

createStepInstance(context.Context, JobInstanceMeta) StepInstanceMeta
}

// StepDefinition defines a step and it's dependencies in a job definition.
type StepDefinition[T any] struct {
name string
stepType stepType
Expand Down Expand Up @@ -61,7 +73,7 @@ func (sd *StepDefinition[T]) ExecutionPolicy() *StepExecutionOptions {
return sd.executionOptions
}

func (sd *StepDefinition[T]) CreateStepInstance(ctx context.Context, jobInstance JobInstanceMeta) StepInstanceMeta {
func (sd *StepDefinition[T]) createStepInstance(ctx context.Context, jobInstance JobInstanceMeta) StepInstanceMeta {
return sd.instanceCreator(ctx, jobInstance)
}

Expand All @@ -87,12 +99,16 @@ func connectStepDefinition(stepFrom, stepTo StepDefinitionMeta) *graph.DotEdgeSp
return edgeSpec
}

// StepInstanceMeta is the interface for a step instance
type StepInstanceMeta interface {
GetName() string
Waitable() asynctask.Waitable
DotSpec() *graph.DotNodeSpec
ExecutionData() *StepExecutionData
GetState() StepState
}

// StepInstance is the instance of a step, within a job instance.
type StepInstance[T any] struct {
Definition *StepDefinition[T]
task *asynctask.Task[T]
Expand All @@ -116,14 +132,44 @@ func (si *StepInstance[T]) GetName() string {
return si.Definition.GetName()
}

func (sd *StepInstance[T]) DotSpec() *graph.DotNodeSpec {
func (si *StepInstance[T]) GetState() StepState {
return si.state
}

func (si *StepInstance[T]) ExecutionData() *StepExecutionData {
return si.executionData
}

func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec {
shape := "hexagon"
if si.Definition.stepType == stepTypeRoot {
shape = "triangle"
}

color := "gray"
switch si.state {
case StepStatePending:
color = "gray"
case StepStateRunning:
color = "yellow"
case StepStateCompleted:
color = "green"
case StepStateFailed:
color = "red"
}

tooltip := ""
if si.state != StepStatePending && si.executionData != nil {
tooltip = fmt.Sprintf("State: %s\\nStartAt: %s\\nDuration: %s", si.state, si.executionData.StartTime.Format(time.RFC3339Nano), si.executionData.Duration)
}

return &graph.DotNodeSpec{
Name: sd.GetName(),
DisplayName: sd.GetName(),
Shape: "box",
Name: si.GetName(),
DisplayName: si.GetName(),
Shape: shape,
Style: "filled",
FillColor: "gray",
Tooltip: "",
FillColor: color,
Tooltip: tooltip,
}
}

Expand All @@ -135,5 +181,18 @@ func connectStepInstance(stepFrom, stepTo StepInstanceMeta) *graph.DotEdgeSpec {
Style: "bold",
}

// update edge color, tooltip if NodeTo is started already.
if stepTo.GetState() != StepStatePending {
executionData := stepTo.ExecutionData()
edgeSpec.Tooltip = fmt.Sprintf("Time: %s", executionData.StartTime.Format(time.RFC3339Nano))
}

fromNodeState := stepFrom.GetState()
if fromNodeState == StepStateCompleted {
edgeSpec.Color = "green"
} else if fromNodeState == StepStateFailed {
edgeSpec.Color = "red"
}

return edgeSpec
}
2 changes: 2 additions & 0 deletions step_exec_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"time"
)

// StepExecutionData would measure the step execution time and retry report.
type StepExecutionData struct {
StartTime time.Time
Duration time.Duration
Retried *RetryReport
}

// RetryReport would record the retry count (could extend to include each retry duration, ...)
type RetryReport struct {
Count int
}
4 changes: 2 additions & 2 deletions step_exec_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type RetryPolicy interface {
}

// StepContextPolicy allows context enrichment before passing to step.
type StepContextPolicy func(context.Context) context.Context
type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

Expand Down Expand Up @@ -52,7 +52,7 @@ func WithTimeout(timeout time.Duration) ExecutionOptionPreparer {
}
}

func WithEnrichedContext(contextPolicy StepContextPolicy) ExecutionOptionPreparer {
func EnrichedContext(contextPolicy StepContextPolicy) ExecutionOptionPreparer {
return func(options *StepExecutionOptions) *StepExecutionOptions {
options.ContextPolicy = contextPolicy
return options
Expand Down

0 comments on commit b3f1110

Please sign in to comment.