Skip to content

Commit

Permalink
"Backported" un-bugfix in case people hit non-determinism errors
Browse files Browse the repository at this point in the history
Before merging, this deserves a changelog entry.
  • Loading branch information
Groxx committed Oct 28, 2021
1 parent a34693c commit 7d8a897
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 12 deletions.
11 changes: 6 additions & 5 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type (
memo map[string]interface{}
searchAttributes map[string]interface{}
parentClosePolicy ParentClosePolicy
bugports Bugports
}

executeWorkflowParams struct {
Expand Down Expand Up @@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
hasResult = false
v, ok, m := c.receiveAsyncImpl(callback)

if !ok && !m { //channel closed and empty
if !ok && !m { // channel closed and empty
return m
}

Expand All @@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return m
}
continue //corrupt signal. Drop and reset process
continue // corrupt signal. Drop and reset process
}
for {
if hasResult {
Expand All @@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return more
}
break //Corrupt signal. Drop and reset process.
break // Corrupt signal. Drop and reset process.
}
state.yield(fmt.Sprintf("blocked on %s.Receive", c.name))
}
Expand All @@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) {
func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) {
for {
v, ok, more := c.receiveAsyncImpl(nil)
if !ok && !more { //channel closed and empty
if !ok && !more { // channel closed and empty
return ok, more
}

Expand Down Expand Up @@ -774,7 +775,7 @@ func (c *channelImpl) Close() {
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize
func (c *channelImpl) assignValue(from interface{}, to interface{}) error {
err := decodeAndAssignValue(c.dataConverter, from, to)
//add to metrics
// add to metrics
if err != nil {
c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err))
c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2793,7 +2793,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
WorkflowID: "Test_ChildWorkflowAlreadyRunning",
ExecutionStartToCloseTimeout: time.Minute,
//WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
// WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
})

var result1, result2 string
Expand Down
68 changes: 62 additions & 6 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,49 @@ type (
// ParentClosePolicy - Optional policy to decide what to do for the child.
// Default is Terminate (if onboarded to this feature)
ParentClosePolicy ParentClosePolicy

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
Bugports Bugports
}

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
// Back-ported buggy behavior *may* be available via these flags.
//
// Fields in here are NOT guaranteed to be stable. They will almost certainly be removed in the next major
// release, and might be removed earlier if a need arises, e.g. if the historical behavior causes too much of an
// increase in code complexity.
//
// See each individual field for details.
Bugports struct {
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
//
// Prior to the fix, child workflows would be started and keep running when their context was canceled in two
// situations:
// 1) when the context was canceled before ExecuteChildWorkflow is called, and
// 2) when the context was canceled after ExecuteChildWorkflow but before the child workflow was started.
//
// 1 is unfortunately easy to trigger, though many workflows will encounter an error earlier and not reach the
// child-workflow-executing code. 2 is expected to be very rare in practice.
//
// To permanently emulate old behavior, use a disconnected context when starting child workflows, and
// cancel it only after `childfuture.GetWorkflowExecution().Get(...)` returns. This can be used when this flag
// is removed in the future.
//
// If you have currently-broken workflows and need to repair them, there are two primary options:
//
// 1: Check the BinaryChecksum value of your new deploy and/or of the decision that is currently failing
// workflows. Then set this flag when replaying history on those not-fixed checksums. Concretely, this means
// checking both `workflow.GetInfo(ctx).BinaryChecksum` (note that sufficiently old clients may not have
// recorded a value, and it may be nil) and `workflow.IsReplaying(ctx)`.
//
// 2: Reset broken workflows back to either before the buggy behavior was recorded, or before the fixed behavior
// was deployed. A "bad binary" reset type can do the latter in bulk, see the CLI's
// `cadence workflow reset-batch --reset_type BadBinary --help` for details. For the former, check the failing
// histories, identify the point at which the bug occurred, and reset to prior to that decision task.
StartChildWorkflowsOnCanceledContext bool
}
)

Expand Down Expand Up @@ -896,15 +939,23 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
decodeFutureImpl: mainFuture.(*decodeFutureImpl),
executionFuture: executionFuture.(*futureImpl),
}
// clients prior to v0.18.4 would incorrectly start child workflows that were started with cancelled contexts,
// and did not react to cancellation between requested and started.
correctChildCancellation := true
workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)

