Skip to content

Commit

Permalink
Merge branch 'master' into childcancelfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx committed Oct 29, 2021
2 parents 7d8a897 + 9c6399d commit 53e2122
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 21 deletions.
25 changes: 17 additions & 8 deletions activity/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@ parameter is the standard Go context.
The second string parameter is a custom activity-specific parameter that can be used to pass in data into the activity
on start. An activity can have one or more such parameters. All parameters to an activity function must be
serializable, which essentially means that params can’t be channels, functions, variadic, or unsafe pointer.
Exact details will depend on your DataConverter, but by default they must work with encoding/json.Marshal (and
Unmarshal on the receiving side, which has the same limitations plus generally cannot deserialize into an interface).
The activity declares two return values: (string, error). The string return value is used to return the result of the
activity. The error return value is used to indicate an error was encountered during execution.
This activity declares two return values: (string, error). The string return value is used to return the result of the
activity, and can be retrieved in the workflow with this activity's Future.
The error return value is used to indicate an error was encountered during execution.
Results must be serializable, like parameters, but only a single result value is allowed (i.e. you cannot return
(string, string, error)).
Implementation
Expand All @@ -77,8 +82,9 @@ constructs.
Failing the activity
To mark an activity as failed, all that needs to happen is for the activity function to return an error via the error
return value.
To mark an activity as failed, return an error from your activity function via the error return value.
Note that failed activities do not record the non-error return's value: you cannot usefully return both a
value and an error, only the error will be recorded.
Activity Heartbeating
Expand Down Expand Up @@ -112,10 +118,13 @@ payload containing progress information.
Activity Cancellation
When an activity is cancelled (or its workflow execution is completed or failed) the context passed into its function
is cancelled which sets its Done channel’s closed state. So an activity can use that to perform any necessary cleanup
and abort its execution. Currently cancellation is delivered only to activities that call RecordHeartbeat.
is cancelled which closes its Done() channel. So an activity can use that to perform any necessary cleanup
and abort its execution.
Async/Manual Activity Completion
Currently, cancellation is delivered only to activities that call RecordHeartbeat. If heartbeating is not performed,
the activity will continue to run normally, but fail to record its result when it completes.
Async and Manual Activity Completion
In certain scenarios completing an activity upon completion of its function is not possible or desirable.
Expand Down Expand Up @@ -178,7 +187,7 @@ For a full example of implementing this pattern see the Expense sample.
Registration
In order to for some workflow execution to be able to invoke an activity type, the worker process needs to be aware of
In order for a workflow to be able to execute an activity type, the worker process needs to be aware of
all the implementations it has access to. An activity is registered with the following call:
activity.Register(SimpleActivity)
Expand Down
53 changes: 53 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package client

