Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement bugfix for child-workflow bug in #1138 #1144

Merged
merged 3 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type (
memo map[string]interface{}
searchAttributes map[string]interface{}
parentClosePolicy ParentClosePolicy
bugports Bugports
}

executeWorkflowParams struct {
Expand Down Expand Up @@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
hasResult = false
v, ok, m := c.receiveAsyncImpl(callback)

if !ok && !m { //channel closed and empty
if !ok && !m { // channel closed and empty
return m
}

Expand All @@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return m
}
continue //corrupt signal. Drop and reset process
continue // corrupt signal. Drop and reset process
}
for {
if hasResult {
Expand All @@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return more
}
break //Corrupt signal. Drop and reset process.
break // Corrupt signal. Drop and reset process.
}
state.yield(fmt.Sprintf("blocked on %s.Receive", c.name))
}
Expand All @@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) {
func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) {
for {
v, ok, more := c.receiveAsyncImpl(nil)
if !ok && !more { //channel closed and empty
if !ok && !more { // channel closed and empty
return ok, more
}

Expand Down Expand Up @@ -774,7 +775,7 @@ func (c *channelImpl) Close() {
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize
func (c *channelImpl) assignValue(from interface{}, to interface{}) error {
err := decodeAndAssignValue(c.dataConverter, from, to)
//add to metrics
// add to metrics
if err != nil {
c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err))
c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1)
Expand Down
69 changes: 68 additions & 1 deletion internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2793,7 +2793,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
WorkflowID: "Test_ChildWorkflowAlreadyRunning",
ExecutionStartToCloseTimeout: time.Minute,
//WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
// WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
})

var result1, result2 string
Expand Down Expand Up @@ -3103,3 +3103,70 @@ func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeout() {
_ = env.GetWorkflowResult(&result)
s.False(result)
}

func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanceledContext() {
// cancelTime of:
// - <0 == do not cancel
// - 0 == cancel synchronously
// - >0 == cancel after waiting that long
check := func(cancelTime time.Duration, bugport bool, expected string) {
env := s.NewTestWorkflowEnvironment()
env.Test(s.T())
env.RegisterWorkflowWithOptions(func(ctx Context) error {
return Sleep(ctx, time.Minute)
}, RegisterWorkflowOptions{Name: "child"})
env.RegisterWorkflowWithOptions(func(ctx Context) (string, error) {
ctx, cancel := WithCancel(ctx)
if cancelTime == 0 {
cancel()
} else if cancelTime > 0 {
Go(ctx, func(ctx Context) {
_ = Sleep(ctx, cancelTime)
cancel()
})
}

ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
ExecutionStartToCloseTimeout: 2 * time.Minute,
TaskStartToCloseTimeout: 2 * time.Minute,
Bugports: Bugports{
StartChildWorkflowsOnCanceledContext: bugport,
},
})
err := ExecuteChildWorkflow(ctx, "child").Get(ctx, nil)

if err == nil {
return "no err", nil
} else if _, ok := err.(*CanceledError); ok {
return "canceled", nil
}
return "unknown: " + err.Error(), nil
}, RegisterWorkflowOptions{Name: "parent"})

env.ExecuteWorkflow("parent")
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())

