Skip to content

Commit

Permalink
[incomplete] Implement bugfix for child-workflow bug in uber-go#1138
Browse files Browse the repository at this point in the history
A proof-of-concept that I believe resolves the bug entirely, but it is
not backwards compatible.

Merging this will break any workflows that are currently executing the
buggy behavior.  Such workflows should be fairly rare, and the buggy
behavior itself is likely undesirable, but we should still find some way
to detect the buggy behavior and preserve it for those workflows so that
they are not permanently broken.
  • Loading branch information
Groxx committed Oct 28, 2021
1 parent 4e54882 commit a34693c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
4 changes: 1 addition & 3 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3158,8 +3158,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
})
s.Run("canceled before child starts", func() {
// ... and should not start the child (i.e. be canceled) when canceled before it is started.
check(0, "no err") // but it does not! this is a bug to fix.
// this should be:
// check(0, "canceled")
check(0, "canceled")
})
}
30 changes: 27 additions & 3 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,14 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
decodeFutureImpl: mainFuture.(*decodeFutureImpl),
executionFuture: executionFuture.(*futureImpl),
}

// Starting with a canceled context should immediately fail, no need to even try.
if ctx.Err() != nil {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
}

workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)
dc := workflowOptionsFromCtx.dataConverter
env := getWorkflowEnvironment(ctx)
Expand Down Expand Up @@ -928,6 +936,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

ctxDone, cancellable := ctx.Done().(*channelImpl)
cancellationCallback := &receiveCallback{}
shouldCancelAsync := false
err = getWorkflowEnvironment(ctx).ExecuteChildWorkflow(params, func(r []byte, e error) {
mainSettable.Set(r, e)
if cancellable {
Expand All @@ -939,6 +948,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
childWorkflowExecution = &r
}
executionSettable.Set(r, e)

// forward the delayed cancellation if necessary
if shouldCancelAsync && e == nil && !mainFuture.IsReady() {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
}
})

if err != nil {
Expand All @@ -949,9 +963,19 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

if cancellable {
cancellationCallback.fn = func(v interface{}, more bool) bool {
if ctx.Err() == ErrCanceled && childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
if ctx.Err() == ErrCanceled {
if childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled. forward cancel to the child.
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
} else if childWorkflowExecution == nil {
// decision to start the child has been made, but it has not yet started.

// TODO: ideal, but not strictly necessary for correctness:
// if it's in the same decision, revoke that cancel synchronously.

// if the decision has already gone through: wait for it to be started, and then cancel it.
shouldCancelAsync = true
}
}
return false
}
Expand Down

0 comments on commit a34693c

Please sign in to comment.