Skip to content

Commit

Permalink
Filling out Selector docs, and some adjacent ones I noticed (#1131)
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx committed Sep 30, 2021
1 parent 1556419 commit 5886e38
Showing 1 changed file with 229 additions and 75 deletions.
304 changes: 229 additions & 75 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,92 +46,229 @@ var (
)

type (
// Channel must be used instead of native go channel by workflow code.
// Use workflow.NewChannel(ctx) method to create Channel instance.
// Channel must be used in workflows instead of a native Go chan.
//
// Use workflow.NewChannel(ctx) to create an unbuffered Channel instance,
// workflow.NewBufferedChannel(ctx, size) to create a Channel which has a buffer,
// or workflow.GetSignalChannel(ctx, "name") to get a Channel that can contain encoded data sent from other systems.
//
// workflow.GetSignalChannel is named differently because you are not "creating" a new channel. Signal channels
// are conceptually singletons that exist at all times, and they do not have to be "created" before a signal can be
// sent to a workflow. The workflow will just have no way to know that the data exists until it inspects the
// appropriate signal channel.
//
// Both NewChannel and NewBufferedChannel have "Named" constructors as well.
// These names will be visible in stack-trace queries, so they can help with debugging, but they do not otherwise
// impact behavior at all, and are not recorded anywhere (so you can change them without versioning your code).
Channel interface {
// Receive blocks until it receives a value, and then assigns the received value to the provided pointer.
// Returns false when Channel is closed.
// Parameter valuePtr is a pointer to the expected data structure to be received. For example:
// var v string
// c.Receive(ctx, &v)
// It returns false when Channel is closed and all data has already been consumed from the channel, in the same
// way as Go channel reads work.
//
// This is equivalent to `v, more := <- aChannel`.
//
// valuePtr must be assignable, and will be used to assign (for in-memory data in regular channels) or decode
// (for signal channels) the data in the channel.
//
// If decoding or assigning fails:
// - an error will be logged
// - the value will be dropped from the channel
// - Receive will automatically try again
// - This will continue until a successful value is found, or the channel is emptied and it resumes blocking.
// Closed channels with no values will always succeed, but they will not change valuePtr.
//
// Go would normally prevent incorrect-type failures like this at compile time, but the same cannot be done
// here. If you need to "try" to assign to multiple things, similar to a Future you can use:
// - for signal channels, a []byte pointer. This will give you the raw data that Cadence received, and no
// decoding will be attempted, so you can try it yourself.
// - for other channels, an interface{} pointer. All values are interfaces, so this will never fail, and you
// can inspect the type with reflection or type assertions.
Receive(ctx Context, valuePtr interface{}) (more bool)

// ReceiveAsync try to receive from Channel without blocking. If there is data available from the Channel, it
// assign the data to valuePtr and returns true. Otherwise, it returns false immediately.
// ReceiveAsync tries to receive from Channel without blocking.
// If there is data available from the Channel, it assigns the data to valuePtr and returns true.
// Otherwise, it returns false immediately.
//
// This is equivalent to:
// select {
// case v := <- aChannel: ok = true
// default: ok = false
// }
//
// Decoding or assigning failures are handled like Receive.
ReceiveAsync(valuePtr interface{}) (ok bool)

// ReceiveAsyncWithMoreFlag is same as ReceiveAsync with extra return value more to indicate if there could be
// more value from the Channel. The more is false when Channel is closed.
// ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return to indicate if there could be
// more value from the Channel. more is false when Channel is closed.
//
// This is equivalent to:
// select {
// case v, more := <- aChannel: ok = true
// default: ok = false
// }
//
// Decoding or assigning failures are handled like Receive.
ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool)

// Send blocks until the data is sent.
//
// This is equivalent to `aChannel <- v`
Send(ctx Context, v interface{})

// SendAsync try to send without blocking. It returns true if the data was sent, otherwise it returns false.
// SendAsync will try to send without blocking.
// It returns true if the data was sent (i.e. there was room in the buffer, or a reader was waiting to receive
// it), otherwise it returns false.
//
// This is equivalent to:
// select {
// case aChannel <- v: ok = true
// default: ok = false
// }
SendAsync(v interface{}) (ok bool)

// Close close the Channel, and prohibit subsequent sends.
// Close closes the Channel, and prohibits subsequent sends.
// As with a normal Go channel that has been closed, sending to a closed channel will panic.
Close()
}

// Selector must be used instead of native go select by workflow code for determinism.
// Use workflow.NewSelector(ctx) method to create a Selector instance.
// The interface is to simulate Golang's Select statement.
// For example, the logic of Golang code like below
// chA := make(chan int)
// chB := make(chan int)
// counter := 0
// for {
// select {
// case i, ok := <- chA:
// if ok{
// counter += i
// }
// case i, ok := <- chB:
// if ok{
// counter += i
// }
// }
// }
// should be written as
// s := workflow.NewSelector(ctx)
// counter := 0
// s.AddReceive(workflow.GetSignalChannel(ctx, "channelA"), func(c workflow.Channel, ok bool) {
// if ok{
// var i int
// c.Receive(ctx, &i)
// counter += i
// }
// })
// s.AddReceive(workflow.GetSignalChannel(ctx, "channelB"), func(c workflow.Channel, ok bool) {
// if ok{
// var i int
// c.Receive(ctx, &i)
// counter += i
// }
// })
// Selector must be used in workflows instead of a native Go select statement.
//
// Use workflow.NewSelector(ctx) to create a Selector instance, and then add cases to it with its methods.
// The interface is intended to simulate Go's select statement, and any Go select can be fairly trivially rewritten
// for a Selector with effectively identical behavior.
//
// For example, normal Go code like below:
// chA := make(chan int)
// chB := make(chan int)
// counter := 0
// for {
// select {
// case i, more := <- chA:
// if more {
// counter += i
// }
// case i, more := <- chB:
// if more {
// counter += i
// }
// case <- time.After(time.Hour):
// break
// }
// }
// can be written as:
// chA := workflow.NewChannel(ctx)
// chB := workflow.NewChannel(ctx)
// counter := 0
// for {
// timedout := false
// s := workflow.NewSelector(ctx)
// s.AddReceive(chA, func(c workflow.Channel, more bool) {
// if more {
// var i int
// c.Receive(ctx, &i)
// counter += i
// }
// })
// s.AddReceive(chB, func(c workflow.Channel, more bool) {
// if more {
// var i int
// c.Receive(ctx, &i)
// counter += i
// }
// })
// s.AddFuture(workflow.NewTimer(ctx, time.Hour), func(f workflow.Future) {
// timedout = true
// })
// s.Select(ctx)
// if timedout {
// break
// }
// }
//
// You can create a new Selector as needed or mutate one and call Select multiple times, but note that:
//
// 1. AddFuture will not behave the same across both patterns. Read AddFuture for more details.
//
// for {
// s.Select(ctx)
// }
// 2. There is no way to remove a case from a Selector, so you must make a new Selector to "remove" them.
//
// Finally, note that Select will not return until a condition's needs are met, like a Go selector - canceling the
// Context used to construct the Selector, or the Context used to Select, will not (directly) unblock a Select call.
// Read Select for more details.
Selector interface {
// AddReceive adds a ReceiveChannel to the selector. f is invoked when the channel has data or closed.
// ok == false indicates the channel is closed
AddReceive(c Channel, f func(c Channel, ok bool)) Selector
// AddSend adds a SendChannel to the selector. f is invoke when the channel is available to send
// AddReceive waits to until a value can be received from a channel.
// f is invoked when the channel has data or is closed.
//
// This is equivalent to `case v, more := <- aChannel`, and `more` will only
// be false when the channel is both closed and no data was received.
//
// When f is invoked, the data (or closed state) remains untouched in the channel, so
// you need to `c.Receive(ctx, &out)` (or `c.ReceiveAsync(&out)`) to remove and decode the value.
// Failure to do this is not an error - the value will simply remain in the channel until a future
// Receive retrieves it.
AddReceive(c Channel, f func(c Channel, more bool)) Selector
// AddSend waits to send a value to a channel.
// f is invoked when the value was successfully sent to the channel.
//
// This is equivalent to `case aChannel <- value`.
//
// Unlike AddReceive, the value has already been sent on the channel when f is invoked.
AddSend(c Channel, v interface{}, f func()) Selector
// AddFuture adds a Future to the selector f is invoked when future is ready
// AddFuture invokes f after a Future is ready.
// If the Future is ready before Select is called, it is eligible to be invoked immediately.
//
// There is no direct equivalent in a native Go select statement.
// It was added because Futures are common in Cadence code, and some patterns are much simpler with it.
//
// Each call to AddFuture will invoke its f at most one time, regardless of how many times Select is called.
// This means, for a Future that is (or will be) ready:
// - Adding the Future once, then calling Select twice, will invoke the callback once with the first Select
// call, and then wait for other Selector conditions in the second Select call (or block forever if there are
// no other eligible conditions).
// - Adding the same Future twice, then calling Select twice, will invoke each callback once.
// - Adding the same Future to two different Selectors, then calling Select once on each Selector, will invoke
// each Selector's callback once.
//
// Therefore, with a Future "f" that is or will become ready, this is an infinite loop that will consume as much
// CPU as possible:
// for {
// workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){}).Select(ctx)
// }
// While this will loop once, and then wait idle forever:
// s := workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){})
// for {
// s.Select(ctx)
// }
AddFuture(future Future, f func(f Future)) Selector
// AddDefault adds a default branch to the selector.
// f is invoked when non of the other conditions(ReceiveChannel, SendChannel and Future) is met for one call of Select
// f is invoked immediately when none of the other conditions (AddReceive, AddSend, AddFuture) are met for a
// Select call.
//
// This is equivalent to a `default:` case.
//
// Note that this applies to each Select call. If you create a Selector with only one AddDefault, and then call
// Select on it twice, f will be invoked twice.
AddDefault(f func())
// Select waits for one of the added conditions to be met and invoke the callback as described above.
// When none of the added condition is met:
// if there is no Default(added by AddDefault) and , then it will block the current goroutine
// if Default(added by AddDefault) is used, when Default callback will be executed without blocking
// When more than one of added conditions are met, only one of them will be invoked.
// Usually it's recommended to use a for loop to drain all of them, and use AddDefault to break out the
// loop properly(e.g. not missing any received data in channels)
// Select waits for one of the added conditions to be met and invokes the callback as described above.
// If no condition is met, Select will block until one or more are available, then one callback will be invoked.
// If no condition is ever met, Select will block forever.
//
// Note that Select does not return an error, and does not stop waiting if its Context is canceled.
// This mimics a native Go select statement, which has no way to be interrupted except for its listed cases.
//
// If you wish to stop Selecting when the Context is canceled, use AddReceive with the Context's Done() channel,
// in the same way as you would use a `case <- ctx.Done():` in a Go select statement. E.g.:
// cancelled := false
// s := workflow.NewSelector(ctx)
// s.AddFuture(f, func(f workflow.Future) {}) // assume this is never ready
// s.AddReceive(ctx.Done(), func(c workflow.Channel, more bool) {
// // this will be invoked when the Context is cancelled for any reason,
// // and more will be false.
// cancelled = true
// })
// s.Select(ctx)
// if cancelled {
// // this will be executed
// }
Select(ctx Context)
}

Expand All @@ -146,20 +283,37 @@ type (

// Future represents the result of an asynchronous computation.
Future interface {
// Get blocks until the future is ready. When ready it either returns non nil error or assigns result value to
// the provided pointer.
// Example:
// var v string
// if err := f.Get(ctx, &v); err != nil {
// return err
// }
// Get blocks until the future is ready.
// When ready it either returns the Future's contained error, or assigns the contained value to the output var.
// Failures to assign or decode the value will panic.
//
// Two common patterns to retrieve data are:
// var out string
// // this will assign the string value, which may be "", or an error and leave out as "".
// err := f.Get(ctx, &out)
// and
// var out *string
// // this will assign the string value, which may be "" or nil, or an error and leave out as nil.
// err := f.Get(ctx, &out)
//
// The valuePtr parameter can be nil when the encoded result value is not needed:
// err := f.Get(ctx, nil)
//
// Futures with values set in-memory via a call to their Settable's methods can be retrieved without knowing the
// type with an interface, i.e. this will not ever panic:
// var out interface{}
// // this will assign the same value that was set,
// // and you can check its type with reflection or type assertions.
// err := f.Get(ctx, &out)
//
// The valuePtr parameter can be nil when the encoded result value is not needed.
// Example:
// err = f.Get(ctx, nil)
// Futures with encoded data from e.g. activities or child workflows can bypass decoding with a byte slice, and
// similarly this will not ever panic:
// var out []byte
// // out will contain the raw bytes given to Cadence's servers, you should decode it however is necessary
// err := f.Get(ctx, &out) // err can only be the Future's contained error
Get(ctx Context, valuePtr interface{}) error

// When true Get is guaranteed to not block
// IsReady will return true Get is guaranteed to not block.
IsReady() bool
}

Expand Down

0 comments on commit 5886e38

Please sign in to comment.