// Starting with a canceled context should immediately fail, no need to even try.
if ctx.Err() != nil {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// backport the bug
correctChildCancellation = false
} else {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
}
}

workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)
dc := workflowOptionsFromCtx.dataConverter
env := getWorkflowEnvironment(ctx)
wfType, input, err := getValidatedWorkflowFunction(childWorkflowType, args, dc, env.GetRegistry())
Expand Down Expand Up @@ -951,7 +1002,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

// forward the delayed cancellation if necessary
if shouldCancelAsync && e == nil && !mainFuture.IsReady() {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// do nothing: buggy behavior did not forward the cancellation
} else {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
}
}
})

Expand All @@ -967,7 +1022,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
if childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled. forward cancel to the child.
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
} else if childWorkflowExecution == nil {
} else if childWorkflowExecution == nil && correctChildCancellation {
// decision to start the child has been made, but it has not yet started.

// TODO: ideal, but not strictly necessary for correctness:
Expand Down Expand Up @@ -1294,6 +1349,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.memo = cwo.Memo
wfOptions.searchAttributes = cwo.SearchAttributes
wfOptions.parentClosePolicy = cwo.ParentClosePolicy
wfOptions.bugports = cwo.Bugports

return ctx1
}
Expand Down
217 changes: 217 additions & 0 deletions test/replaytests/child_bug.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
[
{
"eventId": 1,
"eventType": "WorkflowExecutionStarted",
"taskId": 1048925,
"timestamp": 1635394320054371800,
"version": 0,
"workflowExecutionStartedEventAttributes": {
"attempt": 0,
"continuedExecutionRunId": "",
"cronSchedule": "",
"executionStartToCloseTimeoutSeconds": 300,
"firstDecisionTaskBackoffSeconds": 0,
"firstExecutionRunId": "4962aeea-a5e0-4f1e-bcb7-098c38c19fd1",
"identity": "cadence-cli@local",
"input": "",
"originalExecutionRunId": "4962aeea-a5e0-4f1e-bcb7-098c38c19fd1",
"taskList": {
"name": "task_list"
},
"taskStartToCloseTimeoutSeconds": 2,
"workflowType": {
"name": "parent"
}
}
},
{
"decisionTaskScheduledEventAttributes": {
"attempt": 0,
"startToCloseTimeoutSeconds": 2,
"taskList": {
"name": "task_list"
}
},
"eventId": 2,
"eventType": "DecisionTaskScheduled",
"taskId": 1048926,
"timestamp": 1635394320054487000,
"version": 0
},
{
"decisionTaskStartedEventAttributes": {
"identity": "44608@local@task_list",
"requestId": "1144deaf-aa6f-464f-9a1b-65cee85439fc",
"scheduledEventId": 2
},
"eventId": 3,
"eventType": "DecisionTaskStarted",
"taskId": 1048934,
"timestamp": 1635394320123179000,
"version": 0
},
{
"decisionTaskCompletedEventAttributes": {
"binaryChecksum": "unknown",
"identity": "44608@local@task_list",
"scheduledEventId": 2,
"startedEventId": 3
},
"eventId": 4,
"eventType": "DecisionTaskCompleted",
"taskId": 1048937,
"timestamp": 1635394320185062700,
"version": 0
},
{
"eventId": 5,
"eventType": "StartChildWorkflowExecutionInitiated",
"startChildWorkflowExecutionInitiatedEventAttributes": {
"cronSchedule": "",
"decisionTaskCompletedEventId": 4,
"domain": "local",
"executionStartToCloseTimeoutSeconds": 30,
"header": {},
"parentClosePolicy": "TERMINATE",
"taskList": {
"name": "task_list"
},
"taskStartToCloseTimeoutSeconds": 30,
"workflowId": "4962aeea-a5e0-4f1e-bcb7-098c38c19fd1_0",
"workflowIdReusePolicy": "TerminateIfRunning",
"workflowType": {
"name": "child"
}
},
"taskId": 1048938,
"timestamp": 1635394320186925600,
"version": 0
},
{
"childWorkflowExecutionStartedEventAttributes": {
"domain": "local",
"header": {},
"initiatedEventId": 5,
"workflowExecution": {
"runId": "748709af-0150-4493-8a57-d0d358ac1aad",
"workflowId": "4962aeea-a5e0-4f1e-bcb7-098c38c19fd1_0"
},
"workflowType": {
"name": "child-sleep"
}
},
"eventId": 6,
"eventType": "ChildWorkflowExecutionStarted",
"taskId": 1048945,
"timestamp": 1635394320249768000,
"version": 0
},
{
"decisionTaskScheduledEventAttributes": {
"attempt": 0,
"startToCloseTimeoutSeconds": 2,
"taskList": {
"name": "local:97607b30-6825-48ad-a7ad-1bc050d3d549"
}
},
"eventId": 7,
"eventType": "DecisionTaskScheduled",
"taskId": 1048947,
"timestamp": 1635394320249960200,
"version": 0
},
{
"decisionTaskStartedEventAttributes": {
"identity": "44608@local@task_list",
"requestId": "f3626e79-c365-4514-8243-c1cd619f02c4",
"scheduledEventId": 7
},
"eventId": 8,
"eventType": "DecisionTaskStarted",
"taskId": 1048953,
"timestamp": 1635394320288285700,
"version": 0
},
{
"decisionTaskCompletedEventAttributes": {
"binaryChecksum": "unknown",
"identity": "44608@local@task_list",
"scheduledEventId": 7,
"startedEventId": 8
},
"eventId": 9,
"eventType": "DecisionTaskCompleted",
"taskId": 1048960,
"timestamp": 1635394320336447700,
"version": 0
},
{
"childWorkflowExecutionCompletedEventAttributes": {
"domain": "local",
"initiatedEventId": 5,
"startedEventId": 6,
"workflowExecution": {
"runId": "748709af-0150-4493-8a57-d0d358ac1aad",
"workflowId": "4962aeea-a5e0-4f1e-bcb7-098c38c19fd1_0"
},
"workflowType": {
"name": "child-sleep"
}
},
"eventId": 10,
"eventType": "ChildWorkflowExecutionCompleted",
"taskId": 1048980,
"timestamp": 1635394330479970800,
"version": 0
},
{
"decisionTaskScheduledEventAttributes": {
"attempt": 0,
"startToCloseTimeoutSeconds": 2,
"taskList": {
"name": "task_list"
}
},
"eventId": 11,
"eventType": "DecisionTaskScheduled",
"taskId": 1048982,
"timestamp": 1635394330480030200,
"version": 0
},
{
"decisionTaskStartedEventAttributes": {
"identity": "44608@local@task_list",
"requestId": "c3314aa8-33bb-45df-aad9-3cfdd118bfe7",
"scheduledEventId": 11
},
"eventId": 12,
"eventType": "DecisionTaskStarted",
"taskId": 1048985,
"timestamp": 1635394330507992000,
"version": 0
},
{
"decisionTaskCompletedEventAttributes": {
"binaryChecksum": "unknown",
"identity": "44608@local@task_list",
"scheduledEventId": 11,
"startedEventId": 12
},
"eventId": 13,
"eventType": "DecisionTaskCompleted",
"taskId": 1048988,
"timestamp": 1635394330549916200,
"version": 0
},
{
"eventId": 14,
"eventType": "WorkflowExecutionCompleted",
"taskId": 1048989,
"timestamp": 1635394330549969400,
"version": 0,
"workflowExecutionCompletedEventAttributes": {
"decisionTaskCompletedEventId": 13,
"result": ""
}
}
]
Loading

0 comments on commit 7d8a897

Please sign in to comment.