Skip to content

Commit

Permalink
Merge pull request #1 from Azure/haitao/paramstep-start
Browse files Browse the repository at this point in the history
fix a bug where step can start before job start
  • Loading branch information
haitch committed Jul 7, 2022
2 parents 43490ec + ef4f6f3 commit 2845f3c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 29 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
- a step would be started once all it's dependency is finished.
- output of a step can be feed into next step as input, type is checked by go generics.
- step is wrapped in [AsyncTask](github.com/Azure/go-asynctask) with strongType info preserved
- you can feed parameters as a step as well.

# Usage

Expand Down Expand Up @@ -47,7 +48,11 @@ AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a
```

### visualize of a job
this visualize depend on github.com/hashicorp/terraform/dag, with some limitation, may need some upstream contribution.
this visualize depend on [terraform/dag](github.com/hashicorp/terraform/dag), with some limitation, may need some upstream tweaks:
- able to customize node name
- able to distinguash type of node (param, executionBlock)
- able to show state of node (pending, running, completed, failed)

```
digraph {
compound = "true"
Expand Down
9 changes: 7 additions & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,14 @@ func NewJob(name string) *Job {
return j
}

func InputParam[T any](j *Job, stepName string, value *T) *StepInfo[T] {
func InputParam[T any](bCtx context.Context, j *Job, stepName string, value *T) *StepInfo[T] {
step := newStepInfo[T](stepName, []string{j.rootJob.Name()})
step.task = asynctask.NewCompletedTask(value)

instrumentedFunc := func(ctx context.Context) (*T, error) {
j.rootJob.Wait(ctx)
return value, nil
}
step.task = asynctask.Start(bCtx, instrumentedFunc)

j.Steps[stepName] = step
j.registerStepInGraph(stepName, j.rootJob.Name())
Expand Down
29 changes: 11 additions & 18 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,33 @@ func TestSimpleJob(t *testing.T) {
jb.Wait(context.Background())
}

func TestJobWithLoop(t *testing.T) {
job := NewJob("sqlSummaryJob")
jobLib := &SqlSummaryJobLib{}

bCtx := context.Background()
AddStep(bCtx, job, "Step1", jobLib.GetConnection, []string{})
AddStep(bCtx, job, "Step2", jobLib.GetConnection, []string{"Step1"})
}

var _ JobBuilder = &SqlJobBuilder{}

type SqlJobBuilder struct {
Table1 string
Query1 string
Table2 string
Query2 string
ServerName string
Table1 string
Query1 string
Table2 string
Query2 string
}

func (sjb *SqlJobBuilder) BuildJob(bCtx context.Context) *Job {
job := NewJob("sqlSummaryJob")
jobLib := &SqlSummaryJobLib{}

connTsk, _ := AddStep(bCtx, job, "getConnection", jobLib.GetConnection, []string{})
serverNameParamTask := InputParam(bCtx, job, "param_serverName", &sjb.ServerName)
connTsk, _ := StepAfter(bCtx, job, "getConnection", serverNameParamTask, jobLib.GetConnection)

// TODO: handle error during BuildJob

table1ParamTsk := InputParam(job, "param_table1", &sjb.Table1)
table1ParamTsk := InputParam(bCtx, job, "param_table1", &sjb.Table1)
table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, jobLib.GetTableClient)
query1ParamTsk := InputParam(job, "param_query1", &sjb.Query1)
query1ParamTsk := InputParam(bCtx, job, "param_query1", &sjb.Query1)
qery1ResultTsk, _ := StepAfterBoth(bCtx, job, "queryTable1", table1ClientTsk, query1ParamTsk, jobLib.ExecuteQuery)

table2ParamTsk := InputParam(job, "param_table2", &sjb.Table2)
table2ParamTsk := InputParam(bCtx, job, "param_table2", &sjb.Table2)
table2ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, jobLib.GetTableClient)
query2ParamTsk := InputParam(job, "param_query2", &sjb.Query2)
query2ParamTsk := InputParam(bCtx, job, "param_query2", &sjb.Query2)
qery2ResultTsk, _ := StepAfterBoth(bCtx, job, "queryTable2", table2ClientTsk, query2ParamTsk, jobLib.ExecuteQuery)

StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, jobLib.SummarizeQueryResult)
Expand Down
23 changes: 15 additions & 8 deletions sqljob_lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,39 @@ type SqlSummaryJobLib struct {
}

type SqlConnection struct {
ServerName string
}

type SqlTableClient struct {
ServerName string
TableName string
}

type SqlQueryResult struct {
Data map[string]interface{}
}

type SummarizedResult struct{}
type SummarizedResult struct {
Data1 map[string]interface{}
Data2 map[string]interface{}
}

func (sql *SqlSummaryJobLib) GetConnection(ctx context.Context) (*SqlConnection, error) {
func (sql *SqlSummaryJobLib) GetConnection(ctx context.Context, serverName *string) (*SqlConnection, error) {
fmt.Println("GetConnection")
return &SqlConnection{}, nil
return &SqlConnection{ServerName: *serverName}, nil
}

func (sql *SqlSummaryJobLib) GetTableClient(ctx context.Context, conn *SqlConnection, tableName *string) (*SqlTableClient, error) {
fmt.Println("GetTableClient with tableName:", *tableName)
return &SqlTableClient{}, nil
return &SqlTableClient{ServerName: conn.ServerName, TableName: *tableName}, nil
}

func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlTableClient, queryName *string) (*SqlQueryResult, error) {
fmt.Println("ExecuteQuery:", *queryName)
return &SqlQueryResult{}, nil
func (sql *SqlSummaryJobLib) ExecuteQuery(ctx context.Context, tableClient *SqlTableClient, queryString *string) (*SqlQueryResult, error) {
fmt.Println("ExecuteQuery:", *queryString)
return &SqlQueryResult{Data: map[string]interface{}{"serverName": tableClient.ServerName, "tableName": tableClient.TableName, "queryName": *queryString}}, nil
}

func (sql *SqlSummaryJobLib) SummarizeQueryResult(ctx context.Context, result1 *SqlQueryResult, result2 *SqlQueryResult) (*SummarizedResult, error) {
fmt.Println("SummarizeQueryResult")
return &SummarizedResult{}, nil
return &SummarizedResult{Data1: result1.Data, Data2: result2.Data}, nil
}

0 comments on commit 2845f3c

Please sign in to comment.