Skip to content

Commit

Permalink
Merge pull request #17 from Azure/haitao/lint_tweaks
Browse files Browse the repository at this point in the history
linter tweaks, update readme on overhead cost
  • Loading branch information
haitch committed Jan 25, 2023
2 parents ecf5ade + bccb245 commit 58cfa1f
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 122 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,40 @@ SqlSummaryAsyncJobDefinition = asyncjob.JobWithResult(job /*from previous sectio
jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
result, err := jobInstance1.Result(ctx)
```

### Overhead?
- go routine will be created for each step in your jobDefinition, when you call .Start()
- each step also hold tiny memory as well for state tracking.
- userFunction is instrumented with state tracking, panic handling.

Here is some simple visualize on how it actual looks like:
```mermaid
gantt
title asyncjob.Start()
dateFormat HH:mm
section GetConnection
WaitPrecedingTasks :des11, 00:00,0ms
userFunction :des12, after des11, 20ms
section GetTableClient1
WaitPrecedingTasks :des21, 00:00,20ms
userFunction :des22, after des21, 15ms
section GetTableClient2
WaitPrecedingTasks :des31, 00:00,20ms
userFunction :des32, after des31, 21ms
section QueryTable1
WaitPrecedingTasks :des41, 00:00,35ms
userFunction :des42, after des41, 24ms
section QueryTable2
WaitPrecedingTasks :des51, 00:00,41ms
userFunction :des52, after des51, 30ms
section QueryResultSummarize
WaitPrecedingTasks :des61, 00:00, 71ms
userFunction :des62, after des61, 10ms
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.18
require (
github.com/Azure/go-asyncjob/graph v0.2.0
github.com/Azure/go-asynctask v1.3.1
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 1 addition & 1 deletion job_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition),
stepsDag: graph.NewGraph(connectStepDefinition),
}

rootStep := newStepDefinition[T](name, stepTypeRoot)
Expand Down
6 changes: 2 additions & 4 deletions job_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package asyncjob
import (
"context"
"errors"
"sync"

"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
Expand Down Expand Up @@ -47,7 +46,6 @@ type JobInstance[T any] struct {
jobOptions *JobExecutionOptions
input *T
Definition *JobDefinition[T]
jobStart *sync.WaitGroup
rootStep *StepInstance[T]
steps map[string]StepInstanceMeta
stepsDag *graph.Graph[StepInstanceMeta]
Expand All @@ -58,7 +56,7 @@ func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions ..
Definition: jd,
input: input,
steps: map[string]StepInstanceMeta{},
stepsDag: graph.NewGraph[StepInstanceMeta](connectStepInstance),
stepsDag: graph.NewGraph(connectStepInstance),
jobOptions: &JobExecutionOptions{},
}

Expand All @@ -76,7 +74,7 @@ func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions ..
func (ji *JobInstance[T]) start(ctx context.Context) {
// create root step instance
ji.rootStep = newStepInstance(ji.Definition.rootStep, ji)
ji.rootStep.task = asynctask.NewCompletedTask[T](ji.input)
ji.rootStep.task = asynctask.NewCompletedTask(ji.input)
ji.rootStep.state = StepStateCompleted
ji.steps[ji.rootStep.GetName()] = ji.rootStep
ji.stepsDag.AddNode(ji.rootStep)
Expand Down
152 changes: 68 additions & 84 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,29 @@ import (
func TestSimpleJob(t *testing.T) {
t.Parallel()

jobInstance1 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
},
}, asyncjob.WithJobId("jobInstance1"))

jobInstance2 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server2",
Table1: "table3",
Query1: "query3",
Table2: "table4",
Query2: "query4",
},
}, asyncjob.WithJobId("jobInstance2"))

jobInstance3 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server3",
Table1: "table5",
Query1: "query5",
Table2: "table6",
Query2: "query6",
},
}, asyncjob.WithSequentialExecution())
jobInstance1 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
}), asyncjob.WithJobId("jobInstance1"))

jobInstance2 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server2",
Table1: "table3",
Query1: "query3",
Table2: "table4",
Query2: "query4",
}), asyncjob.WithJobId("jobInstance2"))

jobInstance3 := SqlSummaryAsyncJobDefinition.Start(context.WithValue(context.Background(), testLoggingContextKey, t), NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server3",
Table1: "table5",
Query1: "query5",
Table2: "table6",
Query2: "query6",
}), asyncjob.WithSequentialExecution())

jobErr := jobInstance1.Wait(context.Background())
assert.NoError(t, jobErr)
Expand Down Expand Up @@ -79,18 +73,16 @@ func TestJobError(t *testing.T) {
t.Parallel()

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetTableClient.server1.table1": func() error { return fmt.Errorf("table1 not exists") },
},
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetTableClient.server1.table1": func() error { return fmt.Errorf("table1 not exists") },
},
})
}))

err := jobInstance.Wait(context.Background())
assert.Error(t, err)
Expand All @@ -105,18 +97,16 @@ func TestJobPanic(t *testing.T) {
t.Parallel()

ctx := context.WithValue(context.Background(), testLoggingContextKey, t)
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
PanicInjection: map[string]bool{
"GetTableClient.server1.table2": true,
},
jobInstance := SqlSummaryAsyncJobDefinition.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
PanicInjection: map[string]bool{
"GetTableClient.server1.table2": true,
},
})
}))

err := jobInstance.Wait(context.Background())
assert.Error(t, err)
Expand Down Expand Up @@ -146,18 +136,16 @@ func TestJobStepRetry(t *testing.T) {
ctx := context.WithValue(context.Background(), testLoggingContextKey, t)

// gain code coverage on retry policy in StepAfter
jobInstance := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"ExecuteQuery.server1.table1.query1": func() error { return fmt.Errorf("query exeeded memory limit") },
},
jobInstance := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"ExecuteQuery.server1.table1.query1": func() error { return fmt.Errorf("query exeeded memory limit") },
},
})
}))

// once Start() is triggered, job definition should be sealed
assert.True(t, jd.Sealed())
Expand All @@ -173,18 +161,16 @@ func TestJobStepRetry(t *testing.T) {
assert.Equal(t, exeData.Retried.Count, 3)

// gain code coverage on retry policy in AddStep
jobInstance1 := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") },
},
jobInstance1 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"GetConnection": func() error { return fmt.Errorf("dial 1.2.3.4 timedout") },
},
})
}))
err = jobInstance1.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
Expand All @@ -193,18 +179,16 @@ func TestJobStepRetry(t *testing.T) {
assert.Equal(t, "GetConnection", jobErr.StepInstance.GetName())

// gain code coverage on retry policy in AfterBoth
jobInstance2 := jd.Start(ctx, &SqlSummaryJobLib{
Params: &SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
},
jobInstance2 := jd.Start(ctx, NewSqlJobLib(&SqlSummaryJobParameters{
ServerName: "server1",
Table1: "table1",
Query1: "query1",
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{
"SummarizeQueryResult": func() error { return fmt.Errorf("result1 and result2 having different schema version, cannot merge.") },
},
})
}))
err = jobInstance2.Wait(context.Background())
assert.Error(t, err)
jobErr = &asyncjob.JobError{}
Expand Down
12 changes: 6 additions & 6 deletions step_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

result, err = stepFunc(ctx)
return result, err
}

stepInstance := newStepInstance[ST](stepD, ji)
stepInstance := newStepInstance(stepD, ji)
stepInstance.task = asynctask.Start(ctx, instrumentedAddStep(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
return stepInstance
Expand Down Expand Up @@ -79,7 +79,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

Expand All @@ -88,7 +88,7 @@ func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep
}

parentStepInstance := getStrongTypedStepInstance(parentStep, ji)
stepInstance := newStepInstance[ST](stepD, ji)
stepInstance := newStepInstance(stepD, ji)
// here ContinueWith may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error.
stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
Expand Down Expand Up @@ -128,7 +128,7 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string,
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic cought: %v, StackTrace: %s", r, debug.Stack())
err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()

Expand All @@ -137,7 +137,7 @@ func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string,
}
parentStepInstance1 := getStrongTypedStepInstance(parentStep1, ji)
parentStepInstance2 := getStrongTypedStepInstance(parentStep2, ji)
stepInstance := newStepInstance[ST](stepD, ji)
stepInstance := newStepInstance(stepD, ji)
// here AfterBoth may not invoke instrumentedStepAfterBoth at all, if parentStep1 or parentStep2 returns error.
stepInstance.task = asynctask.AfterBoth(ctx, parentStepInstance1.task, parentStepInstance2.task, instrumentedStepAfterBoth(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
Expand Down
Loading

0 comments on commit 58cfa1f

Please sign in to comment.