import (
"context"
"errors"

"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
Expand Down Expand Up @@ -450,3 +452,54 @@ func NewValue(data []byte) encoded.Value {
func NewValues(data []byte) encoded.Values {
return internal.NewValues(data)
}

// IsWorkflowError returns true if an error is a known returned-by-the-workflow error type, when it comes from
// WorkflowRun from either Client.ExecuteWorkflow or Client.GetWorkflow. If it returns false, the error comes from
// some other source, e.g. RPC request failures or bad arguments.
// Using it on errors from any other source is currently undefined.
//
// This checks for known types via errors.As, so it will check recursively if it encounters wrapped errors.
// Note that this is different than the various `cadence.Is[Type]Error` checks, which do not unwrap.
//
// Currently the complete list of errors checked is:
// *cadence.CustomError, *cadence.CanceledError, *workflow.ContinueAsNewError,
// *workflow.GenericError, *workflow.TimeoutError, *workflow.TerminatedError,
// *workflow.PanicError, *workflow.UnknownExternalWorkflowExecutionError
//
// See documentation for each error type for details.
func IsWorkflowError(err error) bool {
var custom *cadence.CustomError
if errors.As(err, &custom) {
return true
}
var cancel *cadence.CanceledError
if errors.As(err, &cancel) {
return true
}

var generic *workflow.GenericError
if errors.As(err, &generic) {
return true
}
var timeout *workflow.TimeoutError
if errors.As(err, &timeout) {
return true
}
var terminate *workflow.TerminatedError
if errors.As(err, &terminate) {
return true
}
var panicked *workflow.PanicError
if errors.As(err, &panicked) {
return true
}
var can *workflow.ContinueAsNewError
if errors.As(err, &can) {
return true
}
var unknown *workflow.UnknownExternalWorkflowExecutionError
if errors.As(err, &unknown) {
return true
}
return false
}
29 changes: 29 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (ts *IntegrationTestSuite) TearDownSuite() {
}

func (ts *IntegrationTestSuite) SetupTest() {
ts.Assertions = require.New(ts.T())
ts.seq++
ts.activities.clearInvoked()
ts.taskListName = fmt.Sprintf("tl-%v", ts.seq)
Expand Down Expand Up @@ -252,6 +253,7 @@ func (ts *IntegrationTestSuite) TestCancellation() {
ts.Error(err)
_, ok := err.(*cadence.CanceledError)
ts.True(ok)
ts.Truef(client.IsWorkflowError(err), "err from canceled workflows should be a workflow error: %#v", err)
}

func (ts *IntegrationTestSuite) TestStackTraceQuery() {
Expand Down Expand Up @@ -316,6 +318,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {
gerr, ok := err.(*workflow.GenericError)
ts.True(ok)
ts.True(strings.Contains(gerr.Error(), "WorkflowExecutionAlreadyStartedError"))
ts.Truef(client.IsWorkflowError(err), "already-started child error should be a workflow error: %#v", err)
}

func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() {
Expand All @@ -333,6 +336,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() {
gerr, ok := err.(*workflow.GenericError)
ts.True(ok)
ts.True(strings.Contains(gerr.Error(), "WorkflowExecutionAlreadyStartedError"))
ts.Truef(client.IsWorkflowError(err), "already-started child error should be a workflow error: %#v", err)
}

func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() {
Expand Down Expand Up @@ -365,15 +369,39 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() {
ts.Equal("HELLOWORLD", result)
}

func (ts *IntegrationTestSuite) TestWorkflowIDReuseErrorViaStartWorkflow() {
duplicatedWID := "test-workflowidreuse-duplicate-start-error"
// setup: run any workflow once to consume the ID
err := ts.executeWorkflow(
duplicatedWID,
ts.workflows.SimplestWorkflow,
nil,
)
ts.NoError(err, "basic workflow should succeed")

// a second attempt should fail
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
opts := ts.startWorkflowOptions(duplicatedWID)
opts.WorkflowIDReusePolicy = client.WorkflowIDReusePolicyRejectDuplicate
exec, err := ts.libClient.StartWorkflow(ctx, opts, ts.workflows.SimplestWorkflow)
ts.Nil(exec)
ts.Error(err)
ts.IsType(&shared.WorkflowExecutionAlreadyStartedError{}, err, "should be the known already-started error type")
ts.False(client.IsWorkflowError(err), "start-workflow rejected errors should not be workflow errors")
}

func (ts *IntegrationTestSuite) TestChildWFRetryOnError() {
err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil)
ts.Error(err)
ts.Truef(client.IsWorkflowError(err), "child error should be a workflow error: %#v", err)
ts.EqualValues([]string{"toUpper", "toUpper", "toUpper"}, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestChildWFRetryOnTimeout() {
err := ts.executeWorkflow("test-childwf-retry-on-timeout", ts.workflows.ChildWorkflowRetryOnTimeout, nil)
ts.Error(err)
ts.Truef(client.IsWorkflowError(err), "child-timeout error should be a workflow error: %#v", err)
ts.EqualValues([]string{"sleep", "sleep", "sleep"}, ts.activities.invoked())
}

Expand Down Expand Up @@ -434,6 +462,7 @@ func (ts *IntegrationTestSuite) TestLargeQueryResultError() {
ts.Nil(err)
value, err := ts.libClient.QueryWorkflow(ctx, "test-large-query-error", run.GetRunID(), "large_query")
ts.Error(err)
ts.False(client.IsWorkflowError(err), "query errors should not be workflow errors, as they are request-related")

queryErr, ok := err.(*shared.QueryFailedError)
ts.True(ok)
Expand Down
110 changes: 97 additions & 13 deletions workflow/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,92 @@ Time related functions:
Failing a Workflow
To mark a workflow as failed all that needs to happen is for the workflow function to return an error via the err
return value.
To mark a workflow as failed, return an error from your workflow function via the err return value.
Note that failed workflows do not record the non-error return's value: you cannot usefully return both a
value and an error, only the error will be recorded.
Ending a Workflow externally
Inside a workflow, to end you must finish your function by returning a result or error.
Externally, two tools exist to stop workflows from outside the workflow itself, by using the CLI or RPC client:
cancellation and termination. Termination is forceful, cancellation allows a workflow to exit gracefully.
Workflows can also time out, based on their ExecutionStartToClose duration. A timeout behaves the same as
termination (it is a hard deadline on the workflow), but a different close status and final event will be reported.
Terminating a Workflow
Terminating is roughly equivalent to using `kill -9` on a process - the workflow will be ended immediately,
and no further decisions will be made. It cannot be prevented or delayed by the workflow, or by any configuration.
Any in-progress decisions or activities will fail whenever they next communicate with Cadence's servers, i.e. when
they complete or when they next heartbeat.
Because termination does not allow for any further code to be run, this also means your workflow has no
chance to clean up after itself (e.g. running a cleanup Activity to adjust a database record).
If you need to run additional logic when your workflow, use cancellation instead.
Canceling a Workflow
Canceling marks a workflow as canceled (this is a one-time, one-way operation), and immediately wakes the workflow
up to process the cancellation (schedules a new decision task). When the workflow resumes after being canceled,
the context that was passed into the workflow (and thus all derived contexts) will be canceled, which changes the
behavior of many workflow.* functions.
Canceled workflow.Context behavior
A workflow's context can be canceled by either canceling the workflow, or calling the cancel-func returned from
a worfklow.WithCancel(ctx) call. Both behave identically.
At any time, you can convert a canceled (or could-be-canceled) context into a non-canceled context by using
workflow.NewDisconnectedContext. The resulting context will ignore cancellation from the context it is derived from.
Disconnected contexts like this can be created before or after a context has been canceled, and it does not matter
how the cancellation occurred.
Because this context will not be canceled, this can be useful for using context cancellation as a way to request that
some behavior be shut down, while allowing you to run cleanup logic in activities or elsewhere.
As a general guideline, doing anything with I/O with a canceled context (e.g. executing an activity, starting a
child workflow, sleeping) will fail rather than cause external changes. Detailed descriptions are available in
documentation on functions that change their behavior with a canceled context; if it does not mention canceled-context
behavior, its behavior does not change.
For exact behavior, make sure to read the documentation on functions that you are calling.
As an incomplete summary, these actions will all fail immediately, and the associated error returns (possibly within
a Future) will be a workflow.CanceledError:
- workflow.Await
- workflow.Sleep
- workflow.Timer
Child workflows will:
- ExecuteChildWorkflow will synchronously fail with a CanceledError if canceled before it is called
(in v0.18.4 and newer. See https://github.com/uber-go/cadence-client/pull/1138 for details.)
- be canceled if the child workflow is running
- wait to complete their future.Get until the child returns, and the future will contain the final result
(which may be anything that was returned, not necessarily a CanceledError)
Activities have configurable cancellation behavior. For workflow.ExecuteActivity and workflow.ExecuteLocalActivity,
see the activity package's documentation for details. In summary though:
- ExecuteActivity will synchronously fail with a CanceledError if canceled before it is called
- the activity's future.Get will by default return a CanceledError immediately when canceled,
unless activityoptions.WaitForCancellation is true
- the activity's context will be canceled at the next heartbeat event, or not at all if that does not occur
And actions like this will be completely unaffected:
- future.Get
(futures derived from the calls above may return a CanceledError, but this is not guaranteed for all futures)
- selector.Select
(Select is completely unaffected, similar to a native select statement. if you wish to unblock when your
context is canceled, consider using an AddReceive with the context's Done() channel, as with a native select)
- channel.Send, channel.Receive, and channel.ReceiveAsync
(similar to native chan read/write operations, use a selector to wait for send/receive or some other action)
- workflow.Go
(the context argument in the callback is derived and may be canceled, but this does not stop the goroutine,
nor stop new ones from being started)
- workflow.GetVersion, workflow.GetLogger, workflow.GetMetricsScope, workflow.Now, many others
Execute Activity
Expand Down Expand Up @@ -286,14 +370,14 @@ pattern, extra care needs to be taken to ensure the child workflow is started be
Error Handling
Activities and child workflows can fail. You could handle errors differently based on different error cases. If the
activity returns an error as errors.New() or fmt.Errorf(), those errors will be converted to error.GenericError. If the
activity returns an error as error.NewCustomError("err-reason", details), that error will be converted to
*error.CustomError. There are other types of errors like error.TimeoutError, error.CanceledError and error.PanicError.
activity returns an error as errors.New() or fmt.Errorf(), those errors will be converted to workflow.GenericError. If the
activity returns an error as workflow.NewCustomError("err-reason", details), that error will be converted to
*workflow.CustomError. There are other types of errors like workflow.TimeoutError, workflow.CanceledError and workflow.PanicError.
So the error handling code would look like:
err := workflow.ExecuteActivity(ctx, YourActivityFunc).Get(ctx, nil)
switch err := err.(type) {
case *error.CustomError:
case *workflow.CustomError:
switch err.Reason() {
case "err-reason-a":
// handle error-reason-a
Expand All @@ -305,7 +389,7 @@ So the error handling code would look like:
default:
// handle all other error reasons
}
case *error.GenericError:
case *workflow.GenericError:
switch err.Error() {
case "err-msg-1":
// handle error with message "err-msg-1"
Expand All @@ -314,7 +398,7 @@ So the error handling code would look like:
default:
// handle all other generic errors
}
case *error.TimeoutError:
case *workflow.TimeoutError:
switch err.TimeoutType() {
case shared.TimeoutTypeScheduleToStart:
// handle ScheduleToStart timeout
Expand All @@ -324,9 +408,9 @@ So the error handling code would look like:
// handle heartbeat timeout
default:
}
case *error.PanicError:
// handle panic error
case *error.CanceledError:
case *workflow.PanicError:
// handle panic error
case *workflow.CanceledError:
// handle canceled error
default:
// all other cases (ideally, this should not happen)
Expand Down Expand Up @@ -530,7 +614,7 @@ The code below implements the unit tests for the SimpleWorkflow sample.
s.True(s.env.IsWorkflowCompleted())
s.NotNil(s.env.GetWorkflowError())
_, ok := s.env.GetWorkflowError().(*error.GenericError)
_, ok := s.env.GetWorkflowError().(*workflow.GenericError)
s.True(ok)
s.Equal("SimpleActivityFailure", s.env.GetWorkflowError().Error())
}
Expand Down Expand Up @@ -591,7 +675,7 @@ Lets first take a look at a test that simulates a test failing via the "activity
s.True(s.env.IsWorkflowCompleted())
s.NotNil(s.env.GetWorkflowError())
_, ok := s.env.GetWorkflowError().(*error.GenericError)
_, ok := s.env.GetWorkflowError().(*workflow.GenericError)
s.True(ok)
s.Equal("SimpleActivityFailure", s.env.GetWorkflowError().Error())
}
Expand Down

0 comments on commit 53e2122

Please sign in to comment.