Skip to content

Commit

Permalink
Merge pull request #16 from Azure/haitao/split_files
Browse files Browse the repository at this point in the history
split files, improve code coverage
  • Loading branch information
haitch committed Dec 12, 2022
2 parents 870e93a + bb40d6c commit 2389eb7
Show file tree
Hide file tree
Showing 14 changed files with 509 additions and 341 deletions.
40 changes: 35 additions & 5 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,46 @@ import (
type JobErrorCode string

const (
ErrPrecedentStepFailure JobErrorCode = "precedent step failed"
ErrStepFailed JobErrorCode = "step failed"
ErrRefStepNotInJob JobErrorCode = "trying to reference to a step not registered in job"
ErrAddStepInSealedJob JobErrorCode = "trying to add step to a sealed job definition"
ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed"
ErrStepFailed JobErrorCode = "StepFailed"

ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob"
MsgRefStepNotInJob string = "trying to reference to step %q, but it is not registered in job"

ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob"
MsgAddStepInSealedJob string = "trying to add step %q to a sealed job definition"

ErrAddExistingStep JobErrorCode = "AddExistingStep"
MsgAddExistingStep string = "trying to add step %q to job definition, but it already exists"

ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep"
MsgDuplicateInputParentStep string = "at least 2 input parentSteps are same"

ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound"
MsgRuntimeStepNotFound string = "runtime step %q not found, must be a bug in asyncjob"
)

func (code JobErrorCode) Error() string {
return string(code)
}

func (code JobErrorCode) WithMessage(msg string) *MessageError {
return &MessageError{Code: code, Message: msg}
}

type MessageError struct {
Code JobErrorCode
Message string
}

func (me *MessageError) Error() string {
return me.Code.Error() + ": " + me.Message
}

func (me *MessageError) Unwrap() error {
return me.Code
}

type JobError struct {
Code JobErrorCode
StepError error
Expand Down Expand Up @@ -48,7 +78,7 @@ func (je *JobError) RootCause() error {
}

// precendent step failure, track to the root
if je.Code == ErrPrecedentStepFailure {
if je.Code == ErrPrecedentStepFailed {
precedentStepErr := &JobError{}
if !errors.As(je.StepError, &precedentStepErr) {
return je.StepError
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/Azure/go-asynctask v1.3.1/go.mod h1:S1Ee5SVnt6ZUJ84brodPiHvoNfN2wgDyV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
111 changes: 111 additions & 0 deletions job_definition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package asyncjob

import (
"context"
"errors"
"fmt"

"github.com/Azure/go-asyncjob/graph"
)

// Interface for a job definition
type JobDefinitionMeta interface {
GetName() string
GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
Seal()
Sealed() bool
Visualize() (string, error)

// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error
getRootStep() StepDefinitionMeta
}

// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
type JobDefinition[T any] struct {
name string

sealed bool
steps map[string]StepDefinitionMeta
stepsDag *graph.Graph[StepDefinitionMeta]
rootStep *StepDefinition[T]
}

// Create new JobDefinition
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition),
}

rootStep := newStepDefinition[T](name, stepTypeRoot)
j.rootStep = rootStep

j.steps[j.rootStep.GetName()] = j.rootStep
j.stepsDag.AddNode(j.rootStep)

return j
}

// Start execution of the job definition.
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}

ji := newJobInstance(jd, input, jobOptions...)
ji.start(ctx)

return ji
}

func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta {
return jd.rootStep
}

func (jd *JobDefinition[T]) GetName() string {
return jd.name
}

func (jd *JobDefinition[T]) Seal() {
if jd.sealed {
return
}
jd.sealed = true
}

func (jd *JobDefinition[T]) Sealed() bool {
return jd.sealed
}

// GetStep returns the stepDefinition by name
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) {
stepMeta, ok := jd.steps[stepName]
return stepMeta, ok
}

// AddStep adds a step to the job definition, with optional preceding steps
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error {
jd.steps[step.GetName()] = step
jd.stepsDag.AddNode(step)
for _, precedingStep := range precedingSteps {
if err := jd.stepsDag.Connect(precedingStep, step); err != nil {
if errors.Is(err, graph.ErrConnectNotExistingNode) {
return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName()))
}

return err
}
}

return nil
}

// Visualize the job definition in graphviz dot format
func (jd *JobDefinition[T]) Visualize() (string, error) {
return jd.stepsDag.ToDotGraph()
}
103 changes: 7 additions & 96 deletions job.go → job_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,111 +7,18 @@ import (

"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
"github.com/google/uuid"
)

// Interface for a job definition
type JobDefinitionMeta interface {
GetName() string
GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
Seal()
Sealed() bool

// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta)
getRootStep() StepDefinitionMeta
}

// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
type JobDefinition[T any] struct {
name string

sealed bool
steps map[string]StepDefinitionMeta
stepsDag *graph.Graph[StepDefinitionMeta]
rootStep *StepDefinition[T]
}

// Create new JobDefinition
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph[StepDefinitionMeta](connectStepDefinition),
}

rootStep := newStepDefinition[T](name, stepTypeRoot)
j.rootStep = rootStep

j.steps[j.rootStep.GetName()] = j.rootStep
j.stepsDag.AddNode(j.rootStep)

return j
}

// Start execution of the job definition.
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}

ji := newJobInstance(jd, input, jobOptions...)
ji.start(ctx)

return ji
}

func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta {
return jd.rootStep
}

func (jd *JobDefinition[T]) GetName() string {
return jd.name
}

func (jd *JobDefinition[T]) Seal() {
if jd.sealed {
return
}
jd.sealed = true
}

func (jd *JobDefinition[T]) Sealed() bool {
return jd.sealed
}

// GetStep returns the stepDefinition by name
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) {
stepMeta, ok := jd.steps[stepName]
return stepMeta, ok
}

// AddStep adds a step to the job definition, with optional preceding steps
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) {
jd.steps[step.GetName()] = step
jd.stepsDag.AddNode(step)
for _, precedingStep := range precedingSteps {
jd.stepsDag.Connect(precedingStep, step)
}
}

// Visualize the job definition in graphviz dot format
func (jd *JobDefinition[T]) Visualize() (string, error) {
return jd.stepsDag.ToDotGraph()
}

type JobInstanceMeta interface {
GetJobInstanceId() string
GetJobDefinition() JobDefinitionMeta
GetStepInstance(stepName string) (StepInstanceMeta, bool)
Wait(context.Context) error
Visualize() (string, error)

// not exposing for now
addStepInstance(step StepInstanceMeta, precedingSteps ...StepInstanceMeta)

// future considering:
// - return result of given step
}

type JobExecutionOptions struct {
Expand Down Expand Up @@ -159,6 +66,10 @@ func newJobInstance[T any](jd *JobDefinition[T], input *T, jobInstanceOptions ..
ji.jobOptions = decorator(ji.jobOptions)
}

if ji.jobOptions.Id == "" {
ji.jobOptions.Id = uuid.New().String()
}

return ji
}

Expand Down
4 changes: 2 additions & 2 deletions job_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type JobInstanceWithResult[Tin, Tout any] struct {
resultStep *StepInstance[Tout]
}

func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin) *JobInstanceWithResult[Tin, Tout] {
ji := jd.JobDefinition.Start(ctx, input)
func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout] {
ji := jd.JobDefinition.Start(ctx, input, jobOptions...)

return &JobInstanceWithResult[Tin, Tout]{
JobInstance: ji,
Expand Down
29 changes: 0 additions & 29 deletions job_result_test.go

This file was deleted.

Loading

0 comments on commit 2389eb7

Please sign in to comment.