var result string
s.NoError(env.GetWorkflowResult(&result))
s.Equal(expected, result)
}
s.Run("sanity check", func() {
// workflow should run the child successfully normally...
check(-1, false, "no err")
})
s.Run("canceled after child starts", func() {
// ... and cancel the child when the child is canceled...
check(30*time.Second, false, "canceled")
})
s.Run("canceled before child starts", func() {
// ... and should not start the child (i.e. be canceled) when canceled before it is started.
check(0, false, "canceled")
})
s.Run("canceled before child starts with bugport enabled", func() {
// prior to v0.18.4, canceling before the child was started would still start the child,
// and it would continue running.
// the bugport provides this old behavior to ease migration, at least until we feel the need to remove it.
check(0, true, "no err")
})
}
102 changes: 99 additions & 3 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,65 @@ type (
// ParentClosePolicy - Optional policy to decide what to do for the child.
// Default is Terminate (if onboarded to this feature)
ParentClosePolicy ParentClosePolicy

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
//
// Bugports are always deprecated and may be removed in future versions.
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
//
// deprecated
Bugports Bugports
}

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
// Back-ported buggy behavior *may* be available via these flags.
//
// Fields in here are NOT guaranteed to be stable. They will almost certainly be removed in the next major
// release, and might be removed earlier if a need arises, e.g. if the historical behavior causes too much of an
// increase in code complexity.
//
// See each individual field for details.
//
// Bugports are always deprecated and may be removed in future versions.
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
//
// deprecated
Bugports struct {
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
//
// Prior to the fix, child workflows would be started and keep running when their context was canceled in two
// situations:
// 1) when the context was canceled before ExecuteChildWorkflow is called, and
// 2) when the context was canceled after ExecuteChildWorkflow but before the child workflow was started.
//
// 1 is unfortunately easy to trigger, though many workflows will encounter an error earlier and not reach the
// child-workflow-executing code. 2 is expected to be very rare in practice.
//
// To permanently emulate old behavior, use a disconnected context when starting child workflows, and
// cancel it only after `childfuture.GetWorkflowExecution().Get(...)` returns. This can be used when this flag
// is removed in the future.
//
// If you have currently-broken workflows and need to repair them, there are two primary options:
//
// 1: Check the BinaryChecksum value of your new deploy and/or of the decision that is currently failing
// workflows. Then set this flag when replaying history on those not-fixed checksums. Concretely, this means
// checking both `workflow.GetInfo(ctx).BinaryChecksum` (note that sufficiently old clients may not have
// recorded a value, and it may be nil) and `workflow.IsReplaying(ctx)`.
//
// 2: Reset broken workflows back to either before the buggy behavior was recorded, or before the fixed behavior
// was deployed. A "bad binary" reset type can do the latter in bulk, see the CLI's
// `cadence workflow reset-batch --reset_type BadBinary --help` for details. For the former, check the failing
// histories, identify the point at which the bug occurred, and reset to prior to that decision task.
//
// Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP.
//
// deprecated
StartChildWorkflowsOnCanceledContext bool
}
)

Expand Down Expand Up @@ -896,7 +955,23 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
decodeFutureImpl: mainFuture.(*decodeFutureImpl),
executionFuture: executionFuture.(*futureImpl),
}
// clients prior to v0.18.4 would incorrectly start child workflows that were started with cancelled contexts,
// and did not react to cancellation between requested and started.
correctChildCancellation := true
workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)

// Starting with a canceled context should immediately fail, no need to even try.
if ctx.Err() != nil {
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// backport the bug
correctChildCancellation = false
} else {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
}
}

dc := workflowOptionsFromCtx.dataConverter
env := getWorkflowEnvironment(ctx)
wfType, input, err := getValidatedWorkflowFunction(childWorkflowType, args, dc, env.GetRegistry())
Expand Down Expand Up @@ -928,6 +1003,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

ctxDone, cancellable := ctx.Done().(*channelImpl)
cancellationCallback := &receiveCallback{}
shouldCancelAsync := false
err = getWorkflowEnvironment(ctx).ExecuteChildWorkflow(params, func(r []byte, e error) {
mainSettable.Set(r, e)
if cancellable {
Expand All @@ -939,6 +1015,15 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
childWorkflowExecution = &r
}
executionSettable.Set(r, e)

// forward the delayed cancellation if necessary
if shouldCancelAsync && e == nil && !mainFuture.IsReady() {
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// do nothing: buggy behavior did not forward the cancellation
} else {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
}
}
})

if err != nil {
Expand All @@ -949,9 +1034,19 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

if cancellable {
cancellationCallback.fn = func(v interface{}, more bool) bool {
if ctx.Err() == ErrCanceled && childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
if ctx.Err() == ErrCanceled {
if childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled. forward cancel to the child.
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
} else if childWorkflowExecution == nil && correctChildCancellation {
// decision to start the child has been made, but it has not yet started.

// TODO: ideal, but not strictly necessary for correctness:
// if it's in the same decision, revoke that cancel synchronously.

// if the decision has already gone through: wait for it to be started, and then cancel it.
shouldCancelAsync = true
}
}
return false
}
Expand Down Expand Up @@ -1270,6 +1365,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.memo = cwo.Memo
wfOptions.searchAttributes = cwo.SearchAttributes
wfOptions.parentClosePolicy = cwo.ParentClosePolicy
wfOptions.bugports = cwo.Bugports

return ctx1
}
Expand Down
Loading